atproto relay implementation in zig zlay.waow.tech

replace ev_db with DbRequestQueue: route all DB from Evented through pool_io

the Evented pg.Pool (ev_db) approach was broken — io_uring netLookup is
unimplemented upstream, so no DNS and no outbound TCP from Evented fibers.

this replaces ev_db with a DbRequestQueue (MPSC FIFO ring buffer) that
routes general DB traffic through pool_io (Threaded) worker threads:

- add DbRequest + DbRequestQueue to event_log.zig (4096-slot ring,
spinlock push, CAS pop, 2 worker threads)
- convert xrpc/admin handlers to typed DbRequest structs with
@fieldParentPtr callbacks that write JSON into stack buffers (see the
sketch after this list)
- convert slurper: pullHosts on own std.Thread (parallel with
spawnWorkers), addHost phased (DB via queue, HTTP via temp thread)
- convert broadcaster firstSeq to DbRequest
- convert backfiller/cleaner from Io.Future to std.Thread + direct
persist.db access, with cooperative shutdown checks
- remove all *Ev methods and ev_db infrastructure from DiskPersist
- explicit join paths for backfiller/cleaner/db-workers on shutdown
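
for illustration, a minimal sketch of the handler-side pattern, condensed from
the admin.zig and xrpc.zig hunks below. HostIdReq is a made-up name for this
sketch; DbRequest, DbRequestQueue, DiskPersist, getHostIdForHostname, and the
ctx fields are the ones added in this diff:

    // hypothetical handler request; inputs are copied into fixed buffers so
    // nothing points back into the Evented fiber's transient state
    const HostIdReq = struct {
        base: DbRequest = .{ .callback = &execute },
        hostname_buf: [256]u8 = undefined,
        hostname_len: usize = 0,
        host_id: ?u64 = null, // output, written by the pool_io worker

        fn execute(b: *DbRequest, dp: *DiskPersist) void {
            // recover the typed request from the embedded base field
            const self: *@This() = @fieldParentPtr("base", b);
            self.host_id = dp.getHostIdForHostname(self.hostname_buf[0..self.hostname_len]) catch |e| {
                b.err = e;
                return;
            };
        }
    };

    // inside an Evented HTTP handler:
    var req: HostIdReq = .{};
    const n = @min(hostname.len, req.hostname_buf.len);
    @memcpy(req.hostname_buf[0..n], hostname[0..n]);
    req.hostname_len = n;
    ctx.db_queue.push(&req.base);        // hand the request to a pool_io worker
    req.base.wait(ctx.io, ctx.shutdown); // park this fiber until the worker finishes
    if (req.base.err != null) {
        // respond with a 5xx and return; req is still valid here because
        // wait() only returns after done is set
    }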

DbRequest.wait() never returns before done — preserves stack lifetime
of caller-embedded request structs during shutdown drain.
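
the termination argument, condensed from the event_log.zig hunk below (variable
names lightly abbreviated): a worker sets done only after the callback has
finished, and the shutdown drain marks everything still queued as done with
err = ShuttingDown, so wait() always ends and a caller's frame never unwinds
while a worker could still touch its request.

    // DbRequest.wait() as added in event_log.zig: spin until done, yielding via io.sleep
    pub fn wait(self: *DbRequest, io: Io, shutdown: *std.atomic.Value(bool)) void {
        _ = shutdown;
        while (!self.done.load(.acquire)) {
            io.sleep(Io.Duration.fromMicroseconds(100), .awake) catch {
                // Io already tearing down: raw spin until a worker or the drain flips done
                while (!self.done.load(.acquire)) std.atomic.spinLoopHint();
                break;
            };
        }
    }

    // DbRequestQueue worker loop on a pool_io thread, plus the shutdown drain
    while (!self.shutdown.load(.acquire)) {
        if (self.pop()) |req| {
            req.callback(req, self.persist);
            req.done.store(true, .release); // publish completion after the callback returns
        } else {
            pool_io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch return;
        }
    }
    while (self.pop()) |req| {
        req.err = error.ShuttingDown; // callback never ran, but the caller still wakes
        req.done.store(true, .release);
    }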

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+967 -542
+172 -44
src/api/admin.zig
··· 2 2 //! 3 3 //! all handlers require Bearer token auth against RELAY_ADMIN_PASSWORD. 4 4 //! includes host blocking/unblocking, account bans, and backfill control. 5 + //! 6 + //! DB-accessing handlers use DbRequest + DbRequestQueue to route queries through 7 + //! pool_io (Threaded) workers. 5 8 6 9 const std = @import("std"); 7 10 const Io = std.Io; ··· 16 19 const log = std.log.scoped(.relay); 17 20 18 21 const HttpContext = router.HttpContext; 22 + const DbRequest = event_log_mod.DbRequest; 23 + const DiskPersist = event_log_mod.DiskPersist; 19 24 20 25 /// check admin auth via headers, send error response if not authorized. returns true if authorized. 21 26 pub fn checkAdmin(conn: *h.Conn, headers: ?*const websocket.Handshake.KeyValue) bool { ··· 58 63 defer parsed.deinit(); 59 64 const did = parsed.value.did; 60 65 61 - // resolve DID → UID via Evented pool (skips DID cache which uses pool_io mutex) 62 - const uid = ctx.persist.uidForDidEv(did) catch { 66 + // resolve DID → UID via DbRequestQueue 67 + const UidReq = struct { 68 + base: DbRequest = .{ .callback = &execute }, 69 + did_buf: [256]u8 = undefined, 70 + did_len: usize = 0, 71 + uid: u64 = 0, 72 + 73 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 74 + const self: *@This() = @fieldParentPtr("base", b); 75 + const d = self.did_buf[0..self.did_len]; 76 + // check database 77 + if (dp.db.rowUnsafe("SELECT uid FROM account WHERE did = $1", .{d}) catch null) |row| { 78 + var r = row; 79 + defer r.deinit() catch {}; 80 + self.uid = @intCast(r.get(i64, 0)); 81 + return; 82 + } 83 + // create new account row 84 + _ = dp.db.exec("INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING", .{d}) catch { 85 + b.err = error.DatabaseError; 86 + return; 87 + }; 88 + var row = dp.db.rowUnsafe("SELECT uid FROM account WHERE did = $1", .{d}) catch { 89 + b.err = error.DatabaseError; 90 + return; 91 + } orelse { 92 + b.err = error.AccountCreationFailed; 93 + return; 94 + }; 95 + defer row.deinit() catch {}; 96 + self.uid = @intCast(row.get(i64, 0)); 97 + } 98 + }; 99 + var uid_req: UidReq = .{}; 100 + const copy_len = @min(did.len, uid_req.did_buf.len); 101 + @memcpy(uid_req.did_buf[0..copy_len], did[0..copy_len]); 102 + uid_req.did_len = copy_len; 103 + ctx.db_queue.push(&uid_req.base); 104 + uid_req.base.wait(ctx.io, ctx.shutdown); 105 + 106 + if (uid_req.base.err != null) { 63 107 h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); 64 108 return; 65 - }; 109 + } 66 110 67 111 // remove from collection index so banned accounts don't appear in listReposByCollection 68 112 ctx.collection_index.removeAll(did) catch |err| { ··· 72 116 // build CBOR #account frame and route takedown + persist + broadcast 73 117 // through host_ops queue (pool_io thread) — fire and forget. 
74 118 const host_ops_mod = @import("../host_ops.zig"); 75 - var td: host_ops_mod.HostOp.Payload.Takedown = .{ .uid = uid }; 119 + var td: host_ops_mod.HostOp.Payload.Takedown = .{ .uid = uid_req.uid }; 76 120 77 121 if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 78 122 defer ctx.persist.allocator.free(frame_bytes); ··· 88 132 .payload = .{ .takedown = td }, 89 133 }); 90 134 91 - log.info("admin: banned {s} (uid={d}), takedown enqueued", .{ did, uid }); 135 + log.info("admin: banned {s} (uid={d}), takedown enqueued", .{ did, uid_req.uid }); 92 136 h.respondJson(conn, .ok, "{\"success\":true}"); 93 137 } 94 138 95 139 pub fn handleAdminListHosts(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 96 140 if (!checkAdmin(conn, headers)) return; 97 141 98 - const persist = ctx.persist; 99 - const hosts = persist.listAllHostsEv(persist.allocator) catch { 142 + // list all hosts via DbRequestQueue 143 + const ListAllHostsReq = struct { 144 + base: DbRequest = .{ .callback = &execute }, 145 + alloc: std.mem.Allocator, 146 + result: ?[]event_log_mod.DiskPersist.Host = null, 147 + 148 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 149 + const self: *@This() = @fieldParentPtr("base", b); 150 + self.result = dp.listAllHosts(self.alloc) catch |e| { 151 + b.err = e; 152 + return; 153 + }; 154 + } 155 + }; 156 + var list_req: ListAllHostsReq = .{ .alloc = ctx.persist.allocator }; 157 + ctx.db_queue.push(&list_req.base); 158 + list_req.base.wait(ctx.io, ctx.shutdown); 159 + 160 + if (list_req.base.err != null or list_req.result == null) { 100 161 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 101 162 return; 102 - }; 163 + } 164 + 165 + const hosts = list_req.result.?; 103 166 defer { 104 167 for (hosts) |host| { 105 - persist.allocator.free(host.hostname); 106 - persist.allocator.free(host.status); 168 + ctx.persist.allocator.free(host.hostname); 169 + ctx.persist.allocator.free(host.status); 107 170 } 108 - persist.allocator.free(hosts); 171 + ctx.persist.allocator.free(hosts); 109 172 } 110 173 111 - var aw: Io.Writer.Allocating = .init(persist.allocator); 174 + var aw: Io.Writer.Allocating = .init(ctx.persist.allocator); 112 175 defer aw.deinit(); 113 176 const w = &aw.writer; 114 177 ··· 140 203 h.respondJson(conn, .ok, aw.written()); 141 204 } 142 205 143 - pub fn handleAdminBlockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 206 + pub fn handleAdminBlockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 144 207 if (!checkAdmin(conn, headers)) return; 145 208 146 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 209 + const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 147 210 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 148 211 return; 149 212 }; 150 213 defer parsed.deinit(); 151 214 152 - const host_info = persist.getOrCreateHostEv(parsed.value.hostname) catch { 153 - h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 154 - return; 215 + const BlockHostReq = struct { 216 + base: DbRequest = .{ .callback = &execute }, 217 + hostname_buf: [256]u8 = 
undefined, 218 + hostname_len: usize = 0, 219 + host_id: u64 = 0, 220 + 221 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 222 + const self: *@This() = @fieldParentPtr("base", b); 223 + const hn = self.hostname_buf[0..self.hostname_len]; 224 + const info = dp.getOrCreateHost(hn) catch |e| { 225 + b.err = e; 226 + return; 227 + }; 228 + self.host_id = info.id; 229 + dp.updateHostStatus(info.id, "blocked") catch |e| { 230 + b.err = e; 231 + return; 232 + }; 233 + } 155 234 }; 235 + var req: BlockHostReq = .{}; 236 + const copy_len = @min(parsed.value.hostname.len, req.hostname_buf.len); 237 + @memcpy(req.hostname_buf[0..copy_len], parsed.value.hostname[0..copy_len]); 238 + req.hostname_len = copy_len; 239 + ctx.db_queue.push(&req.base); 240 + req.base.wait(ctx.io, ctx.shutdown); 156 241 157 - persist.updateHostStatusEv(host_info.id, "blocked") catch { 158 - h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 242 + if (req.base.err != null) { 243 + h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"operation failed\"}"); 159 244 return; 160 - }; 245 + } 161 246 162 - log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 247 + log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, req.host_id }); 163 248 h.respondJson(conn, .ok, "{\"success\":true}"); 164 249 } 165 250 166 - pub fn handleAdminUnblockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 251 + pub fn handleAdminUnblockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 167 252 if (!checkAdmin(conn, headers)) return; 168 253 169 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 254 + const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 170 255 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 171 256 return; 172 257 }; 173 258 defer parsed.deinit(); 174 259 175 - const host_info = persist.getOrCreateHostEv(parsed.value.hostname) catch { 176 - h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 177 - return; 260 + const UnblockHostReq = struct { 261 + base: DbRequest = .{ .callback = &execute }, 262 + hostname_buf: [256]u8 = undefined, 263 + hostname_len: usize = 0, 264 + host_id: u64 = 0, 265 + 266 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 267 + const self: *@This() = @fieldParentPtr("base", b); 268 + const hn = self.hostname_buf[0..self.hostname_len]; 269 + const info = dp.getOrCreateHost(hn) catch |e| { 270 + b.err = e; 271 + return; 272 + }; 273 + self.host_id = info.id; 274 + dp.updateHostStatus(info.id, "active") catch |e| { 275 + b.err = e; 276 + return; 277 + }; 278 + dp.resetHostFailures(info.id) catch {}; 279 + } 178 280 }; 281 + var req: UnblockHostReq = .{}; 282 + const copy_len = @min(parsed.value.hostname.len, req.hostname_buf.len); 283 + @memcpy(req.hostname_buf[0..copy_len], parsed.value.hostname[0..copy_len]); 284 + req.hostname_len = copy_len; 285 + ctx.db_queue.push(&req.base); 286 + req.base.wait(ctx.io, ctx.shutdown); 179 287 180 - persist.updateHostStatusEv(host_info.id, "active") catch { 181 - h.respondJson(conn, 
.internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 288 + if (req.base.err != null) { 289 + h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"operation failed\"}"); 182 290 return; 183 - }; 184 - persist.resetHostFailuresEv(host_info.id) catch {}; 291 + } 185 292 186 - log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 293 + log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, req.host_id }); 187 294 h.respondJson(conn, .ok, "{\"success\":true}"); 188 295 } 189 296 190 297 /// set or clear the account_limit override for a host. 191 - /// POST {"host": "...", "account_limit": 100000} — set override 192 - /// POST {"host": "...", "account_limit": null} — clear override (revert to COUNT(*)) 193 298 pub fn handleAdminChangeLimits(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 194 299 if (!checkAdmin(conn, headers)) return; 195 300 ··· 204 309 }; 205 310 defer parsed.deinit(); 206 311 207 - const host_id = ctx.persist.getHostIdForHostnameEv(parsed.value.host) catch { 312 + const ChangeLimitsReq = struct { 313 + base: DbRequest = .{ .callback = &execute }, 314 + hostname_buf: [256]u8 = undefined, 315 + hostname_len: usize = 0, 316 + new_limit: ?u64, 317 + host_id: ?u64 = null, 318 + effective: u64 = 0, 319 + 320 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 321 + const self: *@This() = @fieldParentPtr("base", b); 322 + const hn = self.hostname_buf[0..self.hostname_len]; 323 + self.host_id = dp.getHostIdForHostname(hn) catch |e| { 324 + b.err = e; 325 + return; 326 + }; 327 + const hid = self.host_id orelse return; 328 + dp.setHostAccountLimit(hid, self.new_limit) catch |e| { 329 + b.err = e; 330 + return; 331 + }; 332 + self.effective = if (self.new_limit) |l| l else dp.getHostAccountCount(hid); 333 + } 334 + }; 335 + var req: ChangeLimitsReq = .{ .new_limit = parsed.value.account_limit }; 336 + const copy_len = @min(parsed.value.host.len, req.hostname_buf.len); 337 + @memcpy(req.hostname_buf[0..copy_len], parsed.value.host[0..copy_len]); 338 + req.hostname_len = copy_len; 339 + ctx.db_queue.push(&req.base); 340 + req.base.wait(ctx.io, ctx.shutdown); 341 + 342 + if (req.base.err != null) { 208 343 h.respondJson(conn, .internal_server_error, "{\"error\":\"database error\"}"); 209 344 return; 210 - } orelse { 345 + } 346 + const host_id = req.host_id orelse { 211 347 h.respondJson(conn, .not_found, "{\"error\":\"host not found\"}"); 212 348 return; 213 349 }; 214 350 215 - ctx.persist.setHostAccountLimitEv(host_id, parsed.value.account_limit) catch { 216 - h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to update limit\"}"); 217 - return; 218 - }; 219 - 220 351 // update running subscriber's rate limits immediately 221 - const effective = if (parsed.value.account_limit) |l| l else ctx.persist.getHostAccountCountEv(host_id) catch 0; 222 - ctx.slurper.updateHostLimits(host_id, effective); 352 + ctx.slurper.updateHostLimits(host_id, req.effective); 223 353 224 354 if (parsed.value.account_limit) |limit| { 225 355 log.info("admin: set account_limit for {s} (id={d}): {d}", .{ parsed.value.host, host_id, limit }); ··· 337 467 // --- protocol helpers (used only by handleBan) --- 338 468 339 469 /// build a CBOR #account frame for a takedown event. 
340 - /// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"} 341 470 fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 { 342 471 const zat = @import("zat"); 343 472 const cbor = zat.cbor; ··· 378 507 return frame; 379 508 } 380 509 381 - /// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ) 382 510 fn formatTimestamp(buf: *[24]u8) []const u8 { 383 511 var tp: std.c.timespec = undefined; 384 512 _ = std.c.clock_gettime(.REALTIME, &tp);
+12 -9
src/api/router.zig
··· 32 32 validator: *validator_mod.Validator, 33 33 host_ops: *host_ops_mod.HostOpsQueue, 34 34 pool_io: Io, 35 + io: Io, 36 + db_queue: *event_log_mod.DbRequestQueue, 37 + shutdown: *std.atomic.Value(bool), 35 38 }; 36 39 37 40 /// top-level HTTP request router — installed as bc.http_fallback ··· 74 77 const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf, ctx.bc.io); 75 78 h.respondJson(conn, .ok, body); 76 79 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { 77 - xrpc.handleListRepos(conn, query, ctx.persist); 80 + xrpc.handleListRepos(conn, query, ctx); 78 81 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { 79 - xrpc.handleGetRepoStatus(conn, query, ctx.persist); 82 + xrpc.handleGetRepoStatus(conn, query, ctx); 80 83 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepo")) { 81 - xrpc.handleGetRepo(conn, query, ctx.persist); 84 + xrpc.handleGetRepo(conn, query, ctx); 82 85 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { 83 - xrpc.handleGetLatestCommit(conn, query, ctx.persist); 86 + xrpc.handleGetLatestCommit(conn, query, ctx); 84 87 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 85 88 xrpc.handleListReposByCollection(conn, query, ctx.collection_index); 86 89 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { 87 - xrpc.handleListHosts(conn, query, ctx.persist); 90 + xrpc.handleListHosts(conn, query, ctx); 88 91 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) { 89 - xrpc.handleGetHostStatus(conn, query, ctx.persist); 92 + xrpc.handleGetHostStatus(conn, query, ctx); 90 93 } else if (std.mem.eql(u8, path, "/admin/hosts")) { 91 94 admin.handleAdminListHosts(conn, headers, ctx); 92 95 } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { ··· 126 129 if (std.mem.eql(u8, path, "/admin/repo/ban")) { 127 130 admin.handleBan(conn, body, headers, ctx); 128 131 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { 129 - xrpc.handleRequestCrawl(conn, body, ctx.slurper); 132 + xrpc.handleRequestCrawl(conn, body, ctx); 130 133 } else if (std.mem.eql(u8, path, "/admin/hosts/block")) { 131 - admin.handleAdminBlockHost(conn, body, headers, ctx.persist); 134 + admin.handleAdminBlockHost(conn, body, headers, ctx); 132 135 } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { 133 - admin.handleAdminUnblockHost(conn, body, headers, ctx.persist); 136 + admin.handleAdminUnblockHost(conn, body, headers, ctx); 134 137 } else if (std.mem.eql(u8, path, "/admin/hosts/changeLimits")) { 135 138 admin.handleAdminChangeLimits(conn, body, headers, ctx); 136 139 } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) {
+431 -252
src/api/xrpc.zig
··· 3 3 //! implements com.atproto.sync.* lexicon endpoints: 4 4 //! listRepos, getRepo, getRepoStatus, getLatestCommit, listReposByCollection, 5 5 //! listHosts, getHostStatus, requestCrawl 6 + //! 7 + //! DB-accessing handlers use DbRequest + DbRequestQueue to route queries through 8 + //! pool_io (Threaded) workers, avoiding the broken Evented pg.Pool. 6 9 7 10 const std = @import("std"); 8 11 const Io = std.Io; 12 + const pg = @import("pg"); 9 13 const h = @import("http.zig"); 14 + const router = @import("router.zig"); 10 15 const event_log_mod = @import("../event_log.zig"); 11 16 const collection_index_mod = @import("../collection_index.zig"); 12 17 const slurper_mod = @import("../slurper.zig"); 13 18 19 + const Allocator = std.mem.Allocator; 20 + const HttpContext = router.HttpContext; 21 + const DbRequest = event_log_mod.DbRequest; 22 + const DiskPersist = event_log_mod.DiskPersist; 14 23 const log = std.log.scoped(.relay); 15 24 16 - pub fn handleListRepos(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 25 + // --- listRepos --- 26 + 27 + const ListReposReq = struct { 28 + base: DbRequest = .{ .callback = &execute }, 29 + cursor_val: i64, 30 + limit: i64, 31 + json_buf: [65536]u8 = undefined, 32 + json_len: usize = 0, 33 + db_err: bool = false, 34 + 35 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 36 + const self: *@This() = @fieldParentPtr("base", b); 37 + var w: Io.Writer = .fixed(&self.json_buf); 38 + 39 + var result = dp.db.query( 40 + \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 41 + \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 42 + \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 43 + , .{ self.cursor_val, self.limit }) catch { 44 + self.db_err = true; 45 + return; 46 + }; 47 + defer result.deinit(); 48 + 49 + var count: i64 = 0; 50 + var last_uid: i64 = 0; 51 + 52 + w.writeAll("{\"repos\":[") catch return; 53 + 54 + while (result.nextUnsafe() catch null) |row| { 55 + if (count > 0) w.writeByte(',') catch return; 56 + 57 + const uid = row.get(i64, 0); 58 + const did = row.get([]const u8, 1); 59 + const local_status = row.get([]const u8, 2); 60 + const upstream_status = row.get([]const u8, 3); 61 + const rev = row.get([]const u8, 4); 62 + const head = row.get([]const u8, 5); 63 + 64 + const local_ok = std.mem.eql(u8, local_status, "active"); 65 + const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 66 + const active = local_ok and upstream_ok; 67 + const status = if (!local_ok) local_status else upstream_status; 68 + 69 + w.writeAll("{\"did\":\"") catch return; 70 + w.writeAll(did) catch return; 71 + w.writeAll("\",\"head\":\"") catch return; 72 + w.writeAll(head) catch return; 73 + w.writeAll("\",\"rev\":\"") catch return; 74 + w.writeAll(rev) catch return; 75 + w.writeAll("\"") catch return; 76 + 77 + if (active) { 78 + w.writeAll(",\"active\":true") catch return; 79 + } else { 80 + w.writeAll(",\"active\":false,\"status\":\"") catch return; 81 + w.writeAll(status) catch return; 82 + w.writeAll("\"") catch return; 83 + } 84 + 85 + w.writeByte('}') catch return; 86 + last_uid = uid; 87 + count += 1; 88 + } 89 + 90 + w.writeByte(']') catch return; 91 + if (count >= self.limit and count >= 2) { 92 + w.print(",\"cursor\":\"{d}\"", .{last_uid}) catch return; 93 + } 94 + w.writeByte('}') catch return; 95 + self.json_len = w.end; 96 + } 97 + }; 98 + 99 + pub fn handleListRepos(conn: *h.Conn, query: []const u8, ctx: *HttpContext) void { 17 100 const cursor_str = 
h.queryParam(query, "cursor") orelse "0"; 18 101 const limit_str = h.queryParam(query, "limit") orelse "500"; 19 102 ··· 35 118 return; 36 119 } 37 120 38 - const ev_db = persist.ensureEvDb() catch { 121 + var req: ListReposReq = .{ .cursor_val = cursor_val, .limit = limit }; 122 + ctx.db_queue.push(&req.base); 123 + req.base.wait(ctx.io, ctx.shutdown); 124 + 125 + if (req.base.err != null) { 39 126 h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 40 127 return; 41 - }; 42 - 43 - // query accounts with repo state, paginated by UID 44 - // includes both local status and upstream_status for combined active check 45 - var result = ev_db.query( 46 - \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 47 - \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 48 - \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 49 - , .{ cursor_val, limit }) catch { 128 + } 129 + if (req.db_err) { 50 130 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 51 131 return; 52 - }; 53 - defer result.deinit(); 132 + } 54 133 55 - // build JSON response into a buffer 56 - var buf: [65536]u8 = undefined; 57 - var w: Io.Writer = .fixed(&buf); 134 + h.respondJson(conn, .ok, req.json_buf[0..req.json_len]); 135 + } 58 136 59 - var count: i64 = 0; 60 - var last_uid: i64 = 0; 137 + // --- getRepoStatus --- 61 138 62 - w.writeAll("{\"repos\":[") catch return; 139 + const GetRepoStatusReq = struct { 140 + base: DbRequest = .{ .callback = &execute }, 141 + did_buf: [256]u8 = undefined, 142 + did_len: usize = 0, 143 + json_buf: [4096]u8 = undefined, 144 + json_len: usize = 0, 145 + not_found: bool = false, 146 + db_err: bool = false, 63 147 64 - while (result.nextUnsafe() catch null) |row| { 65 - if (count > 0) w.writeByte(',') catch return; 148 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 149 + const self: *@This() = @fieldParentPtr("base", b); 150 + const did = self.did_buf[0..self.did_len]; 66 151 67 - const uid = row.get(i64, 0); 68 - const did = row.get([]const u8, 1); 69 - const local_status = row.get([]const u8, 2); 70 - const upstream_status = row.get([]const u8, 3); 71 - const rev = row.get([]const u8, 4); 72 - const head = row.get([]const u8, 5); 152 + var row = (dp.db.rowUnsafe( 153 + "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 154 + .{did}, 155 + ) catch { 156 + self.db_err = true; 157 + return; 158 + }) orelse { 159 + self.not_found = true; 160 + return; 161 + }; 162 + defer row.deinit() catch {}; 73 163 74 - // Go relay: Account.IsActive() — both local AND upstream must be active 164 + const local_status = row.get([]const u8, 1); 165 + const upstream_status = row.get([]const u8, 2); 166 + const rev = row.get([]const u8, 3); 75 167 const local_ok = std.mem.eql(u8, local_status, "active"); 76 168 const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 77 169 const active = local_ok and upstream_ok; 78 - // Go relay: Account.AccountStatus() — local takes priority 79 170 const status = if (!local_ok) local_status else upstream_status; 80 171 172 + var w: Io.Writer = .fixed(&self.json_buf); 81 173 w.writeAll("{\"did\":\"") catch return; 82 174 w.writeAll(did) catch return; 83 175 w.writeAll("\"") catch return; 84 176 85 - w.writeAll(",\"head\":\"") catch return; 86 - w.writeAll(head) catch return; 87 - w.writeAll("\",\"rev\":\"") 
catch return; 88 - w.writeAll(rev) catch return; 89 - w.writeAll("\"") catch return; 90 - 91 177 if (active) { 92 178 w.writeAll(",\"active\":true") catch return; 93 179 } else { ··· 96 182 w.writeAll("\"") catch return; 97 183 } 98 184 99 - w.writeByte('}') catch return; 100 - last_uid = uid; 101 - count += 1; 102 - } 185 + if (rev.len > 0) { 186 + w.writeAll(",\"rev\":\"") catch return; 187 + w.writeAll(rev) catch return; 188 + w.writeAll("\"") catch return; 189 + } 103 190 104 - w.writeByte(']') catch return; 105 - 106 - // include cursor if we got a full page 107 - if (count >= limit and count >= 2) { 108 - w.print(",\"cursor\":\"{d}\"", .{last_uid}) catch return; 191 + w.writeByte('}') catch return; 192 + self.json_len = w.end; 109 193 } 110 - 111 - w.writeByte('}') catch return; 194 + }; 112 195 113 - h.respondJson(conn, .ok, w.buffered()); 114 - } 115 - 116 - pub fn handleGetRepoStatus(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 196 + pub fn handleGetRepoStatus(conn: *h.Conn, query: []const u8, ctx: *HttpContext) void { 117 197 var did_buf: [256]u8 = undefined; 118 198 const did = h.queryParamDecoded(query, "did", &did_buf) orelse { 119 199 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 120 200 return; 121 201 }; 122 - 123 - // basic DID syntax check 124 202 if (!std.mem.startsWith(u8, did, "did:")) { 125 203 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 126 204 return; 127 205 } 128 206 129 - const ev_db = persist.ensureEvDb() catch { 207 + var req: GetRepoStatusReq = .{}; 208 + @memcpy(req.did_buf[0..did.len], did); 209 + req.did_len = did.len; 210 + ctx.db_queue.push(&req.base); 211 + req.base.wait(ctx.io, ctx.shutdown); 212 + 213 + if (req.base.err != null) { 130 214 h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 131 215 return; 132 - }; 133 - 134 - // look up account (includes both local and upstream status) 135 - var row = (ev_db.rowUnsafe( 136 - "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 137 - .{did}, 138 - ) catch { 216 + } 217 + if (req.db_err) { 139 218 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 140 219 return; 141 - }) orelse { 220 + } 221 + if (req.not_found) { 142 222 h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 143 223 return; 144 - }; 145 - defer row.deinit() catch {}; 224 + } 146 225 147 - const local_status = row.get([]const u8, 1); 148 - const upstream_status = row.get([]const u8, 2); 149 - const rev = row.get([]const u8, 3); 150 - // Go relay: Account.IsActive() / AccountStatus() 151 - const local_ok = std.mem.eql(u8, local_status, "active"); 152 - const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 153 - const active = local_ok and upstream_ok; 154 - const status = if (!local_ok) local_status else upstream_status; 226 + h.respondJson(conn, .ok, req.json_buf[0..req.json_len]); 227 + } 155 228 156 - var buf: [4096]u8 = undefined; 157 - var w: Io.Writer = .fixed(&buf); 229 + // --- getRepo --- 158 230 159 - w.writeAll("{\"did\":\"") catch return; 160 - w.writeAll(did) catch return; 161 - w.writeAll("\"") catch return; 231 + const GetRepoReq = struct { 232 + base: DbRequest = .{ .callback = &execute }, 233 + did_buf: [256]u8 = undefined, 
234 + did_len: usize = 0, 235 + url_buf: [512]u8 = undefined, 236 + url_len: usize = 0, 237 + not_found: bool = false, 238 + db_err: bool = false, 162 239 163 - if (active) { 164 - w.writeAll(",\"active\":true") catch return; 165 - } else { 166 - w.writeAll(",\"active\":false,\"status\":\"") catch return; 167 - w.writeAll(status) catch return; 168 - w.writeAll("\"") catch return; 169 - } 240 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 241 + const self: *@This() = @fieldParentPtr("base", b); 242 + const did = self.did_buf[0..self.did_len]; 243 + 244 + var row = (dp.db.rowUnsafe( 245 + "SELECT h.hostname FROM account a JOIN host h ON a.host_id = h.id WHERE a.did = $1 AND a.host_id > 0", 246 + .{did}, 247 + ) catch { 248 + self.db_err = true; 249 + return; 250 + }) orelse { 251 + self.not_found = true; 252 + return; 253 + }; 254 + defer row.deinit() catch {}; 170 255 171 - if (rev.len > 0) { 172 - w.writeAll(",\"rev\":\"") catch return; 173 - w.writeAll(rev) catch return; 174 - w.writeAll("\"") catch return; 256 + const hostname = row.get([]const u8, 0); 257 + const url = std.fmt.bufPrint(&self.url_buf, "https://{s}/xrpc/com.atproto.sync.getRepo?did={s}", .{ hostname, did }) catch return; 258 + self.url_len = url.len; 175 259 } 260 + }; 176 261 177 - w.writeByte('}') catch return; 178 - h.respondJson(conn, .ok, w.buffered()); 179 - } 180 - 181 - pub fn handleGetRepo(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 262 + pub fn handleGetRepo(conn: *h.Conn, query: []const u8, ctx: *HttpContext) void { 182 263 var did_buf: [256]u8 = undefined; 183 264 const did = h.queryParamDecoded(query, "did", &did_buf) orelse { 184 265 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 185 266 return; 186 267 }; 187 - 188 268 if (!std.mem.startsWith(u8, did, "did:")) { 189 269 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 190 270 return; 191 271 } 192 272 193 - const ev_db = persist.ensureEvDb() catch { 273 + var req: GetRepoReq = .{}; 274 + @memcpy(req.did_buf[0..did.len], did); 275 + req.did_len = did.len; 276 + ctx.db_queue.push(&req.base); 277 + req.base.wait(ctx.io, ctx.shutdown); 278 + 279 + if (req.base.err != null) { 194 280 h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 195 281 return; 196 - }; 197 - 198 - // look up the PDS hostname for this account 199 - var row = (ev_db.rowUnsafe( 200 - "SELECT h.hostname FROM account a JOIN host h ON a.host_id = h.id WHERE a.did = $1 AND a.host_id > 0", 201 - .{did}, 202 - ) catch { 282 + } 283 + if (req.db_err) { 203 284 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 204 285 return; 205 - }) orelse { 286 + } 287 + if (req.not_found) { 206 288 h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 207 289 return; 208 - }; 209 - defer row.deinit() catch {}; 290 + } 291 + 292 + h.respondRedirect(conn, req.url_buf[0..req.url_len]); 293 + } 294 + 295 + // --- getLatestCommit --- 296 + 297 + const GetLatestCommitReq = struct { 298 + base: DbRequest = .{ .callback = &execute }, 299 + did_buf: [256]u8 = undefined, 300 + did_len: usize = 0, 301 + json_buf: [4096]u8 = undefined, 302 + json_len: usize = 0, 303 + not_found: bool = false, 304 + db_err: bool = false, 305 + status_err: ?[]const u8 = null, 306 + 307 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 
308 + const self: *@This() = @fieldParentPtr("base", b); 309 + const did = self.did_buf[0..self.did_len]; 210 310 211 - const hostname = row.get([]const u8, 0); 311 + var row = (dp.db.rowUnsafe( 312 + "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 313 + .{did}, 314 + ) catch { 315 + self.db_err = true; 316 + return; 317 + }) orelse { 318 + self.not_found = true; 319 + return; 320 + }; 321 + defer row.deinit() catch {}; 212 322 213 - // build redirect URL: https://{hostname}/xrpc/com.atproto.sync.getRepo?did={did} 214 - var url_buf: [512]u8 = undefined; 215 - const url = std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.sync.getRepo?did={s}", .{ hostname, did }) catch return; 323 + const local_status = row.get([]const u8, 0); 324 + const upstream_status = row.get([]const u8, 1); 325 + const rev = row.get([]const u8, 2); 326 + const cid = row.get([]const u8, 3); 327 + 328 + const status = if (!std.mem.eql(u8, local_status, "active")) local_status else upstream_status; 329 + 330 + if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, "suspended")) { 331 + self.status_err = "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"; 332 + return; 333 + } else if (std.mem.eql(u8, status, "deactivated")) { 334 + self.status_err = "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"; 335 + return; 336 + } else if (std.mem.eql(u8, status, "deleted")) { 337 + self.status_err = "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"; 338 + return; 339 + } else if (!std.mem.eql(u8, status, "active")) { 340 + self.status_err = "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"; 341 + return; 342 + } 343 + 344 + if (rev.len == 0 or cid.len == 0) { 345 + self.not_found = true; 346 + return; 347 + } 216 348 217 - h.respondRedirect(conn, url); 218 - } 349 + var w: Io.Writer = .fixed(&self.json_buf); 350 + w.writeAll("{\"cid\":\"") catch return; 351 + w.writeAll(cid) catch return; 352 + w.writeAll("\",\"rev\":\"") catch return; 353 + w.writeAll(rev) catch return; 354 + w.writeAll("\"}") catch return; 355 + self.json_len = w.end; 356 + } 357 + }; 219 358 220 - pub fn handleGetLatestCommit(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 359 + pub fn handleGetLatestCommit(conn: *h.Conn, query: []const u8, ctx: *HttpContext) void { 221 360 var did_buf: [256]u8 = undefined; 222 361 const did = h.queryParamDecoded(query, "did", &did_buf) orelse { 223 362 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 224 363 return; 225 364 }; 226 - 227 365 if (!std.mem.startsWith(u8, did, "did:")) { 228 366 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 229 367 return; 230 368 } 231 369 232 - const ev_db = persist.ensureEvDb() catch { 370 + var req: GetLatestCommitReq = .{}; 371 + @memcpy(req.did_buf[0..did.len], did); 372 + req.did_len = did.len; 373 + ctx.db_queue.push(&req.base); 374 + req.base.wait(ctx.io, ctx.shutdown); 375 + 376 + if (req.base.err != null) { 233 377 h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 234 378 return; 235 - }; 236 - 237 - // look up account + repo state (includes both local and upstream status) 238 - var row = (ev_db.rowUnsafe( 239 - "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), 
COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 240 - .{did}, 241 - ) catch { 379 + } 380 + if (req.db_err) { 242 381 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 243 382 return; 244 - }) orelse { 245 - h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 246 - return; 247 - }; 248 - defer row.deinit() catch {}; 249 - 250 - const local_status = row.get([]const u8, 0); 251 - const upstream_status = row.get([]const u8, 1); 252 - const rev = row.get([]const u8, 2); 253 - const cid = row.get([]const u8, 3); 254 - 255 - // combined status: local takes priority (Go relay: AccountStatus()) 256 - const status = if (!std.mem.eql(u8, local_status, "active")) local_status else upstream_status; 257 - 258 - // check account status (match Go relay behavior) 259 - if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, "suspended")) { 260 - h.respondJson(conn, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); 261 - return; 262 - } else if (std.mem.eql(u8, status, "deactivated")) { 263 - h.respondJson(conn, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); 264 - return; 265 - } else if (std.mem.eql(u8, status, "deleted")) { 266 - h.respondJson(conn, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); 267 - return; 268 - } else if (!std.mem.eql(u8, status, "active")) { 269 - h.respondJson(conn, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); 383 + } 384 + if (req.status_err) |err_body| { 385 + h.respondJson(conn, .forbidden, err_body); 270 386 return; 271 387 } 272 - 273 - if (rev.len == 0 or cid.len == 0) { 274 - h.respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 388 + if (req.not_found) { 389 + if (req.json_len == 0) { 390 + h.respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 391 + } else { 392 + h.respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 393 + } 275 394 return; 276 395 } 277 396 278 - var buf: [4096]u8 = undefined; 279 - var w: Io.Writer = .fixed(&buf); 397 + h.respondJson(conn, .ok, req.json_buf[0..req.json_len]); 398 + } 280 399 281 - w.writeAll("{\"cid\":\"") catch return; 282 - w.writeAll(cid) catch return; 283 - w.writeAll("\",\"rev\":\"") catch return; 284 - w.writeAll(rev) catch return; 285 - w.writeAll("\"}") catch return; 286 - 287 - h.respondJson(conn, .ok, w.buffered()); 288 - } 400 + // --- listReposByCollection (no DB, uses collection index directly) --- 289 401 290 402 pub fn handleListReposByCollection(conn: *h.Conn, query: []const u8, ci: *collection_index_mod.CollectionIndex) void { 291 403 const collection = h.queryParam(query, "collection") orelse { ··· 311 423 var cursor_buf: [256]u8 = undefined; 312 424 const cursor_did = h.queryParamDecoded(query, "cursor", &cursor_buf); 313 425 314 - // scan collection index 315 426 var did_buf: [65536]u8 = undefined; 316 427 const ci_result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch { 317 428 h.respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); 318 429 return; 319 430 }; 320 431 321 - // build JSON response 322 432 var buf: [65536]u8 = 
undefined; 323 433 var w: Io.Writer = .fixed(&buf); 324 434 ··· 343 453 h.respondJson(conn, .ok, w.buffered()); 344 454 } 345 455 346 - pub fn handleListHosts(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 456 + // --- listHosts --- 457 + 458 + const ListHostsReq = struct { 459 + base: DbRequest = .{ .callback = &execute }, 460 + cursor_val: i64, 461 + limit: i64, 462 + json_buf: [65536]u8 = undefined, 463 + json_len: usize = 0, 464 + db_err: bool = false, 465 + 466 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 467 + const self: *@This() = @fieldParentPtr("base", b); 468 + var w: Io.Writer = .fixed(&self.json_buf); 469 + 470 + var result = dp.db.query( 471 + "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 472 + .{ self.cursor_val, self.limit }, 473 + ) catch { 474 + self.db_err = true; 475 + return; 476 + }; 477 + defer result.deinit(); 478 + 479 + var count: i64 = 0; 480 + var last_id: i64 = 0; 481 + 482 + w.writeAll("{\"hosts\":[") catch return; 483 + 484 + while (result.nextUnsafe() catch null) |row| { 485 + if (count > 0) w.writeByte(',') catch return; 486 + 487 + const id = row.get(i64, 0); 488 + const hostname = row.get([]const u8, 1); 489 + const status = row.get([]const u8, 2); 490 + const seq = row.get(i64, 3); 491 + 492 + w.writeAll("{\"hostname\":\"") catch return; 493 + w.writeAll(hostname) catch return; 494 + w.writeAll("\"") catch return; 495 + w.print(",\"seq\":{d}", .{seq}) catch return; 496 + w.writeAll(",\"status\":\"") catch return; 497 + w.writeAll(status) catch return; 498 + w.writeAll("\"}") catch return; 499 + 500 + last_id = id; 501 + count += 1; 502 + } 503 + 504 + w.writeByte(']') catch return; 505 + if (count >= self.limit and count > 1) { 506 + w.print(",\"cursor\":\"{d}\"", .{last_id}) catch return; 507 + } 508 + w.writeByte('}') catch return; 509 + self.json_len = w.end; 510 + } 511 + }; 512 + 513 + pub fn handleListHosts(conn: *h.Conn, query: []const u8, ctx: *HttpContext) void { 347 514 const cursor_str = h.queryParam(query, "cursor") orelse "0"; 348 515 const limit_str = h.queryParam(query, "limit") orelse "200"; 349 516 ··· 365 532 return; 366 533 } 367 534 368 - const ev_db = persist.ensureEvDb() catch { 535 + var req: ListHostsReq = .{ .cursor_val = cursor_val, .limit = limit }; 536 + ctx.db_queue.push(&req.base); 537 + req.base.wait(ctx.io, ctx.shutdown); 538 + 539 + if (req.base.err != null) { 369 540 h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 370 541 return; 371 - }; 372 - 373 - var result = ev_db.query( 374 - "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 375 - .{ cursor_val, limit }, 376 - ) catch { 542 + } 543 + if (req.db_err) { 377 544 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 378 545 return; 379 - }; 380 - defer result.deinit(); 546 + } 381 547 382 - var buf: [65536]u8 = undefined; 383 - var w: Io.Writer = .fixed(&buf); 548 + h.respondJson(conn, .ok, req.json_buf[0..req.json_len]); 549 + } 550 + 551 + // --- getHostStatus --- 384 552 385 - var count: i64 = 0; 386 - var last_id: i64 = 0; 553 + const GetHostStatusReq = struct { 554 + base: DbRequest = .{ .callback = &execute }, 555 + hostname_buf: [256]u8 = undefined, 556 + hostname_len: usize = 0, 557 + json_buf: [4096]u8 = undefined, 558 + json_len: usize = 0, 559 + not_found: bool = false, 560 + db_err: 
bool = false, 387 561 388 - w.writeAll("{\"hosts\":[") catch return; 562 + fn execute(b: *DbRequest, dp: *DiskPersist) void { 563 + const self: *@This() = @fieldParentPtr("base", b); 564 + const hostname = self.hostname_buf[0..self.hostname_len]; 389 565 390 - while (result.nextUnsafe() catch null) |row| { 391 - if (count > 0) w.writeByte(',') catch return; 566 + var row = (dp.db.rowUnsafe( 567 + "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 568 + .{hostname}, 569 + ) catch { 570 + self.db_err = true; 571 + return; 572 + }) orelse { 573 + self.not_found = true; 574 + return; 575 + }; 576 + defer row.deinit() catch {}; 392 577 393 - const id = row.get(i64, 0); 394 - const hostname = row.get([]const u8, 1); 395 - const status = row.get([]const u8, 2); 578 + const host_id = row.get(i64, 0); 579 + const host_name = row.get([]const u8, 1); 580 + const raw_status = row.get([]const u8, 2); 396 581 const seq = row.get(i64, 3); 397 582 583 + const status = if (std.mem.eql(u8, raw_status, "blocked")) 584 + "banned" 585 + else if (std.mem.eql(u8, raw_status, "exhausted")) 586 + "offline" 587 + else 588 + raw_status; 589 + 590 + // count accounts on this host 591 + const account_count: i64 = if (dp.db.rowUnsafe( 592 + "SELECT COUNT(*) FROM account WHERE host_id = $1", 593 + .{host_id}, 594 + ) catch null) |cnt_row| blk: { 595 + var r = cnt_row; 596 + defer r.deinit() catch {}; 597 + break :blk r.get(i64, 0); 598 + } else 0; 599 + 600 + var w: Io.Writer = .fixed(&self.json_buf); 398 601 w.writeAll("{\"hostname\":\"") catch return; 399 - w.writeAll(hostname) catch return; 602 + w.writeAll(host_name) catch return; 400 603 w.writeAll("\"") catch return; 401 - w.print(",\"seq\":{d}", .{seq}) catch return; 604 + w.print(",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return; 402 605 w.writeAll(",\"status\":\"") catch return; 403 606 w.writeAll(status) catch return; 404 607 w.writeAll("\"}") catch return; 405 - 406 - last_id = id; 407 - count += 1; 408 - } 409 - 410 - w.writeByte(']') catch return; 411 - 412 - if (count >= limit and count > 1) { 413 - w.print(",\"cursor\":\"{d}\"", .{last_id}) catch return; 608 + self.json_len = w.end; 414 609 } 610 + }; 415 611 416 - w.writeByte('}') catch return; 417 - h.respondJson(conn, .ok, w.buffered()); 418 - } 419 - 420 - pub fn handleGetHostStatus(conn: *h.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 612 + pub fn handleGetHostStatus(conn: *h.Conn, query: []const u8, ctx: *HttpContext) void { 421 613 var hostname_buf: [256]u8 = undefined; 422 614 const hostname = h.queryParamDecoded(query, "hostname", &hostname_buf) orelse { 423 615 h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); 424 616 return; 425 617 }; 426 618 427 - const ev_db = persist.ensureEvDb() catch { 619 + var req: GetHostStatusReq = .{}; 620 + @memcpy(req.hostname_buf[0..hostname.len], hostname); 621 + req.hostname_len = hostname.len; 622 + ctx.db_queue.push(&req.base); 623 + req.base.wait(ctx.io, ctx.shutdown); 624 + 625 + if (req.base.err != null) { 428 626 h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 429 627 return; 430 - }; 431 - 432 - // look up host 433 - var row = (ev_db.rowUnsafe( 434 - "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 435 - .{hostname}, 436 - ) catch { 628 + } 629 + if (req.db_err) { 437 630 h.respondJson(conn, .internal_server_error, 
"{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 438 631 return; 439 - }) orelse { 632 + } 633 + if (req.not_found) { 440 634 h.respondJson(conn, .not_found, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); 441 635 return; 442 - }; 443 - defer row.deinit() catch {}; 636 + } 444 637 445 - const host_id = row.get(i64, 0); 446 - const host_name = row.get([]const u8, 1); 447 - const raw_status = row.get([]const u8, 2); 448 - const seq = row.get(i64, 3); 638 + h.respondJson(conn, .ok, req.json_buf[0..req.json_len]); 639 + } 449 640 450 - // map internal status to lexicon hostStatus values 451 - const status = if (std.mem.eql(u8, raw_status, "blocked")) 452 - "banned" 453 - else if (std.mem.eql(u8, raw_status, "exhausted")) 454 - "offline" 455 - else 456 - raw_status; // active, idle pass through 641 + // --- requestCrawl --- 457 642 458 - // count accounts on this host 459 - const account_count: i64 = if (ev_db.rowUnsafe( 460 - "SELECT COUNT(*) FROM account WHERE host_id = $1", 461 - .{host_id}, 462 - ) catch null) |cnt_row| blk: { 463 - var r = cnt_row; 464 - defer r.deinit() catch {}; 465 - break :blk r.get(i64, 0); 466 - } else 0; 467 - 468 - var buf: [4096]u8 = undefined; 469 - var w: Io.Writer = .fixed(&buf); 470 - 471 - w.writeAll("{\"hostname\":\"") catch return; 472 - w.writeAll(host_name) catch return; 473 - w.writeAll("\"") catch return; 474 - w.print(",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return; 475 - w.writeAll(",\"status\":\"") catch return; 476 - w.writeAll(status) catch return; 477 - w.writeAll("\"}") catch return; 478 - 479 - h.respondJson(conn, .ok, w.buffered()); 480 - } 481 - 482 - pub fn handleRequestCrawl(conn: *h.Conn, body: []const u8, slurper: *slurper_mod.Slurper) void { 483 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, slurper.allocator, body, .{ .ignore_unknown_fields = true }) catch { 643 + pub fn handleRequestCrawl(conn: *h.Conn, body: []const u8, ctx: *HttpContext) void { 644 + const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 484 645 h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}"); 485 646 return; 486 647 }; 487 648 defer parsed.deinit(); 488 649 489 - // fast validation: hostname format (Go relay does this synchronously in handler) 490 - const hostname = slurper_mod.validateHostname(slurper.allocator, parsed.value.hostname) catch |err| { 650 + // fast validation: hostname format 651 + const hostname = slurper_mod.validateHostname(ctx.persist.allocator, parsed.value.hostname) catch |err| { 491 652 log.warn("requestCrawl rejected '{s}': {s}", .{ parsed.value.hostname, @errorName(err) }); 492 653 h.respondJson(conn, .bad_request, switch (err) { 493 654 error.EmptyHostname => "{\"error\":\"InvalidRequest\",\"message\":\"empty hostname\"}", ··· 501 662 }); 502 663 return; 503 664 }; 504 - defer slurper.allocator.free(hostname); 665 + defer ctx.persist.allocator.free(hostname); 505 666 506 - // fast validation: domain ban check (Evented fiber — use Ev pool) 507 - if (slurper.persist.isDomainBannedEv(hostname) catch false) { 667 + // domain ban check via DbRequestQueue 668 + const DomainBanReq = struct { 669 + base_req: DbRequest = .{ .callback = &execute }, 670 + hostname_buf: [256]u8 = undefined, 671 + hostname_len: usize = 0, 672 + banned: bool = false, 673 + 674 + fn execute(b: *DbRequest, dp: 
*DiskPersist) void { 675 + const self: *@This() = @fieldParentPtr("base_req", b); 676 + self.banned = dp.isDomainBanned(self.hostname_buf[0..self.hostname_len]); 677 + } 678 + }; 679 + var ban_req: DomainBanReq = .{}; 680 + const copy_len = @min(hostname.len, ban_req.hostname_buf.len); 681 + @memcpy(ban_req.hostname_buf[0..copy_len], hostname[0..copy_len]); 682 + ban_req.hostname_len = copy_len; 683 + ctx.db_queue.push(&ban_req.base_req); 684 + ban_req.base_req.wait(ctx.io, ctx.shutdown); 685 + 686 + if (ban_req.banned) { 508 687 log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); 509 688 h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"domain is banned\"}"); 510 689 return; 511 690 } 512 691 513 - // enqueue for async processing (describeServer check happens in crawl processor) 514 - slurper.addCrawlRequest(hostname) catch { 692 + // enqueue for async processing 693 + ctx.slurper.addCrawlRequest(hostname) catch { 515 694 h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to store crawl request\"}"); 516 695 return; 517 696 };
+28 -13
src/backfill.zig
··· 27 27 collection_index: *collection_index_mod.CollectionIndex, 28 28 persist: *event_log_mod.DiskPersist, 29 29 running: std.atomic.Value(bool), 30 - future: ?Io.Future(void), 30 + thread: ?std.Thread, 31 31 source: []const u8, 32 32 io: Io, 33 + shutdown: *std.atomic.Value(bool), 33 34 34 - fn db(self: *Backfiller) !*pg.Pool { 35 - return self.persist.ensureEvDb(); 35 + fn db(self: *Backfiller) *pg.Pool { 36 + return self.persist.db; 36 37 } 37 38 38 39 pub fn init( ··· 40 41 collection_index: *collection_index_mod.CollectionIndex, 41 42 persist: *event_log_mod.DiskPersist, 42 43 io: Io, 44 + shutdown: *std.atomic.Value(bool), 43 45 ) Backfiller { 44 46 return .{ 45 47 .allocator = allocator, 46 48 .collection_index = collection_index, 47 49 .persist = persist, 48 50 .running = .{ .raw = false }, 49 - .future = null, 51 + .thread = null, 50 52 .source = "", 51 53 .io = io, 54 + .shutdown = shutdown, 52 55 }; 53 56 } 54 57 ··· 56 59 return self.running.load(.acquire); 57 60 } 58 61 62 + /// block until any in-progress backfill thread completes. 63 + /// must be called before tearing down DiskPersist or CollectionIndex. 64 + pub fn waitForCompletion(self: *Backfiller) void { 65 + if (self.thread) |t| { 66 + t.join(); 67 + self.thread = null; 68 + } 69 + } 70 + 59 71 /// start a backfill from the given source relay. returns error if already running. 60 72 pub fn start(self: *Backfiller, source: []const u8) !void { 61 73 if (self.running.cmpxchgStrong(false, true, .acq_rel, .acquire) != null) { ··· 68 80 self.allocator.free(self.source); 69 81 self.source = ""; 70 82 } 71 - self.future = try self.io.concurrent(run, .{self}); 83 + self.thread = std.Thread.spawn(.{}, run, .{self}) catch return error.SpawnFailed; 72 84 } 73 85 74 86 fn run(self: *Backfiller) void { 75 87 defer { 76 88 self.allocator.free(self.source); 77 89 self.source = ""; 78 - self.future = null; 90 + // note: do NOT clear self.thread here — waitForCompletion() needs 91 + // the handle to join this thread before dp/ci teardown. the running 92 + // flag gates start(), so a stale thread handle is harmless. 
79 93 self.running.store(false, .release); 80 94 } 81 95 82 - const pool = self.db() catch |err| { 83 - log.err("backfill: database unavailable: {s}", .{@errorName(err)}); 84 - return; 85 - }; 96 + const pool = self.db(); 86 97 87 98 // discover collections 88 99 const collections = self.discoverCollections() catch |err| { ··· 108 119 109 120 // backfill each collection 110 121 for (collections) |collection| { 122 + if (self.shutdown.load(.acquire)) { 123 + log.info("backfill interrupted by shutdown", .{}); 124 + return; 125 + } 111 126 self.backfillCollection(collection) catch |err| { 112 127 log.warn("backfill failed for {s}: {s}", .{ collection, @errorName(err) }); 113 128 }; ··· 212 227 } 213 228 214 229 fn backfillCollection(self: *Backfiller, collection: []const u8) !void { 215 - const pool = try self.db(); 230 + const pool = self.db(); 216 231 217 232 // single query: check completion, get cursor + count for resume 218 233 var cursor: ?[]const u8 = null; ··· 241 256 defer client.deinit(); 242 257 243 258 var page_count: usize = 0; 244 - while (true) { 259 + while (!self.shutdown.load(.acquire)) { 245 260 const fetch_result = self.fetchPage(&client, collection, cursor) catch |err| { 246 261 log.warn("{s}: fetch page failed: {s}", .{ collection, @errorName(err) }); 247 262 break; ··· 349 364 350 365 /// return status summary for the admin endpoint 351 366 pub fn getStatus(self: *Backfiller, allocator: Allocator) ![]u8 { 352 - const pool = try self.db(); 367 + const pool = self.db(); 353 368 354 369 var aw: Io.Writer.Allocating = .init(allocator); 355 370 defer aw.deinit();
+15 -2
src/broadcaster.zig
··· 470 470 broadcast_queue: BroadcastQueue, 471 471 history: FrameHistory, 472 472 persist: ?*event_log_mod.DiskPersist = null, 473 + db_queue: ?*event_log_mod.DbRequestQueue = null, 473 474 stats: Stats = .{}, 474 475 error_frame: ?[]const u8 = null, 475 476 http_fallback: ?HttpFallbackFn = null, ··· 781 782 782 783 // OutdatedCursor: cursor older than oldest available — info, continue 783 784 const oldest = blk: { 784 - if (ctx.persist) |dp| { 785 - if (dp.firstSeqEv() catch null) |s| break :blk s; 785 + if (ctx.persist != null and ctx.db_queue != null) { 786 + const FirstSeqReq = struct { 787 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 788 + result: ?u64 = null, 789 + 790 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 791 + const s: *@This() = @fieldParentPtr("base", b); 792 + s.result = dp.firstSeq(); 793 + } 794 + }; 795 + var first_req: FirstSeqReq = .{}; 796 + ctx.db_queue.?.push(&first_req.base); 797 + first_req.base.wait(ctx.io, ctx.shutdown); 798 + if (first_req.result) |s| break :blk s; 786 799 } 787 800 break :blk ctx.history.oldestSeq() orelse 0; 788 801 };
+21 -11
src/cleaner.zig
··· 19 19 collection_index: *collection_index_mod.CollectionIndex, 20 20 persist: *event_log_mod.DiskPersist, 21 21 running: std.atomic.Value(bool), 22 - future: ?Io.Future(void), 22 + thread: ?std.Thread, 23 23 scanned: std.atomic.Value(u64), 24 24 removed: std.atomic.Value(u64), 25 + shutdown: *std.atomic.Value(bool), 25 26 26 - fn db(self: *Cleaner) !*pg.Pool { 27 - return self.persist.ensureEvDb(); 27 + fn db(self: *Cleaner) *pg.Pool { 28 + return self.persist.db; 28 29 } 29 30 30 31 pub fn init( ··· 32 33 io: Io, 33 34 collection_index: *collection_index_mod.CollectionIndex, 34 35 persist: *event_log_mod.DiskPersist, 36 + shutdown: *std.atomic.Value(bool), 35 37 ) Cleaner { 36 38 return .{ 37 39 .allocator = allocator, ··· 39 41 .collection_index = collection_index, 40 42 .persist = persist, 41 43 .running = .{ .raw = false }, 42 - .future = null, 44 + .thread = null, 43 45 .scanned = .{ .raw = 0 }, 44 46 .removed = .{ .raw = 0 }, 47 + .shutdown = shutdown, 45 48 }; 46 49 } 47 50 ··· 49 52 return self.running.load(.acquire); 50 53 } 51 54 55 + /// block until any in-progress cleanup thread completes. 56 + /// must be called before tearing down DiskPersist or CollectionIndex. 57 + pub fn waitForCompletion(self: *Cleaner) void { 58 + if (self.thread) |t| { 59 + t.join(); 60 + self.thread = null; 61 + } 62 + } 63 + 52 64 /// start cleanup. returns error if already running. 53 65 pub fn start(self: *Cleaner) !void { 54 66 if (self.running.cmpxchgStrong(false, true, .acq_rel, .acquire) != null) { ··· 58 70 59 71 self.scanned.store(0, .release); 60 72 self.removed.store(0, .release); 61 - self.future = try self.io.concurrent(run, .{self}); 73 + self.thread = std.Thread.spawn(.{}, run, .{self}) catch return error.SpawnFailed; 62 74 } 63 75 64 76 fn run(self: *Cleaner) void { 65 77 defer { 66 - self.future = null; 78 + // note: do NOT clear self.thread here — waitForCompletion() needs 79 + // the handle to join this thread before dp/ci teardown. 67 80 self.running.store(false, .release); 68 81 } 69 82 70 83 log.info("cleanup started", .{}); 71 84 72 - const pool = self.db() catch |err| { 73 - log.err("cleanup: database unavailable: {s}", .{@errorName(err)}); 74 - return; 75 - }; 85 + const pool = self.db(); 76 86 77 87 // page through inactive accounts by uid 78 88 var last_uid: i64 = 0; 79 - while (true) { 89 + while (!self.shutdown.load(.acquire)) { 80 90 var batch_count: u64 = 0; 81 91 { 82 92 var result = pool.query(
+90 -157
src/event_log.zig
··· 87 87 next: std.atomic.Value(?*PlaybackRequest) = .{ .raw = null }, 88 88 }; 89 89 90 + /// cross-Io DB request — Evented fiber posts, pool_io worker executes. 91 + /// callers define typed structs embedding DbRequest + @fieldParentPtr. 92 + pub const DbRequest = struct { 93 + callback: *const fn (*DbRequest, *DiskPersist) void, 94 + done: std.atomic.Value(bool) = .{ .raw = false }, 95 + err: ?anyerror = null, 96 + 97 + /// spin-wait for completion, yielding via io.sleep when available. 98 + /// never returns until `done` is true — the request struct lives on the 99 + /// caller's stack, so we must outlive the worker's callback execution. 100 + /// the queue's shutdown drain sets done+err on unprocessed requests, 101 + /// and workers set done after the callback returns, so this always terminates. 102 + pub fn wait(self: *DbRequest, io: Io, shutdown: *std.atomic.Value(bool)) void { 103 + _ = shutdown; 104 + while (!self.done.load(.acquire)) { 105 + io.sleep(Io.Duration.fromMicroseconds(100), .awake) catch { 106 + // sleep failed (Io shutting down) — busy-wait for worker to finish 107 + while (!self.done.load(.acquire)) { 108 + std.atomic.spinLoopHint(); 109 + } 110 + break; 111 + }; 112 + } 113 + } 114 + }; 115 + 116 + /// MPSC FIFO ring buffer for general DB traffic. 117 + /// multiple producers (Evented fibers) push via spinlock, 118 + /// multiple consumers (pool_io worker threads) pop via CAS. 119 + pub const DbRequestQueue = struct { 120 + const CAPACITY = 4096; 121 + 122 + items: [CAPACITY]*DbRequest = undefined, 123 + head: std.atomic.Value(u32) = .{ .raw = 0 }, 124 + tail: std.atomic.Value(u32) = .{ .raw = 0 }, 125 + push_lock: std.atomic.Value(u32) = .{ .raw = 0 }, // producer spinlock 126 + shutdown: *std.atomic.Value(bool), 127 + persist: *DiskPersist, 128 + 129 + pub fn push(self: *DbRequestQueue, req: *DbRequest) void { 130 + // acquire producer spinlock 131 + while (self.push_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 132 + std.atomic.spinLoopHint(); 133 + } 134 + defer self.push_lock.store(0, .release); 135 + 136 + const tail = self.tail.load(.monotonic); 137 + const head = self.head.load(.acquire); 138 + // if full, spin until space opens (workers are fast, this shouldn't happen) 139 + if (tail -% head >= CAPACITY) { 140 + self.push_lock.store(0, .release); 141 + while (self.tail.load(.monotonic) -% self.head.load(.acquire) >= CAPACITY) { 142 + std.atomic.spinLoopHint(); 143 + } 144 + while (self.push_lock.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 145 + std.atomic.spinLoopHint(); 146 + } 147 + } 148 + 149 + self.items[self.tail.load(.monotonic) % CAPACITY] = req; 150 + self.tail.store(self.tail.load(.monotonic) +% 1, .release); 151 + } 152 + 153 + pub fn pop(self: *DbRequestQueue) ?*DbRequest { 154 + while (true) { 155 + const head = self.head.load(.acquire); 156 + if (head == self.tail.load(.acquire)) return null; 157 + const item = self.items[head % CAPACITY]; 158 + if (self.head.cmpxchgWeak(head, head +% 1, .acq_rel, .acquire) == null) 159 + return item; 160 + } 161 + } 162 + 163 + pub fn run(self: *DbRequestQueue, pool_io: Io) void { 164 + while (!self.shutdown.load(.acquire)) { 165 + if (self.pop()) |req| { 166 + req.callback(req, self.persist); 167 + req.done.store(true, .release); 168 + } else { 169 + pool_io.sleep(Io.Duration.fromMilliseconds(5), .awake) catch return; 170 + } 171 + } 172 + // shutdown drain — signal error on unprocessed requests 173 + while (self.pop()) |req| { 174 + req.err = error.ShuttingDown; 175 + req.done.store(true, 
.release); 176 + } 177 + } 178 + }; 179 + 90 180 pub const DiskPersist = struct { 91 181 allocator: Allocator, 92 182 dir_path: []const u8, 93 183 dir: Io.Dir, 94 184 db: *pg.Pool, 95 - /// Evented-safe pg.Pool — lazy-initialized on first use from an Evented fiber. 96 - /// pg.Pool.initUri requires the event loop to be running (TCP connect goes through 97 - /// io_uring), so we can't create it during main() init. Evented callers access it 98 - /// via ensureEvDb(). pool_io callers keep using self.db. 99 - ev_db: ?*pg.Pool = null, 100 - ev_db_state: std.atomic.Value(u8) = .{ .raw = 0 }, 101 - /// Evented Io — set by main before any fibers run. used for lazy ev_db init. 102 - ev_io: ?Io = null, 103 - /// database URL for lazy ev_db init (stable ref to process env string) 104 - db_url: []const u8 = "", 105 - /// pool size for lazy ev_db init 106 - ev_db_pool_size: u16 = 0, 107 185 current_file: ?Io.File = null, 108 186 current_file_path: ?[]const u8 = null, 109 187 current_file_pos: u64 = 0, ··· 138 216 /// MPSC queue for cross-Io playback requests (Evented → pool_io) 139 217 playback_head: std.atomic.Value(?*PlaybackRequest) = .{ .raw = null }, 140 218 141 - const EvDbInit = enum(u8) { uninit = 0, initializing = 1, ready = 2 }; 142 - 143 - /// returns the Evented pg.Pool, lazy-initializing on first call. 144 - /// pg.Pool.initUri does TCP connects via io_uring, so it can only run inside 145 - /// an Evented fiber (after the event loop is spinning). callers from main() 146 - /// init set ev_io/db_url/ev_db_pool_size; the actual pool is created here. 147 - /// on failure, resets state to uninit so the next call retries. 148 - pub fn ensureEvDb(self: *DiskPersist) !*pg.Pool { 149 - // fast path — already initialized (single atomic load) 150 - if (self.ev_db) |db| return db; 151 - 152 - const ev_io = self.ev_io orelse return error.EvDbNotConfigured; 153 - 154 - while (true) { 155 - const state: EvDbInit = @enumFromInt(self.ev_db_state.load(.acquire)); 156 - switch (state) { 157 - .ready => return self.ev_db orelse error.EvDbNotConfigured, 158 - .uninit => { 159 - if (self.ev_db_state.cmpxchgWeak( 160 - @intFromEnum(EvDbInit.uninit), 161 - @intFromEnum(EvDbInit.initializing), 162 - .acquire, 163 - .monotonic, 164 - ) == null) { 165 - // won the race — create the pool 166 - const uri = std.Uri.parse(self.db_url) catch 167 - return error.InvalidDatabaseUrl; 168 - self.ev_db = pg.Pool.initUri( 169 - self.allocator, 170 - ev_io, 171 - uri, 172 - .{ .size = self.ev_db_pool_size }, 173 - ) catch |err| { 174 - log.err("ensureEvDb: initUri failed: {s}", .{@errorName(err)}); 175 - // reset to uninit so next call retries 176 - self.ev_db_state.store(@intFromEnum(EvDbInit.uninit), .release); 177 - return error.EvDbInitFailed; 178 - }; 179 - self.ev_db_state.store(@intFromEnum(EvDbInit.ready), .release); 180 - log.info("lazy-initialized Evented pg.Pool (size={d})", .{self.ev_db_pool_size}); 181 - return self.ev_db.?; 182 - } 183 - // lost CAS — another fiber is initializing, fall through 184 - }, 185 - .initializing => { 186 - // yield to let the initializing fiber complete 187 - ev_io.sleep(Io.Duration.fromMilliseconds(1), .awake) catch {}; 188 - }, 189 - } 190 - } 191 - } 192 - 193 219 /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended) 194 220 pub fn evtbufLen(self: *DiskPersist) usize { 195 221 if (!self.mutex.tryLock()) return 0; ··· 353 379 if (self.current_file) |f| f.close(self.io); 354 380 if (self.current_file_path) |p| self.allocator.free(p); 355 381 
self.dir.close(self.io); 356 - if (self.ev_db) |ev| ev.deinit(); 357 382 self.db.deinit(); 358 383 self.allocator.free(self.dir_path); 359 384 } ··· 501 526 return getHostAccountCountImpl(host_id, self.db); 502 527 } 503 528 504 - /// count accounts on a host (Evented pool) 505 - pub fn getHostAccountCountEv(self: *DiskPersist, host_id: u64) !u64 { 506 - return getHostAccountCountImpl(host_id, try self.ensureEvDb()); 507 - } 508 - 509 529 fn getHostAccountCountImpl(host_id: u64, db: *pg.Pool) u64 { 510 530 var row = (db.rowUnsafe( 511 531 "SELECT COUNT(*) FROM account WHERE host_id = $1", ··· 521 541 return getEffectiveAccountCountImpl(host_id, self.db); 522 542 } 523 543 524 - /// effective account count (Evented pool) 525 - pub fn getEffectiveAccountCountEv(self: *DiskPersist, host_id: u64) !u64 { 526 - return getEffectiveAccountCountImpl(host_id, try self.ensureEvDb()); 527 - } 528 - 529 544 /// uses admin-configured limit if set, otherwise actual COUNT(*). 530 545 fn getEffectiveAccountCountImpl(host_id: u64, db: *pg.Pool) u64 { 531 546 var row = (db.rowUnsafe( ··· 540 555 /// set host account limit (Threaded pool) 541 556 pub fn setHostAccountLimit(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 542 557 return setHostAccountLimitImpl(host_id, limit, self.db); 543 - } 544 - 545 - /// set host account limit (Evented pool) 546 - pub fn setHostAccountLimitEv(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 547 - return setHostAccountLimitImpl(host_id, limit, try self.ensureEvDb()); 548 558 } 549 559 550 560 /// pass null to clear the override and revert to actual COUNT(*). ··· 614 624 return getOrCreateHostImpl(hostname, self.db); 615 625 } 616 626 617 - /// get or create a host row (Evented pool) 618 - pub fn getOrCreateHostEv(self: *DiskPersist, hostname: []const u8) !HostResult { 619 - return getOrCreateHostImpl(hostname, try self.ensureEvDb()); 620 - } 621 - 622 627 fn getOrCreateHostImpl(hostname: []const u8, db: *pg.Pool) !HostResult { 623 628 _ = db.exec( 624 629 "INSERT INTO host (hostname) VALUES ($1) ON CONFLICT (hostname) DO NOTHING", ··· 644 649 return isHostBannedImpl(hostname, self.db); 645 650 } 646 651 647 - /// check if a host is banned or blocked by status (Evented pool) 648 - pub fn isHostBannedEv(self: *DiskPersist, hostname: []const u8) !bool { 649 - return isHostBannedImpl(hostname, try self.ensureEvDb()); 650 - } 651 - 652 652 fn isHostBannedImpl(hostname: []const u8, db: *pg.Pool) bool { 653 653 var row = db.rowUnsafe( 654 654 "SELECT status FROM host WHERE hostname = $1", ··· 675 675 return getHostIdForHostnameImpl(hostname, self.db); 676 676 } 677 677 678 - /// look up host ID by hostname (Evented pool) 679 - pub fn getHostIdForHostnameEv(self: *DiskPersist, hostname: []const u8) !?u64 { 680 - return getHostIdForHostnameImpl(hostname, try self.ensureEvDb()); 681 - } 682 - 683 678 fn getHostIdForHostnameImpl(hostname: []const u8, db: *pg.Pool) !?u64 { 684 679 var row = (try db.rowUnsafe( 685 680 "SELECT id FROM host WHERE hostname = $1", ··· 694 689 return updateHostStatusImpl(host_id, status, self.db); 695 690 } 696 691 697 - /// update host status (Evented pool) 698 - pub fn updateHostStatusEv(self: *DiskPersist, host_id: u64, status: []const u8) !void { 699 - return updateHostStatusImpl(host_id, status, try self.ensureEvDb()); 700 - } 701 - 702 692 fn updateHostStatusImpl(host_id: u64, status: []const u8, db: *pg.Pool) !void { 703 693 _ = try db.exec( 704 694 "UPDATE host SET status = $2, updated_at = now() WHERE id = $1", ··· 711 701 return 
listActiveHostsImpl(allocator, self.db); 712 702 } 713 703 714 - /// list all active hosts (Evented pool) 715 - pub fn listActiveHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 716 - return listActiveHostsImpl(allocator, try self.ensureEvDb()); 717 - } 718 - 719 704 fn listActiveHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { 720 705 var hosts: std.ArrayListUnmanaged(Host) = .empty; 721 706 errdefer { ··· 749 734 /// list all hosts (Threaded pool) 750 735 pub fn listAllHosts(self: *DiskPersist, allocator: Allocator) ![]Host { 751 736 return listAllHostsImpl(allocator, self.db); 752 - } 753 - 754 - /// list all hosts (Evented pool) 755 - pub fn listAllHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 756 - return listAllHostsImpl(allocator, try self.ensureEvDb()); 757 737 } 758 738 759 739 fn listAllHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { ··· 805 785 return self.isDomainBannedImpl(hostname, self.db); 806 786 } 807 787 808 - /// check if a hostname (or any parent domain) is banned (Evented pool). 809 - pub fn isDomainBannedEv(self: *DiskPersist, hostname: []const u8) !bool { 810 - return self.isDomainBannedImpl(hostname, try self.ensureEvDb()); 811 - } 812 - 813 788 /// Go relay: domain_ban.go DomainIsBanned — suffix-based check. 814 789 fn isDomainBannedImpl(_: *DiskPersist, hostname: []const u8, db: *pg.Pool) bool { 815 790 // check each suffix: "pds.host.example.com", "host.example.com", "example.com" ··· 837 812 return resetHostFailuresImpl(host_id, self.db); 838 813 } 839 814 840 - /// reset failure count (Evented pool) 841 - pub fn resetHostFailuresEv(self: *DiskPersist, host_id: u64) !void { 842 - return resetHostFailuresImpl(host_id, try self.ensureEvDb()); 843 - } 844 - 845 815 fn resetHostFailuresImpl(host_id: u64, db: *pg.Pool) !void { 846 816 _ = try db.exec( 847 817 "UPDATE host SET failed_attempts = 0, updated_at = now() WHERE id = $1", ··· 849 819 ); 850 820 } 851 821 852 - /// resolve a DID to UID using the Evented pool. skips the DID cache 853 - /// (which uses pool_io mutex). only used from admin ban (rare path). 854 - pub fn uidForDidEv(self: *DiskPersist, did: []const u8) !u64 { 855 - const db = try self.ensureEvDb(); 856 - // check database 857 - if (try db.rowUnsafe( 858 - "SELECT uid FROM account WHERE did = $1", 859 - .{did}, 860 - )) |row| { 861 - var r = row; 862 - defer r.deinit() catch {}; 863 - return @intCast(r.get(i64, 0)); 864 - } 865 - 866 - // create new account row 867 - _ = db.exec( 868 - "INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING", 869 - .{did}, 870 - ) catch |err| { 871 - log.warn("failed to create account for {s}: {s}", .{ did, @errorName(err) }); 872 - return err; 873 - }; 874 - 875 - // read back the UID 876 - var row = try db.rowUnsafe( 877 - "SELECT uid FROM account WHERE did = $1", 878 - .{did}, 879 - ) orelse return error.AccountCreationFailed; 880 - defer row.deinit() catch {}; 881 - return @intCast(row.get(i64, 0)); 882 - } 883 - 884 822 /// enqueue a playback request for the pool_io worker to execute. 885 823 /// uses MPSC push (lock-free Treiber stack). 
886 824 pub fn enqueuePlayback(self: *DiskPersist, req: *PlaybackRequest) void { ··· 998 936 /// oldest available sequence number (Threaded pool) 999 937 pub fn firstSeq(self: *DiskPersist) ?u64 { 1000 938 return firstSeqImpl(self.db); 1001 - } 1002 - 1003 - /// oldest available sequence number (Evented pool) 1004 - pub fn firstSeqEv(self: *DiskPersist) !?u64 { 1005 - return firstSeqImpl(try self.ensureEvDb()); 1006 939 } 1007 940 1008 941 fn firstSeqImpl(db: *pg.Pool) ?u64 {
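head and tail are free-running u32 counters: occupancy is wrapping subtraction (tail -% head) and slot indexing is % CAPACITY, so the math stays correct when the counters overflow. a standalone sanity check of that arithmetic (test sketch, not part of the diff):

const std = @import("std");

test "ring occupancy survives u32 counter wraparound" {
    const CAPACITY: u32 = 4096;

    // ordinary case: tail is 3 slots ahead of head
    var head: u32 = 10;
    var tail: u32 = 13;
    try std.testing.expectEqual(@as(u32, 3), tail -% head);
    try std.testing.expect(tail -% head < CAPACITY); // not full

    // wraparound case: head sits near maxInt(u32), tail has already wrapped past zero
    head = std.math.maxInt(u32) - 1;
    tail = 2;
    try std.testing.expectEqual(@as(u32, 4), tail -% head);
    try std.testing.expect(tail -% head < CAPACITY); // still only 4 requests in flight
}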
+51 -11
src/main.zig
··· 227 227 dp.retention_hours = retention_hours; 228 228 dp.max_dir_bytes = max_events_gb * 1024 * 1024 * 1024; 229 229 230 - // configure lazy Evented pg.Pool — can't create it here because pg.Pool.initUri 231 - // does TCP connects via io_uring, which requires the event loop to be running. 232 - // the pool is created on first use from an Evented fiber (via dp.ensureEvDb()). 233 - dp.ev_io = io; 234 - dp.db_url = database_url; 235 - dp.ev_db_pool_size = db_pool_size; 230 + // DbRequestQueue — general DB traffic from Evented fibers routed to pool_io workers. 231 + // replaces the broken ev_db (Evented pg.Pool) approach. 232 + var db_queue: event_log_mod.DbRequestQueue = .{ 233 + .shutdown = &shutdown_flag, 234 + .persist = &dp, 235 + }; 236 236 237 237 if (dp.lastSeq()) |last| { 238 238 log.info("event log recovered: last_seq={d}", .{last}); ··· 241 241 // start flush thread 242 242 try dp.start(); 243 243 244 + // spawn 2 DbRequestQueue worker threads on pool_io 245 + const db_worker_1 = std.Thread.spawn(.{}, event_log_mod.DbRequestQueue.run, .{ &db_queue, pool_io }) catch |err| { 246 + log.err("failed to start db queue worker 1: {s}", .{@errorName(err)}); 247 + return err; 248 + }; 249 + const db_worker_2 = std.Thread.spawn(.{}, event_log_mod.DbRequestQueue.run, .{ &db_queue, pool_io }) catch |err| { 250 + log.err("failed to start db queue worker 2: {s}", .{@errorName(err)}); 251 + return err; 252 + }; 253 + 244 254 // wire persist into broadcaster for cursor replay and validator for migration checks 245 255 bc.persist = &dp; 256 + bc.db_queue = &db_queue; 246 257 val.persist = &dp; 247 258 248 259 // init collection index (RocksDB — inspired by lightrail/microcosm.blue) ··· 254 265 defer ci.deinit(); 255 266 256 267 // init backfiller (collection index backfill from source relay) 257 - // uses ev_db (Evented pool) — backfiller spawns Evented fibers via io.concurrent() 258 - var backfiller = backfill_mod.Backfiller.init(allocator, &ci, &dp, io); 268 + // uses pool_io (Threaded) — backfiller spawns std.Thread, DNS works 269 + var backfiller = backfill_mod.Backfiller.init(allocator, &ci, &dp, pool_io, &shutdown_flag); 259 270 260 271 // init cleaner (removes stale entries from collection index) 261 - // uses ev_db (Evented pool) — cleaner spawns Evented fibers via io.concurrent() 262 - var cleaner = cleaner_mod.Cleaner.init(allocator, io, &ci, &dp); 272 + // uses pool_io (Threaded) — cleaner spawns std.Thread, checks shutdown 273 + var cleaner = cleaner_mod.Cleaner.init(allocator, pool_io, &ci, &dp, &shutdown_flag); 263 274 264 275 // init resyncer (updates collection index on #sync events) 265 276 // runs entirely on pool_io (Threaded) — enqueue() is called from frame worker ··· 309 320 slurper.resyncer = &resyncer; 310 321 slurper.host_ops = &host_ops_queue; 311 322 slurper.cursor_map = &cursor_map; 323 + slurper.db_queue = &db_queue; 312 324 313 - // start: loads active hosts from DB, spawns subscriber threads 325 + // start: loads active hosts from DB, spawns subscriber threads. 326 + // pullHosts runs on its own std.Thread (parallel, not gating spawnWorkers). 314 327 try slurper.start(); 315 328 329 + // spawn pullHosts on a separate thread — runs in parallel with spawnWorkers, 330 + // uses pool_io for HTTP (DNS works), writes to DB via Threaded pool directly. 
331 + const pull_hosts_thread = if (slurper.options.seed_host.len > 0) blk: { 332 + log.info("pulling hosts from {s} (background thread)...", .{slurper.options.seed_host}); 333 + break :blk std.Thread.spawn(.{}, slurper_mod.Slurper.pullHosts, .{&slurper}) catch |err| { 334 + log.warn("failed to spawn pullHosts thread: {s}", .{@errorName(err)}); 335 + break :blk null; 336 + }; 337 + } else blk: { 338 + log.info("no seed host configured, skipping bootstrap", .{}); 339 + break :blk null; 340 + }; 341 + 316 342 // start broadcaster fiber — drains broadcast queue, owns all consumer state. 317 343 // this is the Evented-side sequencer: frame workers push results to the queue, 318 344 // this fiber does the actual fan-out to downstream consumers. ··· 340 366 .validator = &val, 341 367 .host_ops = &host_ops_queue, 342 368 .pool_io = pool_io, 369 + .io = io, 370 + .db_queue = &db_queue, 371 + .shutdown = &shutdown_flag, 343 372 }; 344 373 bc.http_fallback = api.handleHttpRequest; 345 374 bc.http_fallback_ctx = @ptrCast(&http_context); ··· 399 428 // must complete before dp.deinit() runs (dp is stack-owned). 400 429 gc_thread.join(); 401 430 431 + // join pullHosts thread if running 432 + if (pull_hosts_thread) |t| t.join(); 433 + 434 + // join db queue workers — shutdown flag already set, they will drain and exit 435 + db_worker_1.join(); 436 + db_worker_2.join(); 437 + 402 438 // join host ops thread — drains remaining ops before dp.deinit() 403 439 host_ops_thread.join(); 440 + 441 + // join backfiller/cleaner threads if running — they touch dp and ci 442 + backfiller.waitForCompletion(); 443 + cleaner.waitForCompletion(); 404 444 405 445 // cancel broadcaster fiber (shutdown flag already set, it will drain remaining) 406 446 broadcast_future.cancel(io);
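the join order at shutdown matters: the shared flag has to be set before the queue workers are joined (otherwise run() never leaves its loop), and every thread that touches dp or ci has to be joined before those stack-owned structs are torn down. condensed sketch using the names wired above (the flag store itself happens in the existing shutdown path, not in this hunk):

// 1. signal shutdown; every cooperative loop below watches this flag
shutdown_flag.store(true, .release);

// 2. join the queue workers: they drain the ring and mark anything unprocessed
//    with error.ShuttingDown, so fibers parked in DbRequest.wait() still wake up
db_worker_1.join();
db_worker_2.join();

// 3. join background threads that still read or write dp and ci
backfiller.waitForCompletion();
cleaner.waitForCompletion();

// 4. only now is dp / ci teardown safe (both live on main's stack)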
+147 -43
src/slurper.zig
··· 231 231 resyncer: ?*resync_mod.Resyncer = null, 232 232 host_ops: ?*host_ops_mod.HostOpsQueue = null, 233 233 cursor_map: ?*host_ops_mod.CursorMap = null, 234 + db_queue: ?*event_log_mod.DbRequestQueue = null, 234 235 shutdown: *std.atomic.Value(bool), 235 236 options: Options, 236 237 ··· 304 305 } 305 306 306 307 /// pull PDS host list from the seed relay's com.atproto.sync.listHosts endpoint. 308 + /// runs on its own std.Thread (pool_io) — DNS and outbound HTTP work. 307 309 /// Go relay: cmd/relay/pull.go — one-time bootstrap, reads REST API, not firehose. 308 - pub fn pullHosts(self: *Slurper) !void { 310 + pub fn pullHosts(self: *Slurper) void { 309 311 var cursor: ?[]const u8 = null; 310 312 var total: usize = 0; 311 313 const limit = 500; 312 314 313 - var client: http.Client = .{ .allocator = self.allocator, .io = self.io }; 315 + var client: http.Client = .{ .allocator = self.allocator, .io = self.pool_io }; 314 316 defer client.deinit(); 315 317 316 318 while (true) { ··· 362 364 const normalized = validateHostname(self.allocator, host.hostname) catch continue; 363 365 defer self.allocator.free(normalized); 364 366 365 - // skip banned domains (Evented fiber — use Ev pool) 366 - if (self.persist.isDomainBannedEv(normalized) catch true) continue; 367 + // skip banned domains (Threaded pool — runs on own std.Thread) 368 + if (self.persist.isDomainBanned(normalized)) continue; 367 369 368 370 // insert into DB (no describeServer check — the seed relay already vetted them) 369 - _ = self.persist.getOrCreateHostEv(normalized) catch continue; 371 + _ = self.persist.getOrCreateHost(normalized) catch continue; 370 372 added += 1; 371 373 } 372 374 total += added; ··· 408 410 409 411 /// validate and add a host: format check, domain ban, describeServer, then spawn. 410 412 /// mirrors Go relay's requestCrawl → SubscribeToHost pipeline. 413 + /// uses phased approach: DB checks via DbRequestQueue, HTTP via temp thread. 
411 414 fn addHost(self: *Slurper, raw_hostname: []const u8) !void { 415 + const db_queue = self.db_queue orelse return error.DbQueueNotConfigured; 416 + 412 417 // step 1: validate and normalize hostname format 413 418 // Go relay: host.go ParseHostname 414 419 const hostname = validateHostname(self.allocator, raw_hostname) catch |err| { ··· 417 422 }; 418 423 defer self.allocator.free(hostname); 419 424 420 - // step 2: domain ban check (Evented fiber — use Ev pool) 421 - // Go relay: domain_ban.go DomainIsBanned 422 - if (self.persist.isDomainBannedEv(hostname) catch true) { 423 - log.warn("host {s}: domain is banned, rejecting", .{hostname}); 424 - return; 425 - } 425 + // phase 1: DB checks via DbRequestQueue 426 + const AddHostDbReq = struct { 427 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 428 + hostname: []const u8, 429 + host_id: u64 = 0, 430 + last_seq: u64 = 0, 431 + rejected: bool = false, 426 432 427 - // step 3: check if host is banned/blocked in DB (Evented pool) 428 - // Go relay: crawl.go checks host.Status == HostStatusBanned 429 - if (self.persist.isHostBannedEv(hostname) catch true) { 430 - log.warn("host {s}: banned/blocked in DB, rejecting", .{hostname}); 433 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 434 + const s: *@This() = @fieldParentPtr("base", b); 435 + // domain ban check 436 + if (dp.isDomainBanned(s.hostname)) { 437 + s.rejected = true; 438 + return; 439 + } 440 + // host ban check 441 + if (dp.isHostBanned(s.hostname)) { 442 + s.rejected = true; 443 + return; 444 + } 445 + // get or create host 446 + const info = dp.getOrCreateHost(s.hostname) catch |e| { 447 + b.err = e; 448 + return; 449 + }; 450 + s.host_id = info.id; 451 + s.last_seq = info.last_seq; 452 + } 453 + }; 454 + var db_req: AddHostDbReq = .{ .hostname = hostname }; 455 + db_queue.push(&db_req.base); 456 + db_req.base.wait(self.io, self.shutdown); 457 + if (db_req.base.err != null) return error.DbRequestFailed; 458 + if (db_req.rejected) { 459 + log.warn("host {s}: banned/blocked, rejecting", .{hostname}); 431 460 return; 432 461 } 433 462 434 - // step 4: dedup — check if already tracked 435 - // Go relay: crawl.go CheckIfSubscribed 436 - const host_info = try self.persist.getOrCreateHostEv(hostname); 463 + // phase 2: dedup — check if already tracked (Evented, local) 437 464 { 438 465 self.workers_mutex.lockUncancelable(self.io); 439 466 defer self.workers_mutex.unlock(self.io); 440 - if (self.workers.contains(host_info.id)) { 467 + if (self.workers.contains(db_req.host_id)) { 441 468 log.debug("host {s} already has a worker, skipping", .{hostname}); 442 469 return; 443 470 } 444 471 } 445 472 446 - // step 5: describeServer liveness check 447 - // Go relay: host_checker.go CheckHost (with SSRF protection) 448 - checkHost(self.allocator, hostname, self.io) catch |err| { 449 - log.warn("host {s}: describeServer check failed: {s}", .{ hostname, @errorName(err) }); 473 + // phase 3: describeServer liveness check (outbound HTTP via temp thread) 474 + const CheckHostReq = struct { 475 + allocator: Allocator, 476 + hostname_copy: []const u8, 477 + check_io: Io, 478 + check_err: ?HostValidationError = null, 479 + done: std.atomic.Value(bool) = .{ .raw = false }, 480 + 481 + fn run(s: *@This()) void { 482 + checkHost(s.allocator, s.hostname_copy, s.check_io) catch |e| { 483 + s.check_err = e; 484 + }; 485 + s.done.store(true, .release); 486 + } 487 + }; 488 + var check_req: CheckHostReq = .{ 489 + .allocator = self.allocator, 490 + 
.hostname_copy = hostname, 491 + .check_io = self.pool_io, 492 + }; 493 + const check_thread = std.Thread.spawn(.{}, CheckHostReq.run, .{&check_req}) catch { 494 + log.warn("host {s}: failed to spawn check thread", .{hostname}); 450 495 return; 451 496 }; 497 + // wait for check to complete (Evented fiber yields) 498 + while (!check_req.done.load(.acquire)) { 499 + if (self.shutdown.load(.acquire)) return; 500 + self.io.sleep(Io.Duration.fromMicroseconds(100), .awake) catch { 501 + while (!check_req.done.load(.acquire)) { 502 + if (self.shutdown.load(.acquire)) return; 503 + std.atomic.spinLoopHint(); 504 + } 505 + break; 506 + }; 507 + } 508 + check_thread.join(); 509 + if (check_req.check_err) |err| { 510 + log.warn("host {s}: describeServer check failed: {s}", .{ hostname, @errorName(err) }); 511 + return; 512 + } 452 513 453 - // reset status and failure count (Evented pool) — host passed describeServer, give it a fresh start. 454 - // without this, exhausted hosts accumulate failures across requestCrawl cycles 455 - // and immediately re-exhaust on a single failure. 456 - self.persist.updateHostStatusEv(host_info.id, "active") catch {}; 457 - self.persist.resetHostFailuresEv(host_info.id) catch {}; 514 + // phase 4: reset status + failures via DbRequestQueue 515 + const ResetHostReq = struct { 516 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 517 + host_id: u64, 458 518 459 - try self.spawnWorker(host_info.id, hostname, host_info.last_seq); 460 - log.info("added host {s} (id={d})", .{ hostname, host_info.id }); 519 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 520 + const s: *@This() = @fieldParentPtr("base", b); 521 + dp.updateHostStatus(s.host_id, "active") catch {}; 522 + dp.resetHostFailures(s.host_id) catch {}; 523 + } 524 + }; 525 + var reset_req: ResetHostReq = .{ .host_id = db_req.host_id }; 526 + db_queue.push(&reset_req.base); 527 + reset_req.base.wait(self.io, self.shutdown); 528 + 529 + // phase 5: spawn worker (Evented) 530 + try self.spawnWorker(db_req.host_id, hostname, db_req.last_seq); 531 + log.info("added host {s} (id={d})", .{ hostname, db_req.host_id }); 461 532 } 462 533 463 534 /// spawn a subscriber thread for a host ··· 468 539 const sub = try self.allocator.create(subscriber_mod.Subscriber); 469 540 errdefer self.allocator.destroy(sub); 470 541 471 - const account_count: u64 = self.persist.getEffectiveAccountCountEv(host_id) catch 0; 542 + // get effective account count via DbRequestQueue 543 + var account_count: u64 = 0; 544 + if (self.db_queue) |db_queue| { 545 + const GetCountReq = struct { 546 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 547 + hid: u64, 548 + count: u64 = 0, 549 + 550 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 551 + const s: *@This() = @fieldParentPtr("base", b); 552 + s.count = dp.getEffectiveAccountCount(s.hid); 553 + } 554 + }; 555 + var count_req: GetCountReq = .{ .hid = host_id }; 556 + db_queue.push(&count_req.base); 557 + count_req.base.wait(self.io, self.shutdown); 558 + account_count = count_req.count; 559 + } 472 560 473 561 sub.* = subscriber_mod.Subscriber.init( 474 562 self.allocator, ··· 532 620 533 621 /// background fiber: load hosts from DB and spawn all workers. 534 622 /// runs in background so HTTP server + probes come up immediately. 623 + /// pullHosts runs on its own std.Thread in parallel (not gated). 535 624 /// Go relay: ResubscribeAllHosts loops with 1ms sleep per host (goroutines). 
536 625 /// we batch-spawn with yields between batches to keep the event loop responsive 537 626 /// for health checks and metrics during the initial TLS handshake ramp. 538 627 fn spawnWorkers(self: *Slurper) void { 539 - // pull hosts from seed relay first — idempotent (getOrCreateHost skips existing) 540 - if (self.options.seed_host.len > 0) { 541 - log.info("pulling hosts from {s}...", .{self.options.seed_host}); 542 - self.pullHosts() catch |err| { 543 - log.warn("pullHosts from {s} failed: {s}", .{ self.options.seed_host, @errorName(err) }); 544 - }; 545 - } else { 546 - log.info("no seed host configured, skipping bootstrap", .{}); 547 - } 548 - 549 - const hosts = self.persist.listActiveHostsEv(self.allocator) catch |err| { 550 - log.err("failed to load hosts: {s}", .{@errorName(err)}); 628 + const db_queue = self.db_queue orelse { 629 + log.err("spawnWorkers: db_queue not set", .{}); 551 630 return; 552 631 }; 632 + 633 + // load active hosts via DbRequestQueue 634 + const ListActiveHostsReq = struct { 635 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 636 + alloc: Allocator, 637 + result: ?[]event_log_mod.DiskPersist.Host = null, 638 + 639 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 640 + const s: *@This() = @fieldParentPtr("base", b); 641 + s.result = dp.listActiveHosts(s.alloc) catch |e| { 642 + b.err = e; 643 + return; 644 + }; 645 + } 646 + }; 647 + var list_req: ListActiveHostsReq = .{ .alloc = self.allocator }; 648 + db_queue.push(&list_req.base); 649 + list_req.base.wait(self.io, self.shutdown); 650 + 651 + if (list_req.base.err != null or list_req.result == null) { 652 + log.err("failed to load hosts: {s}", .{if (list_req.base.err) |e| @errorName(e) else "null result"}); 653 + return; 654 + } 655 + 656 + const hosts = list_req.result.?; 553 657 defer { 554 658 for (hosts) |h| { 555 659 self.allocator.free(h.hostname);