GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1const std = @import("std");
2const mem = std.mem;
3const net = std.net;
4const posix = std.posix;
5const Allocator = mem.Allocator;
6const Thread = std.Thread;
7const zat = @import("zat");
8const HttpTransport = zat.HttpTransport;
9const LocalDb = @import("db/LocalDb.zig");
10const sync = @import("db/sync.zig");
11const server = @import("server.zig");
12const ingest = @import("ingest.zig");
13
14const log = std.log.scoped(.ingester);
15
16const MAX_HTTP_WORKERS = 8;
17const SOCKET_TIMEOUT_SECS = 5;
18
19const JetstreamArgs = struct {
20 allocator: Allocator,
21 config: ingest.Config,
22 transport: *HttpTransport,
23 local_db: ?*LocalDb,
24};
25
26fn jetstreamThread(args: JetstreamArgs) void {
27 var handler = ingest.IngestHandler.init(
28 args.allocator,
29 args.config,
30 args.transport,
31 args.local_db,
32 ) catch |err| {
33 log.err("handler init failed: {}", .{err});
34 return;
35 };
36 defer handler.deinit();
37
38 var client = zat.JetstreamClient.init(args.allocator, .{
39 .wanted_collections = &.{
40 "app.bsky.actor.profile",
41 "app.bsky.feed.post",
42 "app.bsky.feed.like",
43 "app.bsky.graph.follow",
44 },
45 .cursor = ingest.fetchCursor(args.transport, args.config),
46 });
47 defer client.deinit();
48
49 client.subscribe(&handler);
50}
51
52fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void {
53 const timeout = mem.toBytes(posix.timeval{
54 .sec = @intCast(secs),
55 .usec = 0,
56 });
57 try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout);
58 try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout);
59}
60
61pub fn main() !void {
62 var gpa = std.heap.GeneralPurposeAllocator(.{}){};
63 defer _ = gpa.deinit();
64 const allocator = gpa.allocator();
65
66 const config = ingest.getConfig();
67 log.info("typeahead ingester → {s}", .{config.worker_url});
68
69 // open local SQLite (optional — graceful degradation if no volume)
70 var local_db = LocalDb.init(allocator);
71 var local_db_ptr: ?*LocalDb = null;
72 local_db.open() catch |err| {
73 log.warn("local db failed to open: {}, search will be unavailable", .{err});
74 };
75 if (local_db.conn != null) {
76 local_db_ptr = &local_db;
77 }
78 defer local_db.deinit();
79
80 // start HTTP server FIRST so Fly proxy doesn't timeout
81 const port: u16 = blk: {
82 const port_str = posix.getenv("PORT") orelse "8080";
83 break :blk std.fmt.parseInt(u16, port_str, 10) catch 8080;
84 };
85
86 const address = try net.Address.parseIp("0.0.0.0", port);
87 var listener = try address.listen(.{ .reuse_address = true });
88 defer listener.deinit();
89
90 log.info("listening on port {d}", .{port});
91
92 // init thread pool for HTTP connections
93 var pool: Thread.Pool = undefined;
94 try pool.init(.{
95 .allocator = allocator,
96 .n_jobs = MAX_HTTP_WORKERS,
97 });
98 defer pool.deinit();
99
100 // start sync thread (background — turso → local SQLite)
101 if (local_db_ptr) |db| {
102 const sync_thread = try Thread.spawn(.{}, sync.syncLoop, .{ allocator, db });
103 sync_thread.detach();
104 }
105
106 // start jetstream thread (background — ingestion)
107 var transport = HttpTransport.init(allocator);
108 transport.keep_alive = false;
109 defer transport.deinit();
110
111 const js_thread = try Thread.spawn(.{}, jetstreamThread, .{JetstreamArgs{
112 .allocator = allocator,
113 .config = config,
114 .transport = &transport,
115 .local_db = local_db_ptr,
116 }});
117 js_thread.detach();
118
119 // main thread: HTTP accept loop
120 while (true) {
121 const conn = listener.accept() catch |err| {
122 log.err("accept error: {}", .{err});
123 continue;
124 };
125
126 setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| {
127 log.warn("failed to set socket timeout: {}", .{err});
128 };
129
130 if (local_db_ptr) |db| {
131 pool.spawn(server.handleConnection, .{ conn, db }) catch |err| {
132 log.err("pool spawn error: {}", .{err});
133 conn.stream.close();
134 };
135 } else {
136 // no local DB — just serve health
137 var read_buffer: [1024]u8 = undefined;
138 var write_buffer: [1024]u8 = undefined;
139 var reader = conn.stream.reader(&read_buffer);
140 var writer = conn.stream.writer(&write_buffer);
141 var srv = std.http.Server.init(reader.interface(), &writer.interface);
142 var request = srv.receiveHead() catch {
143 conn.stream.close();
144 continue;
145 };
146 request.respond("{\"error\":\"search unavailable\"}", .{
147 .status = .service_unavailable,
148 .extra_headers = &.{
149 .{ .name = "content-type", .value = "application/json" },
150 },
151 }) catch {};
152 conn.stream.close();
153 }
154 }
155}