//! GET /xrpc/app.bsky.actor.searchActorsTypeahead
//! typeahead.waow.tech
1const std = @import("std");
2const mem = std.mem;
3const Allocator = mem.Allocator;
4const zat = @import("zat");
5const logfire = @import("logfire");
6const HttpTransport = zat.HttpTransport;
7const LocalDb = @import("db/LocalDb.zig");
8const sync = @import("db/sync.zig");
9const server = @import("server.zig");
10const ingest = @import("ingest.zig");
11
const log = std.log.scoped(.ingester);

// Replace the default debug_io with a real multi-threaded implementation so
// Io.Mutex, io.sleep, and network operations can run concurrently across
// threads. Without this override std.Options.debug_io falls back to
// global_single_threaded, which silently serializes all I/O.
/// Backing storage for the threaded Io; assigned at the very start of main().
var app_threaded_io: std.Io.Threaded = undefined;
pub const std_options_debug_threaded_io: ?*std.Io.Threaded = &app_threaded_io;
const io = std.Options.debug_io;

/// Receive/send timeout, in seconds, applied to every accepted client socket.
const SOCKET_TIMEOUT_SECS = 5;

/// Listening socket published for the SIGTERM handler (-1 = none yet).
/// Graceful shutdown: SIGTERM -> shut down this socket -> accept() in main()
/// fails and the loop breaks -> background tasks are cancelled.
var listen_fd = std.atomic.Value(std.posix.fd_t).init(-1);

/// SIGTERM handler. If a listening socket has been published in `listen_fd`,
/// shut it down so the blocking accept loop in main() errors out and exits.
/// Does nothing but an atomic load and a shutdown(2) syscall.
fn handleSigterm(_: std.posix.SIG) callconv(.c) void {
    const SHUT_RDWR = 2; // stop both further receives and sends
    const sock = listen_fd.load(.acquire);
    if (sock < 0) return; // listener not registered yet
    _ = std.posix.system.shutdown(sock, SHUT_RDWR);
}

/// Look up environment variable `name` through libc's getenv.
/// Returns a slice over the libc-owned value, or null when it is unset.
fn cGetenv(name: [*:0]const u8) ?[]const u8 {
    const raw = std.c.getenv(name) orelse return null;
    return mem.span(raw);
}

/// Arguments handed to `jetstreamThread` when it is started as a background task.
const JetstreamArgs = struct {
    /// Allocator for the ingest handler and the Jetstream client.
    allocator: Allocator,
    /// Ingester configuration obtained from `ingest.getConfig()` in main().
    config: ingest.Config,
    /// HTTP transport owned by main() and shared with the ingest handler.
    transport: *HttpTransport,
    /// Local SQLite handle, or null when the local database failed to open.
    local_db: ?*LocalDb,
};

/// Background task: ingest Jetstream events for the collections listed below.
///
/// Builds an `ingest.IngestHandler`, asks `ingest.fetchCursor` where to start
/// the stream, then blocks in `JetstreamClient.subscribe` until it returns.
/// Failures are logged and simply end the task; nothing is propagated.
fn jetstreamThread(args: JetstreamArgs) void {
    var ingest_handler = ingest.IngestHandler.init(
        args.allocator,
        args.config,
        args.transport,
        args.local_db,
    ) catch |err| {
        log.err("handler init failed: {}", .{err});
        return;
    };
    defer ingest_handler.deinit();

    // Fetched only once the handler exists, exactly as before.
    const start_cursor = ingest.fetchCursor(args.transport, args.config);

    var jetstream = zat.JetstreamClient.init(io, args.allocator, .{
        .wanted_collections = &.{
            "app.bsky.actor.profile",
            "app.bsky.feed.post",
            "app.bsky.feed.like",
            "app.bsky.graph.follow",
            "sh.tangled.actor.profile",
        },
        .cursor = start_cursor,
    });
    defer jetstream.deinit();

    jetstream.subscribe(&ingest_handler) catch |err|
        log.err("jetstream subscribe failed: {}", .{err});
}

/// Apply a `secs`-second SO_RCVTIMEO and SO_SNDTIMEO to socket `fd`, bounding
/// how long a blocking read or write on it may wait.
/// Returns the first setsockopt error encountered.
fn setSocketTimeout(fd: std.posix.fd_t, secs: u32) !void {
    const tv = std.posix.timeval{ .sec = @intCast(secs), .usec = 0 };
    const tv_bytes = mem.asBytes(&tv);
    for ([_]u32{ std.posix.SO.RCVTIMEO, std.posix.SO.SNDTIMEO }) |opt_name| {
        try std.posix.setsockopt(fd, std.posix.SOL.SOCKET, opt_name, tv_bytes);
    }
}

/// Entry point for the typeahead ingester.
///
/// Startup order:
///   1. initialize the threaded Io behind `io` and configure logfire,
///   2. open the optional local SQLite database,
///   3. bind the HTTP listener early so the Fly proxy does not time out,
///   4. publish the listener fd and install the SIGTERM handler,
///   5. start the background sync (turso -> local SQLite) and Jetstream tasks,
///   6. accept connections on this thread until the listener is shut down.
/// Afterwards the background tasks are cancelled and `defer`s release the rest.
pub fn main() !void {
    const allocator = std.heap.smp_allocator;
    app_threaded_io = std.Io.Threaded.init(allocator, .{
        .stack_size = 8 * 1024 * 1024, // 8MB (default 16MB is wasteful on 256MB VM)
        .async_limit = std.Io.Limit.limited(8), // bounded HTTP handler pool
    });

    // configure logfire (reads LOGFIRE_WRITE_TOKEN from env)
    _ = logfire.configure(.{
        .service_name = "typeahead-ingester",
        .service_version = "0.1.0",
        .environment = cGetenv("FLY_APP_NAME") orelse "development",
    }) catch |err| {
        log.err("logfire configure failed: {}", .{err});
    };

    const config = ingest.getConfig();
    log.info("typeahead ingester → {s}", .{config.worker_url});

    // open local SQLite (optional — graceful degradation if no volume)
    var local_db = LocalDb.init(allocator);
    local_db.open() catch |err| {
        log.warn("local db failed to open: {}, search will be unavailable", .{err});
    };
    // only hand out the handle when a connection actually exists
    const local_db_ptr: ?*LocalDb = if (local_db.conn != null) &local_db else null;
    defer local_db.deinit();

    // start HTTP server FIRST so Fly proxy doesn't timeout
    const port: u16 = blk: {
        const port_str = cGetenv("PORT") orelse "8080";
        break :blk std.fmt.parseInt(u16, port_str, 10) catch |err| {
            // don't silently ignore a misconfigured PORT
            log.warn("invalid PORT \"{s}\" ({}), falling back to 8080", .{ port_str, err });
            break :blk 8080;
        };
    };

    var addr = std.Io.net.IpAddress{ .ip4 = .{ .bytes = .{ 0, 0, 0, 0 }, .port = port } };
    var srv = std.Io.net.IpAddress.listen(&addr, io, .{ .reuse_address = true }) catch |err| {
        log.err("listen failed: {}", .{err});
        return err;
    };
    defer srv.deinit(io);

    log.info("listening on port {d}", .{port});

    // register SIGTERM for graceful shutdown (Fly sends SIGTERM before stopping)
    listen_fd.store(srv.socket.handle, .release);
    // Defers run LIFO, so this unpublishes the fd before `srv.deinit` closes
    // it; a late SIGTERM can then no longer shutdown() a reused descriptor.
    defer listen_fd.store(-1, .release);
    std.posix.sigaction(.TERM, &.{
        .handler = .{ .handler = handleSigterm },
        .mask = mem.zeroes(std.posix.sigset_t),
        .flags = 0,
    }, null);

    // task group: .concurrent for long-lived background tasks,
    // .async (bounded pool, inline fallback) for HTTP handlers
    var tasks: std.Io.Group = .init;

    // start sync (background — turso → local SQLite)
    if (local_db_ptr) |db| {
        tasks.concurrent(io, sync.syncLoop, .{ allocator, db }) catch |err| {
            log.err("sync task failed: {}", .{err});
        };
    }

    // start jetstream (background — ingestion)
    var transport = HttpTransport.init(io, allocator);
    transport.keep_alive = false;
    defer transport.deinit();

    tasks.concurrent(io, jetstreamThread, .{JetstreamArgs{
        .allocator = allocator,
        .config = config,
        .transport = &transport,
        .local_db = local_db_ptr,
    }}) catch |err| {
        log.err("jetstream task failed: {}", .{err});
    };

    // main thread: HTTP accept loop; exits once SIGTERM shuts the listener down
    while (true) {
        const stream = srv.accept(io) catch |err| switch (err) {
            error.SocketNotListening => break,
            else => {
                log.err("accept error: {}", .{err});
                continue;
            },
        };

        setSocketTimeout(stream.socket.handle, SOCKET_TIMEOUT_SECS) catch |err| {
            log.warn("failed to set socket timeout: {}", .{err});
        };

        if (local_db_ptr) |db| {
            // bounded pool — inline fallback under load (graceful degradation)
            tasks.async(io, server.handleConnection, .{ stream, db });
        } else {
            // no local DB — answer every request inline with 503 "search unavailable"
            var read_buffer: [1024]u8 = undefined;
            var write_buffer: [1024]u8 = undefined;
            var reader = stream.reader(io, &read_buffer);
            var writer = stream.writer(io, &write_buffer);
            var http_srv = std.http.Server.init(&reader.interface, &writer.interface);
            var request = http_srv.receiveHead() catch {
                stream.close(io);
                continue;
            };
            request.respond("{\"error\":\"search unavailable\"}", .{
                .status = .service_unavailable,
                .extra_headers = &.{
                    .{ .name = "content-type", .value = "application/json" },
                },
            }) catch {};
            stream.close(io);
        }
    }

    log.info("SIGTERM received, draining tasks…", .{});
    tasks.cancel(io);
    log.info("shutdown complete", .{});
}