atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix cross-Io heap corruption: GC loop and health checks were using Threaded pg.Pool from Evented fibers

- move GC loop from io.concurrent() (Evented fiber) to std.Thread.spawn() with pool_io (Threaded)
— dp.gc() takes Threaded mutex + queries pg.Pool, which dereferences NULL Thread.current()
threadlocal when called from Evented context → heap corruption / SIGSEGV
- replace direct pg.Pool "SELECT 1" health checks with atomic last_db_success timestamp
— metrics server and API router both run on Evented, pg.Pool runs on Threaded
— isDbHealthy() reads an atomic set by Threaded workers, safe from any Io context

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+35 -9
+3 -3
src/api/router.zig
··· 61 61 // trivial liveness — process is alive, constant-time, no dependencies 62 62 h.respondJson(conn, .ok, "{\"status\":\"ok\"}"); 63 63 } else if (std.mem.eql(u8, path, "/_readyz") or std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 64 - // readiness — checks DB dependency 65 - _ = ctx.persist.db.exec("SELECT 1", .{}) catch { 64 + // readiness — use atomic health flag (pg.Pool runs on Threaded, HTTP handlers are Evented) 65 + if (!ctx.persist.isDbHealthy()) { 66 66 h.respondJson(conn, .internal_server_error, "{\"status\":\"error\",\"msg\":\"database unavailable\"}"); 67 67 return; 68 - }; 68 + } 69 69 h.respondJson(conn, .ok, "{\"status\":\"ok\"}"); 70 70 } else if (std.mem.eql(u8, path, "/_stats")) { 71 71 var stats_buf: [4096]u8 = undefined;
+21
src/event_log.zig
··· 109 109 110 110 io: Io, 111 111 112 + /// last successful DB interaction (epoch seconds, set by Threaded workers). 113 + /// read by metrics server to report health without cross-Io pg.Pool access. 114 + last_db_success: std.atomic.Value(i64) = .{ .raw = 0 }, 115 + 112 116 /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended) 113 117 pub fn evtbufLen(self: *DiskPersist) usize { 114 118 if (!self.mutex.tryLock()) return 0; ··· 306 310 return .{ .uid = uid }; 307 311 } 308 312 313 + /// check DB health without touching pg.Pool — safe from any Io context. 314 + /// returns true if a Threaded worker successfully queried the DB within the last 30s. 315 + pub fn isDbHealthy(self: *DiskPersist) bool { 316 + const last = self.last_db_success.load(.acquire); 317 + if (last == 0) return false; 318 + var ts: std.c.timespec = undefined; 319 + _ = std.c.clock_gettime(.REALTIME, &ts); 320 + return (@as(i64, ts.sec) - last) < 30; 321 + } 322 + 323 + fn markDbSuccess(self: *DiskPersist) void { 324 + var ts: std.c.timespec = undefined; 325 + _ = std.c.clock_gettime(.REALTIME, &ts); 326 + self.last_db_success.store(@as(i64, ts.sec), .release); 327 + } 328 + 309 329 /// resolve a DID to a numeric UID. creates a new account row on first encounter. 310 330 /// matches indigo's Relay.DidToUid → Account.UID mapping. 311 331 pub fn uidForDid(self: *DiskPersist, did: []const u8) !u64 { ··· 321 341 defer r.deinit() catch {}; 322 342 const uid: u64 = @intCast(r.get(i64, 0)); 323 343 self.didCachePut(did, uid); 344 + self.markDbSuccess(); 324 345 return uid; 325 346 } 326 347
+11 -6
src/main.zig
··· 106 106 .{ .name = "server", .value = "zlay (atproto-relay)" }, 107 107 } }) catch {}; 108 108 } else if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/_readyz")) { 109 - const db_ok = if (self.persist.db.exec("SELECT 1", .{})) |_| true else |_| false; 109 + // use atomic health flag — pg.Pool runs on Threaded, metrics server is Evented 110 + const db_ok = self.persist.isDbHealthy(); 110 111 const status: http.Status = if (db_ok) .ok else .internal_server_error; 111 112 const body = if (db_ok) "{\"status\":\"ok\"}" else "{\"status\":\"error\",\"msg\":\"database unavailable\"}"; 112 113 request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{ ··· 289 290 var broadcast_future = try io.concurrent(broadcaster.Broadcaster.runBroadcastLoop, .{&bc}); 290 291 defer _ = broadcast_future.cancel(io); 291 292 292 - // start GC loop (runs as background task — does disk I/O + malloc_trim) 293 - var gc_future = try io.concurrent(gcLoop, .{ &dp, io }); 294 - defer _ = gc_future.cancel(io); 293 + // start GC loop on a plain thread — dp.gc() uses pool_io (Threaded) mutex 294 + // and pg.Pool. MUST NOT run as Evented fiber: Threaded futex on Evented 295 + // fiber dereferences NULL Thread.current() threadlocal → heap corruption. 296 + const gc_thread = std.Thread.spawn(.{}, gcLoop, .{ &dp, pool_io }) catch |err| { 297 + log.err("failed to start GC thread: {s}", .{@errorName(err)}); 298 + return err; 299 + }; 300 + gc_thread.detach(); 295 301 296 302 // wire HTTP fallback into broadcaster (all API endpoints served on WS port) 297 303 var http_context = api.HttpContext{ ··· 360 366 ws_listener.deinit(io); 361 367 server_future.cancel(io); 362 368 363 - // cancel GC task 364 - gc_future.cancel(io); 369 + // GC thread is detached and checks shutdown_flag — no cancel needed 365 370 366 371 // cancel broadcaster fiber (shutdown flag already set, it will drain remaining) 367 372 broadcast_future.cancel(io);