GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

ingester: bump zat to v0.3.0-alpha.15, graceful shutdown + bounded task pool

- SIGTERM handler closes listen socket for clean fly.io deploys
- replace Thread.spawn/detach with Io.Group (concurrent + bounded async)
- 8MB stacks + async_limit(8) to fit 256MB VM budget

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

zzstoatzz 7c2a17e4 d245c15d

+47 -19
+2 -2
ingester/build.zig.zon
··· 5 5 .minimum_zig_version = "0.16.0", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.sh/zat.dev/zat/archive/v0.3.0-alpha.7", 9 - .hash = "zat-0.3.0-alpha.7-5PuC7uNjBQDv28db31DEKkFn1tU5I4f1GfJs-RrG8_pS", 8 + .url = "https://tangled.sh/zat.dev/zat/archive/v0.3.0-alpha.15", 9 + .hash = "zat-0.3.0-alpha.15-5PuC7nVhBQCJEzz9LuzSbtLb68Wd0x_yjDgTP3EqV8dH", 10 10 }, 11 11 .zqlite = .{ 12 12 .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz",
+45 -17
ingester/src/main.zig
··· 21 21 22 22 const SOCKET_TIMEOUT_SECS = 5; 23 23 24 + // graceful shutdown: SIGTERM → close listening socket → accept breaks → drain tasks 25 + var listen_fd: std.atomic.Value(std.posix.fd_t) = std.atomic.Value(std.posix.fd_t).init(-1); 26 + 27 + fn handleSigterm(_: std.posix.SIG) callconv(.c) void { 28 + const fd = listen_fd.load(.acquire); 29 + if (fd >= 0) _ = std.posix.system.shutdown(fd, 2); // SHUT_RDWR 30 + } 31 + 24 32 fn cGetenv(name: [*:0]const u8) ?[]const u8 { 25 33 if (std.c.getenv(name)) |p| return mem.span(p); 26 34 return null; ··· 73 81 74 82 pub fn main() !void { 75 83 const allocator = std.heap.smp_allocator; 76 - app_threaded_io = std.Io.Threaded.init(allocator, .{}); 84 + app_threaded_io = std.Io.Threaded.init(allocator, .{ 85 + .stack_size = 8 * 1024 * 1024, // 8MB (default 16MB is wasteful on 256MB VM) 86 + .async_limit = std.Io.Limit.limited(8), // bounded HTTP handler pool 87 + }); 77 88 78 89 // configure logfire (reads LOGFIRE_WRITE_TOKEN from env) 79 90 _ = logfire.configure(.{ ··· 113 124 114 125 log.info("listening on port {d}", .{port}); 115 126 116 - // start sync thread (background — turso → local SQLite) 127 + // register SIGTERM for graceful shutdown (Fly sends SIGTERM before stopping) 128 + listen_fd.store(srv.socket.handle, .release); 129 + std.posix.sigaction(.TERM, &.{ 130 + .handler = .{ .handler = handleSigterm }, 131 + .mask = mem.zeroes(std.posix.sigset_t), 132 + .flags = 0, 133 + }, null); 134 + 135 + // task group: .concurrent for long-lived background tasks, 136 + // .async (bounded pool, inline fallback) for HTTP handlers 137 + var tasks: std.Io.Group = .init; 138 + 139 + // start sync (background — turso → local SQLite) 117 140 if (local_db_ptr) |db| { 118 - const sync_thread = try std.Thread.spawn(.{}, sync.syncLoop, .{ allocator, db }); 119 - sync_thread.detach(); 141 + tasks.concurrent(io, sync.syncLoop, .{ allocator, db }) catch |err| { 142 + log.err("sync task failed: {}", .{err}); 143 + }; 120 144 } 121 145 122 - // start jetstream thread (background — ingestion) 146 + // start jetstream (background — ingestion) 123 147 var transport = HttpTransport.init(io, allocator); 124 148 transport.keep_alive = false; 125 149 defer transport.deinit(); 126 150 127 - const js_thread = try std.Thread.spawn(.{}, jetstreamThread, .{JetstreamArgs{ 151 + tasks.concurrent(io, jetstreamThread, .{JetstreamArgs{ 128 152 .allocator = allocator, 129 153 .config = config, 130 154 .transport = &transport, 131 155 .local_db = local_db_ptr, 132 - }}); 133 - js_thread.detach(); 156 + }}) catch |err| { 157 + log.err("jetstream task failed: {}", .{err}); 158 + }; 134 159 135 160 // main thread: HTTP accept loop 136 161 while (true) { 137 - const stream = srv.accept(io) catch |err| { 138 - log.err("accept error: {}", .{err}); 139 - continue; 162 + const stream = srv.accept(io) catch |err| switch (err) { 163 + error.SocketNotListening => break, 164 + else => { 165 + log.err("accept error: {}", .{err}); 166 + continue; 167 + }, 140 168 }; 141 169 142 170 setSocketTimeout(stream.socket.handle, SOCKET_TIMEOUT_SECS) catch |err| { ··· 144 172 }; 145 173 146 174 if (local_db_ptr) |db| { 147 - const handler_thread = std.Thread.spawn(.{}, server.handleConnection, .{ stream, db }) catch |err| { 148 - log.err("thread spawn error: {}", .{err}); 149 - stream.close(io); 150 - continue; 151 - }; 152 - handler_thread.detach(); 175 + // bounded pool — inline fallback under load (graceful degradation) 176 + tasks.async(io, server.handleConnection, .{ stream, db }); 153 177 } else { 154 178 // no local DB — just serve health 155 179 var read_buffer: [1024]u8 = undefined; ··· 170 194 stream.close(io); 171 195 } 172 196 } 197 + 198 + log.info("SIGTERM received, draining tasks…", .{}); 199 + tasks.cancel(io); 200 + log.info("shutdown complete", .{}); 173 201 }