GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 201 lines 7.0 kB view raw
//! Entry point for the typeahead ingester.
//! - Serves HTTP requests (via `server.handleConnection`) backed by a local SQLite `LocalDb`.
//! - Runs a background `sync.syncLoop` (turso → local SQLite).
//! - Runs a background Jetstream ingestion task.
//! - Shuts down on SIGTERM.

const std = @import("std");
const mem = std.mem;
const Allocator = mem.Allocator;
const zat = @import("zat");
const logfire = @import("logfire");
const HttpTransport = zat.HttpTransport;
const LocalDb = @import("db/LocalDb.zig");
const sync = @import("db/sync.zig");
const server = @import("server.zig");
const ingest = @import("ingest.zig");

const log = std.log.scoped(.ingester);

// override debug_io with a real threaded implementation so Io.Mutex,
// io.sleep, and network ops work concurrently across threads.
// without this, std.Options.debug_io uses global_single_threaded which
// silently serializes all I/O.
var app_threaded_io: std.Io.Threaded = undefined;
pub const std_options_debug_threaded_io: ?*std.Io.Threaded = &app_threaded_io;
const io = std.Options.debug_io;

/// Receive/send timeout applied to every accepted connection.
const SOCKET_TIMEOUT_SECS = 5;

/// Port used when `PORT` is unset or not a valid u16.
const DEFAULT_PORT: u16 = 8080;

// graceful shutdown: SIGTERM → shut down listening socket → accept breaks → cancel tasks.
// Holds the listening socket's fd while it is open, and -1 otherwise.
var listen_fd: std.atomic.Value(std.posix.fd_t) = std.atomic.Value(std.posix.fd_t).init(-1);

/// SIGTERM handler. It shuts down the registered listening socket, if any,
/// so the blocked `accept` in `main` fails and the accept loop exits.
/// It only performs an atomic load and a raw shutdown(2) syscall.
fn handleSigterm(_: std.posix.SIG) callconv(.c) void {
    const fd = listen_fd.load(.acquire);
    if (fd >= 0) _ = std.posix.system.shutdown(fd, 2); // SHUT_RDWR
}

/// Looks up an environment variable through libc `getenv`.
/// Returns null when the variable is unset. The returned slice points into
/// the process environment; nothing is allocated and the caller must not free it.
fn cGetenv(name: [*:0]const u8) ?[]const u8 {
    if (std.c.getenv(name)) |p| return mem.span(p);
    return null;
}

/// Arguments for `jetstreamThread`. `transport` and `local_db` are borrowed
/// from `main`, which keeps them alive until the task group is cancelled.
const JetstreamArgs = struct {
    allocator: Allocator,
    config: ingest.Config,
    transport: *HttpTransport,
    local_db: ?*LocalDb,
};

/// Background task: subscribes to Jetstream for the listed collections and
/// feeds events into an `ingest.IngestHandler`, starting from the cursor
/// returned by `ingest.fetchCursor`.
/// Errors are logged rather than returned. The task ends when handler
/// init fails or when `subscribe` returns.
fn jetstreamThread(args: JetstreamArgs) void {
    var handler = ingest.IngestHandler.init(
        args.allocator,
        args.config,
        args.transport,
        args.local_db,
    ) catch |err| {
        log.err("handler init failed: {}", .{err});
        return;
    };
    defer handler.deinit();

    var client = zat.JetstreamClient.init(io, args.allocator, .{
        .wanted_collections = &.{
            "app.bsky.actor.profile",
            "app.bsky.feed.post",
            "app.bsky.feed.like",
            "app.bsky.graph.follow",
            "sh.tangled.actor.profile",
        },
        .cursor = ingest.fetchCursor(args.transport, args.config),
    });
    defer client.deinit();

    client.subscribe(&handler) catch |err| {
        log.err("jetstream subscribe failed: {}", .{err});
    };
}

/// Sets both SO_RCVTIMEO and SO_SNDTIMEO on `fd` to `secs` seconds, so a
/// stalled peer cannot block a read or write on this socket forever.
fn setSocketTimeout(fd: std.posix.fd_t, secs: u32) !void {
    const timeout = mem.toBytes(std.posix.timeval{
        .sec = @intCast(secs),
        .usec = 0,
    });
    try std.posix.setsockopt(fd, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeout);
    try std.posix.setsockopt(fd, std.posix.SOL.SOCKET, std.posix.SO.SNDTIMEO, &timeout);
}

/// Process entry point. Startup order:
/// 1. Set up the threaded Io and logfire.
/// 2. Open the optional local DB.
/// 3. Start listening.
/// 4. Install the SIGTERM handler.
/// 5. Spawn the sync and Jetstream tasks.
/// It then runs the HTTP accept loop on this thread until the listening
/// socket is shut down, and finally cancels the background tasks.
/// Returns an error only if binding the listening socket fails.
pub fn main() !void {
    const allocator = std.heap.smp_allocator;
    app_threaded_io = std.Io.Threaded.init(allocator, .{
        .stack_size = 8 * 1024 * 1024, // 8MB (default 16MB is wasteful on 256MB VM)
        .async_limit = std.Io.Limit.limited(8), // bounded HTTP handler pool
    });

    // configure logfire (reads LOGFIRE_WRITE_TOKEN from env)
    _ = logfire.configure(.{
        .service_name = "typeahead-ingester",
        .service_version = "0.1.0",
        .environment = cGetenv("FLY_APP_NAME") orelse "development",
    }) catch |err| {
        log.err("logfire configure failed: {}", .{err});
    };

    const config = ingest.getConfig();
    log.info("typeahead ingester → {s}", .{config.worker_url});

    // open local SQLite (optional — graceful degradation if no volume)
    var local_db = LocalDb.init(allocator);
    local_db.open() catch |err| {
        log.warn("local db failed to open: {}, search will be unavailable", .{err});
    };
    defer local_db.deinit();
    // Search is only served when the DB actually has a connection.
    const local_db_ptr: ?*LocalDb = if (local_db.conn != null) &local_db else null;

    // start HTTP server FIRST so Fly proxy doesn't timeout
    const port: u16 = blk: {
        const port_str = cGetenv("PORT") orelse break :blk DEFAULT_PORT;
        break :blk std.fmt.parseInt(u16, port_str, 10) catch |err| {
            log.warn("invalid PORT \"{s}\" ({}), falling back to {d}", .{ port_str, err, DEFAULT_PORT });
            break :blk DEFAULT_PORT;
        };
    };

    var addr = std.Io.net.IpAddress{ .ip4 = .{ .bytes = .{ 0, 0, 0, 0 }, .port = port } };
    var srv = std.Io.net.IpAddress.listen(&addr, io, .{ .reuse_address = true }) catch |err| {
        log.err("listen failed: {}", .{err});
        return err;
    };
    defer srv.deinit(io);

    log.info("listening on port {d}", .{port});

    // register SIGTERM for graceful shutdown (Fly sends SIGTERM before stopping)
    listen_fd.store(srv.socket.handle, .release);
    // Defers run LIFO, so this clears the fd before `srv.deinit` closes it.
    // A SIGTERM arriving late in shutdown then won't shutdown() a closed,
    // possibly reused, fd number.
    defer listen_fd.store(-1, .release);
    std.posix.sigaction(.TERM, &.{
        .handler = .{ .handler = handleSigterm },
        .mask = mem.zeroes(std.posix.sigset_t),
        .flags = 0,
    }, null);

    // task group: .concurrent for long-lived background tasks,
    // .async (bounded pool, inline fallback) for HTTP handlers
    var tasks: std.Io.Group = .init;

    // start sync (background — turso → local SQLite)
    if (local_db_ptr) |db| {
        tasks.concurrent(io, sync.syncLoop, .{ allocator, db }) catch |err| {
            log.err("sync task failed: {}", .{err});
        };
    }

    // start jetstream (background — ingestion)
    var transport = HttpTransport.init(io, allocator);
    transport.keep_alive = false;
    defer transport.deinit();

    tasks.concurrent(io, jetstreamThread, .{JetstreamArgs{
        .allocator = allocator,
        .config = config,
        .transport = &transport,
        .local_db = local_db_ptr,
    }}) catch |err| {
        log.err("jetstream task failed: {}", .{err});
    };

    // main thread: HTTP accept loop. It exits once handleSigterm shuts down
    // the listening socket and accept reports SocketNotListening.
    while (true) {
        const stream = srv.accept(io) catch |err| switch (err) {
            error.SocketNotListening => break,
            else => {
                // NOTE(review): persistent accept errors (e.g. fd exhaustion)
                // are retried immediately and may spin; consider a short backoff.
                log.err("accept error: {}", .{err});
                continue;
            },
        };

        setSocketTimeout(stream.socket.handle, SOCKET_TIMEOUT_SECS) catch |err| {
            log.warn("failed to set socket timeout: {}", .{err});
        };

        if (local_db_ptr) |db| {
            // bounded pool — inline fallback under load (graceful degradation)
            tasks.async(io, server.handleConnection, .{ stream, db });
        } else {
            // No local DB: answer one request per connection with 503
            // "search unavailable", inline on this thread. The socket
            // timeouts set above bound how long this can block.
            defer stream.close(io);

            var read_buffer: [1024]u8 = undefined;
            var write_buffer: [1024]u8 = undefined;
            var reader = stream.reader(io, &read_buffer);
            var writer = stream.writer(io, &write_buffer);
            var http_srv = std.http.Server.init(&reader.interface, &writer.interface);
            var request = http_srv.receiveHead() catch continue;
            request.respond("{\"error\":\"search unavailable\"}", .{
                .status = .service_unavailable,
                .extra_headers = &.{
                    .{ .name = "content-type", .value = "application/json" },
                },
            }) catch |err| {
                log.warn("failed to send 503 response: {}", .{err});
            };
        }
    }

    log.info("SIGTERM received, draining tasks…", .{});
    // Background tasks are long-running, so cancel (not wait) before the
    // deferred deinits tear down the transport, socket and DB they borrow.
    tasks.cancel(io);
    log.info("shutdown complete", .{});
}