atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix all cross-Io heap corruption: dual pg.Pool + Threaded service paths

the relay SIGSEGV'd at ~396 hosts during startup — DiskPersist's pg.Pool
was created on Threaded Io, but ~40 call sites across slurper, API handlers,
broadcaster, backfiller, and cleaner called it from Evented fibers, triggering
Thread.current() on a NULL threadlocal.

Part A — dual pg.Pool:
add ev_db (Evented pg.Pool) to DiskPersist. pure DB methods get *Impl(db)
internal implementations + thin *Ev wrappers. Evented callers use ev_db;
pool_io callers keep using self.db. 13 methods refactored.

Part B — Threaded service paths:
- admin ban: uidForDidEv on Evented side, takedown routed through host_ops
queue (fire-and-forget). worker executes takeDownUser on pool_io, then
persists and broadcasts the #account event under persist_order.
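- playback: cross-Io request/reply via MPSC Treiber stack. Evented fiber
posts PlaybackRequest, pool_io worker executes playback() under mutex,
then sets the done atomic. the fiber polls done with a short sleep; if the
sleep fails it falls back to a spin-wait, so it never returns (and unwinds
the stack-local request) while the worker still holds a pointer to it,
preventing use-after-free.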

persist_order is now held through the full persist → resequence →
broadcast_queue.push() sequence to guarantee that insertion order matches
seq assignment order.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 949e9a7f c8efaa16

+427 -147
+27 -37
src/api/admin.zig
··· 8 8 const h = @import("http.zig"); 9 9 const router = @import("router.zig"); 10 10 const websocket = @import("websocket"); 11 - const broadcaster = @import("../broadcaster.zig"); 12 11 const event_log_mod = @import("../event_log.zig"); 13 12 const backfill_mod = @import("../backfill.zig"); 14 13 const cleaner_mod = @import("../cleaner.zig"); ··· 59 58 defer parsed.deinit(); 60 59 const did = parsed.value.did; 61 60 62 - // resolve DID → UID and take down 63 - const uid = ctx.persist.uidForDid(did) catch { 61 + // resolve DID → UID via Evented pool (skips DID cache which uses pool_io mutex) 62 + const uid = ctx.persist.uidForDidEv(did) catch { 64 63 h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); 65 - return; 66 - }; 67 - ctx.persist.takeDownUser(uid) catch { 68 - h.respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}"); 69 64 return; 70 65 }; 71 66 ··· 74 69 log.debug("collection removeAll after ban failed: {s}", .{@errorName(err)}); 75 70 }; 76 71 77 - // emit #account event — persist under narrow ordering lock, resequence + enqueue outside. 78 - if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 79 - while (ctx.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 80 - std.atomic.spinLoopHint(); 81 - } 72 + // build CBOR #account frame and route takedown + persist + broadcast 73 + // through host_ops queue (pool_io thread) — fire and forget. 
74 + const host_ops_mod = @import("../host_ops.zig"); 75 + var td: host_ops_mod.HostOp.Payload.Takedown = .{ .uid = uid }; 82 76 83 - if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { 84 - ctx.bc.stats.relay_seq.store(relay_seq, .release); 85 - ctx.bc.persist_order.store(0, .release); 86 - 87 - const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 88 - const owned = ctx.persist.allocator.dupe(u8, broadcast_data) catch { 89 - log.warn("admin: failed to alloc broadcast data for {s}", .{did}); 90 - h.respondJson(conn, .ok, "{\"success\":true}"); 91 - return; 92 - }; 93 - ctx.bc.broadcast_queue.push(relay_seq, owned, &ctx.bc.stats); 94 - log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 95 - } else |err| { 96 - ctx.bc.persist_order.store(0, .release); 97 - log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); 77 + if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 78 + defer ctx.persist.allocator.free(frame_bytes); 79 + if (frame_bytes.len <= td.frame_buf.len) { 80 + @memcpy(td.frame_buf[0..frame_bytes.len], frame_bytes); 81 + td.frame_len = @intCast(frame_bytes.len); 98 82 } 99 83 } 100 84 101 - log.info("admin: banned {s} (uid={d})", .{ did, uid }); 85 + ctx.host_ops.push(.{ 86 + .host_id = 0, // not host-specific 87 + .kind = .takedown_user, 88 + .payload = .{ .takedown = td }, 89 + }); 90 + 91 + log.info("admin: banned {s} (uid={d}), takedown enqueued", .{ did, uid }); 102 92 h.respondJson(conn, .ok, "{\"success\":true}"); 103 93 } 104 94 ··· 106 96 if (!checkAdmin(conn, headers)) return; 107 97 108 98 const persist = ctx.persist; 109 - const hosts = persist.listAllHosts(persist.allocator) catch { 99 + const hosts = persist.listAllHostsEv(persist.allocator) catch { 110 100 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 111 101 return; 112 
102 }; ··· 159 149 }; 160 150 defer parsed.deinit(); 161 151 162 - const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 152 + const host_info = persist.getOrCreateHostEv(parsed.value.hostname) catch { 163 153 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 164 154 return; 165 155 }; 166 156 167 - persist.updateHostStatus(host_info.id, "blocked") catch { 157 + persist.updateHostStatusEv(host_info.id, "blocked") catch { 168 158 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 169 159 return; 170 160 }; ··· 182 172 }; 183 173 defer parsed.deinit(); 184 174 185 - const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 175 + const host_info = persist.getOrCreateHostEv(parsed.value.hostname) catch { 186 176 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 187 177 return; 188 178 }; 189 179 190 - persist.updateHostStatus(host_info.id, "active") catch { 180 + persist.updateHostStatusEv(host_info.id, "active") catch { 191 181 h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 192 182 return; 193 183 }; 194 - persist.resetHostFailures(host_info.id) catch {}; 184 + persist.resetHostFailuresEv(host_info.id) catch {}; 195 185 196 186 log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 197 187 h.respondJson(conn, .ok, "{\"success\":true}"); ··· 214 204 }; 215 205 defer parsed.deinit(); 216 206 217 - const host_id = ctx.persist.getHostIdForHostname(parsed.value.host) catch { 207 + const host_id = ctx.persist.getHostIdForHostnameEv(parsed.value.host) catch { 218 208 h.respondJson(conn, .internal_server_error, "{\"error\":\"database error\"}"); 219 209 return; 220 210 } orelse { ··· 222 212 return; 223 213 }; 224 214 225 - 
ctx.persist.setHostAccountLimit(host_id, parsed.value.account_limit) catch { 215 + ctx.persist.setHostAccountLimitEv(host_id, parsed.value.account_limit) catch { 226 216 h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to update limit\"}"); 227 217 return; 228 218 }; 229 219 230 220 // update running subscriber's rate limits immediately 231 - const effective = if (parsed.value.account_limit) |l| l else ctx.persist.getHostAccountCount(host_id); 221 + const effective = if (parsed.value.account_limit) |l| l else ctx.persist.getHostAccountCountEv(host_id); 232 222 ctx.slurper.updateHostLimits(host_id, effective); 233 223 234 224 if (parsed.value.account_limit) |limit| {
+2
src/api/router.zig
··· 14 14 const backfill_mod = @import("../backfill.zig"); 15 15 const cleaner_mod = @import("../cleaner.zig"); 16 16 const resync_mod = @import("../resync.zig"); 17 + const host_ops_mod = @import("../host_ops.zig"); 17 18 const h = @import("http.zig"); 18 19 const xrpc = @import("xrpc.zig"); 19 20 const admin = @import("admin.zig"); ··· 29 30 resyncer: *resync_mod.Resyncer, 30 31 bc: *broadcaster.Broadcaster, 31 32 validator: *validator_mod.Validator, 33 + host_ops: *host_ops_mod.HostOpsQueue, 32 34 pool_io: Io, 33 35 }; 34 36
+9 -9
src/api/xrpc.zig
··· 37 37 38 38 // query accounts with repo state, paginated by UID 39 39 // includes both local status and upstream_status for combined active check 40 - var result = persist.db.query( 40 + var result = persist.ev_db.?.query( 41 41 \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 42 42 \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 43 43 \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 ··· 122 122 } 123 123 124 124 // look up account (includes both local and upstream status) 125 - var row = (persist.db.rowUnsafe( 125 + var row = (persist.ev_db.?.rowUnsafe( 126 126 "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 127 127 .{did}, 128 128 ) catch { ··· 181 181 } 182 182 183 183 // look up the PDS hostname for this account 184 - var row = (persist.db.rowUnsafe( 184 + var row = (persist.ev_db.?.rowUnsafe( 185 185 "SELECT h.hostname FROM account a JOIN host h ON a.host_id = h.id WHERE a.did = $1 AND a.host_id > 0", 186 186 .{did}, 187 187 ) catch { ··· 215 215 } 216 216 217 217 // look up account + repo state (includes both local and upstream status) 218 - var row = (persist.db.rowUnsafe( 218 + var row = (persist.ev_db.?.rowUnsafe( 219 219 "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 220 220 .{did}, 221 221 ) catch { ··· 345 345 return; 346 346 } 347 347 348 - var result = persist.db.query( 348 + var result = persist.ev_db.?.query( 349 349 "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 350 350 .{ cursor_val, limit }, 351 351 ) catch { ··· 400 400 }; 401 401 402 402 // look up host 403 - var row = (persist.db.rowUnsafe( 403 + var row = (persist.ev_db.?.rowUnsafe( 404 404 "SELECT id, hostname, status, last_seq FROM host WHERE hostname = 
$1", 405 405 .{hostname}, 406 406 ) catch { ··· 426 426 raw_status; // active, idle pass through 427 427 428 428 // count accounts on this host 429 - const account_count: i64 = if (persist.db.rowUnsafe( 429 + const account_count: i64 = if (persist.ev_db.?.rowUnsafe( 430 430 "SELECT COUNT(*) FROM account WHERE host_id = $1", 431 431 .{host_id}, 432 432 ) catch null) |cnt_row| blk: { ··· 473 473 }; 474 474 defer slurper.allocator.free(hostname); 475 475 476 - // fast validation: domain ban check 477 - if (slurper.persist.isDomainBanned(hostname)) { 476 + // fast validation: domain ban check (Evented fiber — use Ev pool) 477 + if (slurper.persist.isDomainBannedEv(hostname)) { 478 478 log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); 479 479 h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"domain is banned\"}"); 480 480 return;
+29 -9
src/broadcaster.zig
··· 611 611 /// two-phase cursor replay: disk (diskpersist) first, then in-memory ring buffer. 612 612 /// the consumer is already in the live broadcast list, so frames arriving 613 613 /// during replay are buffered — no gap possible. 614 + /// 615 + /// disk playback runs on pool_io (Threaded) via request/reply — playback() 616 + /// holds the DiskPersist mutex and reads files, all of which require Threaded Io. 614 617 pub fn replayTo(self: *Broadcaster, consumer: *Consumer, cursor: u64) void { 615 - // phase 1: disk replay from diskpersist 618 + // phase 1: disk replay from diskpersist via cross-Io request/reply 616 619 if (self.persist) |dp| { 617 - var entries: std.ArrayListUnmanaged(event_log_mod.PlaybackEntry) = .empty; 618 - defer { 619 - for (entries.items) |e| self.allocator.free(e.data); 620 - entries.deinit(self.allocator); 620 + var req: event_log_mod.PlaybackRequest = .{ .since = cursor, .allocator = self.allocator }; 621 + dp.enqueuePlayback(&req); 622 + 623 + // poll until pool_io worker completes the request (yields to Evented scheduler). 624 + // SAFETY: req is stack-local — we MUST wait for the worker to finish before 625 + // returning, otherwise the stack frame unwinds while the worker still holds &req. 626 + // if sleep fails (shutdown/io error), fall back to spin-wait. the host_ops worker 627 + // drains all pending playback requests before exiting, so this is bounded. 
628 + while (!req.done.load(.acquire)) { 629 + self.io.sleep(Io.Duration.fromMicroseconds(100), .awake) catch { 630 + while (!req.done.load(.acquire)) std.atomic.spinLoopHint(); 631 + break; 632 + }; 621 633 } 622 634 623 - dp.playback(cursor, self.allocator, &entries) catch |err| { 635 + if (req.err) |err| { 624 636 log.warn("disk replay failed: {s}, falling back to memory", .{@errorName(err)}); 637 + // clean up any partial entries 638 + for (req.entries.items) |e| self.allocator.free(e.data); 639 + req.entries.deinit(self.allocator); 625 640 self.replayFromMemory(consumer, cursor); 626 641 return; 627 - }; 642 + } 643 + 644 + defer { 645 + for (req.entries.items) |e| self.allocator.free(e.data); 646 + req.entries.deinit(self.allocator); 647 + } 628 648 629 649 var replayed: usize = 0; 630 650 var reseq_arena = std.heap.ArenaAllocator.init(self.allocator); 631 651 defer reseq_arena.deinit(); 632 652 633 - for (entries.items) |entry| { 653 + for (req.entries.items) |entry| { 634 654 // resequence: replace upstream seq in CBOR with relay-assigned seq 635 655 const frame_data = resequenceFrame(reseq_arena.allocator(), entry.data, entry.seq) orelse entry.data; 636 656 if (!consumer.enqueueRaw(frame_data)) { ··· 762 782 // OutdatedCursor: cursor older than oldest available — info, continue 763 783 const oldest = blk: { 764 784 if (ctx.persist) |dp| { 765 - if (dp.firstSeq()) |s| break :blk s; 785 + if (dp.firstSeqEv()) |s| break :blk s; 766 786 } 767 787 break :blk ctx.history.oldestSeq() orelse 0; 768 788 };
+214 -36
src/event_log.zig
··· 77 77 78 78 // --- disk persistence --- 79 79 80 + /// cross-Io playback request — Evented fiber posts, pool_io worker executes 81 + pub const PlaybackRequest = struct { 82 + since: u64, 83 + allocator: Allocator, 84 + entries: std.ArrayListUnmanaged(PlaybackEntry) = .empty, 85 + err: ?anyerror = null, 86 + done: std.atomic.Value(bool) = .{ .raw = false }, 87 + next: std.atomic.Value(?*PlaybackRequest) = .{ .raw = null }, 88 + }; 89 + 80 90 pub const DiskPersist = struct { 81 91 allocator: Allocator, 82 92 dir_path: []const u8, 83 93 dir: Io.Dir, 84 94 db: *pg.Pool, 95 + /// Evented-safe pg.Pool — created on the Evented Io backend. 96 + /// Evented callers (slurper, API handlers, broadcaster) use this pool 97 + /// for pure DB reads/writes. pool_io callers keep using self.db. 98 + ev_db: ?*pg.Pool = null, 85 99 current_file: ?Io.File = null, 86 100 current_file_path: ?[]const u8 = null, 87 101 current_file_pos: u64 = 0, ··· 113 127 /// read by metrics server to report health without cross-Io pg.Pool access. 
114 128 last_db_success: std.atomic.Value(i64) = .{ .raw = 0 }, 115 129 130 + /// MPSC queue for cross-Io playback requests (Evented → pool_io) 131 + playback_head: std.atomic.Value(?*PlaybackRequest) = .{ .raw = null }, 132 + 116 133 /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended) 117 134 pub fn evtbufLen(self: *DiskPersist) usize { 118 135 if (!self.mutex.tryLock()) return 0; ··· 276 293 if (self.current_file) |f| f.close(self.io); 277 294 if (self.current_file_path) |p| self.allocator.free(p); 278 295 self.dir.close(self.io); 296 + if (self.ev_db) |ev| ev.deinit(); 279 297 self.db.deinit(); 280 298 self.allocator.free(self.dir_path); 281 299 } ··· 418 436 return if (hid > 0) @intCast(hid) else 0; 419 437 } 420 438 421 - /// count accounts on a host (for rate limit scaling, matches Go relay's host.AccountCount) 439 + /// count accounts on a host (Threaded pool) 422 440 pub fn getHostAccountCount(self: *DiskPersist, host_id: u64) u64 { 423 - var row = (self.db.rowUnsafe( 441 + return getHostAccountCountImpl(host_id, self.db); 442 + } 443 + 444 + /// count accounts on a host (Evented pool) 445 + pub fn getHostAccountCountEv(self: *DiskPersist, host_id: u64) u64 { 446 + return getHostAccountCountImpl(host_id, self.ev_db.?); 447 + } 448 + 449 + fn getHostAccountCountImpl(host_id: u64, db: *pg.Pool) u64 { 450 + var row = (db.rowUnsafe( 424 451 "SELECT COUNT(*) FROM account WHERE host_id = $1", 425 452 .{@as(i64, @intCast(host_id))}, 426 453 ) catch return 0) orelse return 0; ··· 429 456 return if (count > 0) @intCast(count) else 0; 430 457 } 431 458 432 - /// effective account count for rate limit scaling. 433 - /// uses admin-configured limit if set, otherwise actual COUNT(*). 
459 + /// effective account count (Threaded pool) 434 460 pub fn getEffectiveAccountCount(self: *DiskPersist, host_id: u64) u64 { 435 - var row = (self.db.rowUnsafe( 461 + return getEffectiveAccountCountImpl(host_id, self.db); 462 + } 463 + 464 + /// effective account count (Evented pool) 465 + pub fn getEffectiveAccountCountEv(self: *DiskPersist, host_id: u64) u64 { 466 + return getEffectiveAccountCountImpl(host_id, self.ev_db.?); 467 + } 468 + 469 + /// uses admin-configured limit if set, otherwise actual COUNT(*). 470 + fn getEffectiveAccountCountImpl(host_id: u64, db: *pg.Pool) u64 { 471 + var row = (db.rowUnsafe( 436 472 "SELECT COALESCE(h.account_limit, COUNT(a.uid)) FROM host h LEFT JOIN account a ON a.host_id = h.id WHERE h.id = $1 GROUP BY h.id", 437 473 .{@as(i64, @intCast(host_id))}, 438 474 ) catch return 0) orelse return 0; ··· 441 477 return if (count > 0) @intCast(count) else 0; 442 478 } 443 479 444 - /// set admin-configured account limit for a host (overrides COUNT(*) for rate limiting). 480 + /// set host account limit (Threaded pool) 481 + pub fn setHostAccountLimit(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 482 + return setHostAccountLimitImpl(host_id, limit, self.db); 483 + } 484 + 485 + /// set host account limit (Evented pool) 486 + pub fn setHostAccountLimitEv(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 487 + return setHostAccountLimitImpl(host_id, limit, self.ev_db.?); 488 + } 489 + 445 490 /// pass null to clear the override and revert to actual COUNT(*). 
446 - pub fn setHostAccountLimit(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 491 + fn setHostAccountLimitImpl(host_id: u64, limit: ?u64, db: *pg.Pool) !void { 447 492 if (limit) |l| { 448 493 const clamped: i64 = if (l > @as(u64, @intCast(std.math.maxInt(i64)))) std.math.maxInt(i64) else @intCast(l); 449 - _ = try self.db.exec( 494 + _ = try db.exec( 450 495 "UPDATE host SET account_limit = $2, updated_at = now() WHERE id = $1", 451 496 .{ @as(i64, @intCast(host_id)), clamped }, 452 497 ); 453 498 } else { 454 - _ = try self.db.exec( 499 + _ = try db.exec( 455 500 "UPDATE host SET account_limit = NULL, updated_at = now() WHERE id = $1", 456 501 .{@as(i64, @intCast(host_id))}, 457 502 ); ··· 502 547 account_limit: ?u64 = null, 503 548 }; 504 549 505 - /// get or create a host row. returns {id, last_seq}. 506 - pub fn getOrCreateHost(self: *DiskPersist, hostname: []const u8) !struct { id: u64, last_seq: u64 } { 507 - _ = self.db.exec( 550 + const HostResult = struct { id: u64, last_seq: u64 }; 551 + 552 + /// get or create a host row (Threaded pool) 553 + pub fn getOrCreateHost(self: *DiskPersist, hostname: []const u8) !HostResult { 554 + return getOrCreateHostImpl(hostname, self.db); 555 + } 556 + 557 + /// get or create a host row (Evented pool) 558 + pub fn getOrCreateHostEv(self: *DiskPersist, hostname: []const u8) !HostResult { 559 + return getOrCreateHostImpl(hostname, self.ev_db.?); 560 + } 561 + 562 + fn getOrCreateHostImpl(hostname: []const u8, db: *pg.Pool) !HostResult { 563 + _ = db.exec( 508 564 "INSERT INTO host (hostname) VALUES ($1) ON CONFLICT (hostname) DO NOTHING", 509 565 .{hostname}, 510 566 ) catch |err| { ··· 512 568 return err; 513 569 }; 514 570 515 - var row = try self.db.rowUnsafe( 571 + var row = try db.rowUnsafe( 516 572 "SELECT id, last_seq FROM host WHERE hostname = $1", 517 573 .{hostname}, 518 574 ) orelse return error.HostCreationFailed; ··· 523 579 }; 524 580 } 525 581 526 - /// check if a host is banned or blocked by 
status 582 + /// check if a host is banned or blocked by status (Threaded pool) 527 583 pub fn isHostBanned(self: *DiskPersist, hostname: []const u8) bool { 528 - var row = self.db.rowUnsafe( 584 + return isHostBannedImpl(hostname, self.db); 585 + } 586 + 587 + /// check if a host is banned or blocked by status (Evented pool) 588 + pub fn isHostBannedEv(self: *DiskPersist, hostname: []const u8) bool { 589 + return isHostBannedImpl(hostname, self.ev_db.?); 590 + } 591 + 592 + fn isHostBannedImpl(hostname: []const u8, db: *pg.Pool) bool { 593 + var row = db.rowUnsafe( 529 594 "SELECT status FROM host WHERE hostname = $1", 530 595 .{hostname}, 531 596 ) catch return false; ··· 545 610 ); 546 611 } 547 612 548 - /// look up host ID by hostname. returns null if not found. 613 + /// look up host ID by hostname (Threaded pool) 549 614 pub fn getHostIdForHostname(self: *DiskPersist, hostname: []const u8) !?u64 { 550 - var row = (try self.db.rowUnsafe( 615 + return getHostIdForHostnameImpl(hostname, self.db); 616 + } 617 + 618 + /// look up host ID by hostname (Evented pool) 619 + pub fn getHostIdForHostnameEv(self: *DiskPersist, hostname: []const u8) !?u64 { 620 + return getHostIdForHostnameImpl(hostname, self.ev_db.?); 621 + } 622 + 623 + fn getHostIdForHostnameImpl(hostname: []const u8, db: *pg.Pool) !?u64 { 624 + var row = (try db.rowUnsafe( 551 625 "SELECT id FROM host WHERE hostname = $1", 552 626 .{hostname}, 553 627 )) orelse return null; ··· 555 629 return @intCast(row.get(i64, 0)); 556 630 } 557 631 558 - /// update host status (active, blocked, exhausted) 632 + /// update host status (Threaded pool) 559 633 pub fn updateHostStatus(self: *DiskPersist, host_id: u64, status: []const u8) !void { 560 - _ = try self.db.exec( 634 + return updateHostStatusImpl(host_id, status, self.db); 635 + } 636 + 637 + /// update host status (Evented pool) 638 + pub fn updateHostStatusEv(self: *DiskPersist, host_id: u64, status: []const u8) !void { 639 + return 
updateHostStatusImpl(host_id, status, self.ev_db.?); 640 + } 641 + 642 + fn updateHostStatusImpl(host_id: u64, status: []const u8, db: *pg.Pool) !void { 643 + _ = try db.exec( 561 644 "UPDATE host SET status = $2, updated_at = now() WHERE id = $1", 562 645 .{ @as(i64, @intCast(host_id)), status }, 563 646 ); 564 647 } 565 648 566 - /// list all active hosts 649 + /// list all active hosts (Threaded pool) 567 650 pub fn listActiveHosts(self: *DiskPersist, allocator: Allocator) ![]Host { 651 + return listActiveHostsImpl(allocator, self.db); 652 + } 653 + 654 + /// list all active hosts (Evented pool) 655 + pub fn listActiveHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 656 + return listActiveHostsImpl(allocator, self.ev_db.?); 657 + } 658 + 659 + fn listActiveHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { 568 660 var hosts: std.ArrayListUnmanaged(Host) = .empty; 569 661 errdefer { 570 - for (hosts.items) |h| { 571 - allocator.free(h.hostname); 572 - allocator.free(h.status); 662 + for (hosts.items) |host| { 663 + allocator.free(host.hostname); 664 + allocator.free(host.status); 573 665 } 574 666 hosts.deinit(allocator); 575 667 } 576 668 577 - var result = try self.db.query( 669 + var result = try db.query( 578 670 "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE status = 'active' ORDER BY id ASC", 579 671 .{}, 580 672 ); ··· 594 686 return try hosts.toOwnedSlice(allocator); 595 687 } 596 688 597 - /// list all hosts (any status) for admin view 689 + /// list all hosts (Threaded pool) 598 690 pub fn listAllHosts(self: *DiskPersist, allocator: Allocator) ![]Host { 691 + return listAllHostsImpl(allocator, self.db); 692 + } 693 + 694 + /// list all hosts (Evented pool) 695 + pub fn listAllHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 696 + return listAllHostsImpl(allocator, self.ev_db.?); 697 + } 698 + 699 + fn listAllHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { 599 700 var hosts: 
std.ArrayListUnmanaged(Host) = .empty; 600 701 errdefer { 601 - for (hosts.items) |h| { 602 - allocator.free(h.hostname); 603 - allocator.free(h.status); 702 + for (hosts.items) |host| { 703 + allocator.free(host.hostname); 704 + allocator.free(host.status); 604 705 } 605 706 hosts.deinit(allocator); 606 707 } 607 708 608 - var result = try self.db.query( 709 + var result = try db.query( 609 710 "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host ORDER BY id ASC", 610 711 .{}, 611 712 ); ··· 639 740 return @intCast(row.get(i32, 0)); 640 741 } 641 742 642 - /// check if a hostname (or any parent domain) is banned. 643 - /// Go relay: domain_ban.go DomainIsBanned — suffix-based check. 743 + /// check if a hostname (or any parent domain) is banned (Threaded pool). 644 744 pub fn isDomainBanned(self: *DiskPersist, hostname: []const u8) bool { 745 + return self.isDomainBannedImpl(hostname, self.db); 746 + } 747 + 748 + /// check if a hostname (or any parent domain) is banned (Evented pool). 749 + pub fn isDomainBannedEv(self: *DiskPersist, hostname: []const u8) bool { 750 + return self.isDomainBannedImpl(hostname, self.ev_db.?); 751 + } 752 + 753 + /// Go relay: domain_ban.go DomainIsBanned — suffix-based check. 
754 + fn isDomainBannedImpl(_: *DiskPersist, hostname: []const u8, db: *pg.Pool) bool { 645 755 // check each suffix: "pds.host.example.com", "host.example.com", "example.com" 646 756 var offset: usize = 0; 647 757 while (offset < hostname.len) { 648 758 const suffix = hostname[offset..]; 649 - var row = self.db.rowUnsafe( 759 + var row = db.rowUnsafe( 650 760 "SELECT 1 FROM domain_ban WHERE domain = $1", 651 761 .{suffix}, 652 762 ) catch return false; ··· 662 772 return false; 663 773 } 664 774 665 - /// reset failure count (on successful connection) 775 + /// reset failure count (Threaded pool) 666 776 pub fn resetHostFailures(self: *DiskPersist, host_id: u64) !void { 667 - _ = try self.db.exec( 777 + return resetHostFailuresImpl(host_id, self.db); 778 + } 779 + 780 + /// reset failure count (Evented pool) 781 + pub fn resetHostFailuresEv(self: *DiskPersist, host_id: u64) !void { 782 + return resetHostFailuresImpl(host_id, self.ev_db.?); 783 + } 784 + 785 + fn resetHostFailuresImpl(host_id: u64, db: *pg.Pool) !void { 786 + _ = try db.exec( 668 787 "UPDATE host SET failed_attempts = 0, updated_at = now() WHERE id = $1", 669 788 .{@as(i64, @intCast(host_id))}, 670 789 ); 671 790 } 672 791 792 + /// resolve a DID to UID using the Evented pool. skips the DID cache 793 + /// (which uses pool_io mutex). only used from admin ban (rare path). 
794 + pub fn uidForDidEv(self: *DiskPersist, did: []const u8) !u64 { 795 + const db = self.ev_db.?; 796 + // check database 797 + if (try db.rowUnsafe( 798 + "SELECT uid FROM account WHERE did = $1", 799 + .{did}, 800 + )) |row| { 801 + var r = row; 802 + defer r.deinit() catch {}; 803 + return @intCast(r.get(i64, 0)); 804 + } 805 + 806 + // create new account row 807 + _ = db.exec( 808 + "INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING", 809 + .{did}, 810 + ) catch |err| { 811 + log.warn("failed to create account for {s}: {s}", .{ did, @errorName(err) }); 812 + return err; 813 + }; 814 + 815 + // read back the UID 816 + var row = try db.rowUnsafe( 817 + "SELECT uid FROM account WHERE did = $1", 818 + .{did}, 819 + ) orelse return error.AccountCreationFailed; 820 + defer row.deinit() catch {}; 821 + return @intCast(row.get(i64, 0)); 822 + } 823 + 824 + /// enqueue a playback request for the pool_io worker to execute. 825 + /// uses MPSC push (lock-free Treiber stack). 826 + pub fn enqueuePlayback(self: *DiskPersist, req: *PlaybackRequest) void { 827 + while (true) { 828 + const head = self.playback_head.load(.acquire); 829 + req.next.store(head, .monotonic); 830 + if (self.playback_head.cmpxchgWeak(@as(?*PlaybackRequest, head), req, .release, .monotonic) == null) { 831 + return; 832 + } 833 + } 834 + } 835 + 836 + /// pop all pending playback requests (single-consumer — pool_io worker only). 837 + /// returns a singly-linked list via .next pointers, or null if empty. 838 + pub fn popPlaybackBatch(self: *DiskPersist) ?*PlaybackRequest { 839 + return self.playback_head.swap(null, .acq_rel); 840 + } 841 + 673 842 /// persist an event. assigns a sequence number. returns the assigned seq. 674 843 /// the event is buffered and will be flushed to disk asynchronously. 
675 844 pub fn persist(self: *DiskPersist, kind: EvtKind, uid: u64, payload: []const u8) !u64 { ··· 766 935 return self.cur_seq - 1; 767 936 } 768 937 769 - /// oldest available sequence number on disk, or null if no log files exist 938 + /// oldest available sequence number (Threaded pool) 770 939 pub fn firstSeq(self: *DiskPersist) ?u64 { 771 - if (self.db.rowUnsafe( 940 + return firstSeqImpl(self.db); 941 + } 942 + 943 + /// oldest available sequence number (Evented pool) 944 + pub fn firstSeqEv(self: *DiskPersist) ?u64 { 945 + return firstSeqImpl(self.ev_db.?); 946 + } 947 + 948 + fn firstSeqImpl(db: *pg.Pool) ?u64 { 949 + if (db.rowUnsafe( 772 950 "SELECT seq_start FROM log_file_refs ORDER BY seq_start ASC LIMIT 1", 773 951 .{}, 774 952 ) catch null) |row| {
+19 -20
src/frame_worker.zig
··· 273 273 else 274 274 .identity; 275 275 276 - // persist under narrow ordering lock (seq assignment only), then 277 - // resequence + enqueue outside the lock. slight out-of-order in the 278 - // ring is fine — seq is embedded in frame data and consumers track by seq. 276 + // persist + resequence + enqueue under ordering lock to guarantee 277 + // broadcast_queue insertion order matches seq assignment order. 279 278 if (work.persist) |dp| { 280 - const relay_seq = blk: { 281 - var spins: u64 = 0; 282 - while (work.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 283 - spins += 1; 284 - std.atomic.spinLoopHint(); 285 - } 286 - if (spins > 0) { 287 - _ = work.bc.stats.persist_order_spins.fetchAdd(spins, .monotonic); 288 - } 279 + var spins: u64 = 0; 280 + while (work.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 281 + spins += 1; 282 + std.atomic.spinLoopHint(); 283 + } 284 + if (spins > 0) { 285 + _ = work.bc.stats.persist_order_spins.fetchAdd(spins, .monotonic); 286 + } 289 287 290 - const seq = dp.persist(kind, uid, data) catch |err| { 291 - work.bc.persist_order.store(0, .release); 292 - log.warn("persist failed: {s}", .{@errorName(err)}); 293 - return; 294 - }; 295 - work.bc.stats.relay_seq.store(seq, .release); 288 + const relay_seq = dp.persist(kind, uid, data) catch |err| { 296 289 work.bc.persist_order.store(0, .release); 297 - break :blk seq; 290 + log.warn("persist failed: {s}", .{@errorName(err)}); 291 + return; 298 292 }; 293 + work.bc.stats.relay_seq.store(relay_seq, .release); 299 294 300 295 const broadcast_data = broadcaster.resequenceFrame(alloc, data, relay_seq) orelse data; 301 - const owned = work.allocator.dupe(u8, broadcast_data) catch return; 296 + const owned = work.allocator.dupe(u8, broadcast_data) catch { 297 + work.bc.persist_order.store(0, .release); 298 + return; 299 + }; 302 300 work.bc.broadcast_queue.push(relay_seq, owned, &work.bc.stats); 301 + work.bc.persist_order.store(0, .release); 
303 302 304 303 // update per-DID state outside the ordering lock (Postgres round-trip) 305 304 if ((is_commit or is_sync) and uid > 0) {
+74 -2
src/host_ops.zig
··· 13 13 14 14 const std = @import("std"); 15 15 const event_log_mod = @import("event_log.zig"); 16 + const broadcaster = @import("broadcaster.zig"); 16 17 17 18 const Io = std.Io; 18 19 const log = std.log.scoped(.relay); ··· 132 133 increment_failures, 133 134 reset_failures, 134 135 update_status, 136 + takedown_user, 135 137 }; 136 138 137 139 pub const Payload = union { ··· 139 141 host_shutdown: *std.atomic.Value(bool), 140 142 none: void, 141 143 status: Status, 144 + takedown: Takedown, 142 145 143 146 pub const Status = struct { 144 147 buf: [16]u8 = .{0} ** 16, ··· 156 159 return self.buf[0..self.len]; 157 160 } 158 161 }; 162 + 163 + /// inline buffer for takedown — #account CBOR frame is <200 bytes 164 + pub const Takedown = struct { 165 + uid: u64, 166 + frame_buf: [256]u8 = .{0} ** 256, 167 + frame_len: u16 = 0, 168 + 169 + pub fn frameSlice(self: *const Takedown) []const u8 { 170 + return self.frame_buf[0..self.frame_len]; 171 + } 172 + }; 159 173 }; 160 174 }; 161 175 ··· 172 186 cursor_map: *CursorMap, 173 187 shutdown: *std.atomic.Value(bool), 174 188 max_consecutive_failures: u32, 189 + bc: ?*broadcaster.Broadcaster = null, 175 190 176 191 /// push a rare op (called from any Io context). spins until space is available. 177 192 /// only used for failures/status — cursors go through CursorMap. ··· 209 224 } 210 225 211 226 /// worker thread entry point. runs on pool_io (Threaded). 212 - /// drains rare ops immediately, sweeps cursor map every 5s. 227 + /// drains rare ops + playback requests immediately, sweeps cursor map every 5s. 
213 228 pub fn run(self: *HostOpsQueue, pool_io: Io) void { 214 229 var last_cursor_flush: i64 = timestamp(pool_io); 215 230 216 231 while (!self.shutdown.load(.acquire)) { 217 - // drain rare ops (failures, status) 232 + // drain rare ops (failures, status, takedowns) 218 233 var drained: u32 = 0; 219 234 while (self.pop()) |op| { 220 235 self.execute(op); 221 236 drained += 1; 222 237 } 223 238 239 + // drain playback requests (cross-Io: Evented fibers post, we execute) 240 + drained += self.drainPlaybackRequests(); 241 + 224 242 // periodic cursor sweep 225 243 const now = timestamp(pool_io); 226 244 if (now - last_cursor_flush >= CURSOR_FLUSH_INTERVAL_SEC) { ··· 239 257 while (self.pop()) |op| { 240 258 self.execute(op); 241 259 } 260 + _ = self.drainPlaybackRequests(); 261 + } 262 + 263 + /// drain all pending playback requests from the MPSC queue 264 + fn drainPlaybackRequests(self: *HostOpsQueue) u32 { 265 + var maybe_batch = self.persist.popPlaybackBatch(); 266 + var count: u32 = 0; 267 + // Treiber stack pops in LIFO order — fine for playback (each request is independent) 268 + while (maybe_batch) |req| { 269 + maybe_batch = req.next.load(.acquire); 270 + self.persist.playback(req.since, req.allocator, &req.entries) catch |e| { 271 + req.err = e; 272 + }; 273 + req.done.store(true, .release); 274 + count += 1; 275 + } 276 + return count; 242 277 } 243 278 244 279 fn execute(self: *HostOpsQueue, op: HostOp) void { ··· 261 296 log.debug("host_ops: update status failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) }); 262 297 }; 263 298 }, 299 + .takedown_user => { 300 + self.executeTakedown(op.payload.takedown); 301 + }, 302 + } 303 + } 304 + 305 + /// execute takedown on pool_io: takeDownUser + persist + broadcast 306 + fn executeTakedown(self: *HostOpsQueue, td: HostOp.Payload.Takedown) void { 307 + self.persist.takeDownUser(td.uid) catch |err| { 308 + log.warn("host_ops: takedown failed for uid={d}: {s}", .{ td.uid, @errorName(err) }); 309 + 
return; 310 + }; 311 + 312 + if (td.frame_len == 0) return; 313 + const frame_bytes = td.frameSlice(); 314 + const bc = self.bc orelse return; 315 + 316 + // persist the #account event under ordering lock 317 + while (bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 318 + std.atomic.spinLoopHint(); 319 + } 320 + 321 + if (self.persist.persist(.account, td.uid, frame_bytes)) |relay_seq| { 322 + bc.stats.relay_seq.store(relay_seq, .release); 323 + 324 + const broadcast_data = broadcaster.resequenceFrame(self.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 325 + const owned = self.persist.allocator.dupe(u8, broadcast_data) catch { 326 + bc.persist_order.store(0, .release); 327 + log.warn("host_ops: failed to alloc broadcast data for takedown uid={d}", .{td.uid}); 328 + return; 329 + }; 330 + bc.broadcast_queue.push(relay_seq, owned, &bc.stats); 331 + bc.persist_order.store(0, .release); 332 + log.info("host_ops: emitted #account takedown for uid={d} (seq={d})", .{ td.uid, relay_seq }); 333 + } else |err| { 334 + bc.persist_order.store(0, .release); 335 + log.warn("host_ops: persist #account takedown failed: {s}", .{@errorName(err)}); 264 336 } 265 337 } 266 338
+21 -2
src/main.zig
··· 227 227 dp.retention_hours = retention_hours; 228 228 dp.max_dir_bytes = max_events_gb * 1024 * 1024 * 1024; 229 229 230 + // create Evented pg.Pool — safe for use from Evented fibers (slurper, API, broadcaster). 231 + // the Threaded pool (dp.db) is only safe from pool_io threads. 232 + { 233 + const uri = std.Uri.parse(database_url) catch { 234 + log.err("failed to parse DATABASE_URL for ev_db", .{}); 235 + return error.InvalidDatabaseUrl; 236 + }; 237 + const pg_mod = @import("pg"); 238 + dp.ev_db = pg_mod.Pool.initUri(allocator, io, uri, .{ .size = db_pool_size }) catch |err| { 239 + log.err("failed to init Evented pg.Pool: {s}", .{@errorName(err)}); 240 + return err; 241 + }; 242 + } 243 + log.info("created Evented pg.Pool (size={d})", .{db_pool_size}); 244 + 230 245 if (dp.lastSeq()) |last| { 231 246 log.info("event log recovered: last_seq={d}", .{last}); 232 247 } ··· 247 262 defer ci.deinit(); 248 263 249 264 // init backfiller (collection index backfill from source relay) 250 - var backfiller = backfill_mod.Backfiller.init(allocator, &ci, dp.db, io); 265 + // uses ev_db (Evented pool) — backfiller spawns Evented fibers via io.concurrent() 266 + var backfiller = backfill_mod.Backfiller.init(allocator, &ci, dp.ev_db.?, io); 251 267 252 268 // init cleaner (removes stale entries from collection index) 253 - var cleaner = cleaner_mod.Cleaner.init(allocator, io, &ci, dp.db); 269 + // uses ev_db (Evented pool) — cleaner spawns Evented fibers via io.concurrent() 270 + var cleaner = cleaner_mod.Cleaner.init(allocator, io, &ci, dp.ev_db.?); 254 271 255 272 // init resyncer (updates collection index on #sync events) 256 273 // runs entirely on pool_io (Threaded) — enqueue() is called from frame worker ··· 271 288 .cursor_map = &cursor_map, 272 289 .shutdown = &shutdown_flag, 273 290 .max_consecutive_failures = 15, 291 + .bc = &bc, 274 292 }; 275 293 const host_ops_thread = std.Thread.spawn(.{}, host_ops_mod.HostOpsQueue.run, .{ &host_ops_queue, pool_io }) catch 
|err| { 276 294 log.err("failed to start host ops thread: {s}", .{@errorName(err)}); ··· 328 346 .resyncer = &resyncer, 329 347 .bc = &bc, 330 348 .validator = &val, 349 + .host_ops = &host_ops_queue, 331 350 .pool_io = pool_io, 332 351 }; 333 352 bc.http_fallback = api.handleHttpRequest;
+13 -13
src/slurper.zig
··· 362 362 const normalized = validateHostname(self.allocator, host.hostname) catch continue; 363 363 defer self.allocator.free(normalized); 364 364 365 - // skip banned domains 366 - if (self.persist.isDomainBanned(normalized)) continue; 365 + // skip banned domains (Evented fiber — use Ev pool) 366 + if (self.persist.isDomainBannedEv(normalized)) continue; 367 367 368 368 // insert into DB (no describeServer check — the seed relay already vetted them) 369 - _ = self.persist.getOrCreateHost(normalized) catch continue; 369 + _ = self.persist.getOrCreateHostEv(normalized) catch continue; 370 370 added += 1; 371 371 } 372 372 total += added; ··· 417 417 }; 418 418 defer self.allocator.free(hostname); 419 419 420 - // step 2: domain ban check (suffix-based) 420 + // step 2: domain ban check (Evented fiber — use Ev pool) 421 421 // Go relay: domain_ban.go DomainIsBanned 422 - if (self.persist.isDomainBanned(hostname)) { 422 + if (self.persist.isDomainBannedEv(hostname)) { 423 423 log.warn("host {s}: domain is banned, rejecting", .{hostname}); 424 424 return; 425 425 } 426 426 427 - // step 3: check if host is banned/blocked in DB 427 + // step 3: check if host is banned/blocked in DB (Evented pool) 428 428 // Go relay: crawl.go checks host.Status == HostStatusBanned 429 - if (self.persist.isHostBanned(hostname)) { 429 + if (self.persist.isHostBannedEv(hostname)) { 430 430 log.warn("host {s}: banned/blocked in DB, rejecting", .{hostname}); 431 431 return; 432 432 } 433 433 434 434 // step 4: dedup — check if already tracked 435 435 // Go relay: crawl.go CheckIfSubscribed 436 - const host_info = try self.persist.getOrCreateHost(hostname); 436 + const host_info = try self.persist.getOrCreateHostEv(hostname); 437 437 { 438 438 self.workers_mutex.lockUncancelable(self.io); 439 439 defer self.workers_mutex.unlock(self.io); ··· 450 450 return; 451 451 }; 452 452 453 - // reset status and failure count — host passed describeServer, give it a fresh start. 
453 + // reset status and failure count (Evented pool) — host passed describeServer, give it a fresh start. 454 454 // without this, exhausted hosts accumulate failures across requestCrawl cycles 455 455 // and immediately re-exhaust on a single failure. 456 - self.persist.updateHostStatus(host_info.id, "active") catch {}; 457 - self.persist.resetHostFailures(host_info.id) catch {}; 456 + self.persist.updateHostStatusEv(host_info.id, "active") catch {}; 457 + self.persist.resetHostFailuresEv(host_info.id) catch {}; 458 458 459 459 try self.spawnWorker(host_info.id, hostname, host_info.last_seq); 460 460 log.info("added host {s} (id={d})", .{ hostname, host_info.id }); ··· 468 468 const sub = try self.allocator.create(subscriber_mod.Subscriber); 469 469 errdefer self.allocator.destroy(sub); 470 470 471 - const account_count: u64 = self.persist.getEffectiveAccountCount(host_id); 471 + const account_count: u64 = self.persist.getEffectiveAccountCountEv(host_id); 472 472 473 473 sub.* = subscriber_mod.Subscriber.init( 474 474 self.allocator, ··· 546 546 log.info("no seed host configured, skipping bootstrap", .{}); 547 547 } 548 548 549 - const hosts = self.persist.listActiveHosts(self.allocator) catch |err| { 549 + const hosts = self.persist.listActiveHostsEv(self.allocator) catch |err| { 550 550 log.err("failed to load hosts: {s}", .{@errorName(err)}); 551 551 return; 552 552 };
+19 -19
src/subscriber.zig
··· 709 709 else // is_identity (unknown types already filtered above) 710 710 .identity; 711 711 712 - // persist under narrow ordering lock (seq assignment only), then 713 - // resequence + enqueue outside the lock. 712 + // persist + resequence + enqueue under ordering lock to guarantee 713 + // broadcast_queue insertion order matches seq assignment order. 714 714 if (sub.persist) |dp| { 715 - const relay_seq = blk: { 716 - var spins: u64 = 0; 717 - while (sub.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 718 - spins += 1; 719 - std.atomic.spinLoopHint(); 720 - } 721 - if (spins > 0) { 722 - _ = sub.bc.stats.persist_order_spins.fetchAdd(spins, .monotonic); 723 - } 715 + var spins: u64 = 0; 716 + while (sub.bc.persist_order.cmpxchgWeak(0, 1, .acquire, .monotonic) != null) { 717 + spins += 1; 718 + std.atomic.spinLoopHint(); 719 + } 720 + if (spins > 0) { 721 + _ = sub.bc.stats.persist_order_spins.fetchAdd(spins, .monotonic); 722 + } 724 723 725 - const seq = dp.persist(kind, uid, data) catch |err| { 726 - sub.bc.persist_order.store(0, .release); 727 - log.warn("persist failed: {s}", .{@errorName(err)}); 728 - return; 729 - }; 730 - sub.bc.stats.relay_seq.store(seq, .release); 724 + const relay_seq = dp.persist(kind, uid, data) catch |err| { 731 725 sub.bc.persist_order.store(0, .release); 732 - break :blk seq; 726 + log.warn("persist failed: {s}", .{@errorName(err)}); 727 + return; 733 728 }; 729 + sub.bc.stats.relay_seq.store(relay_seq, .release); 734 730 735 731 const broadcast_data = broadcaster.resequenceFrame(alloc, data, relay_seq) orelse data; 736 - const owned = sub.allocator.dupe(u8, broadcast_data) catch return; 732 + const owned = sub.allocator.dupe(u8, broadcast_data) catch { 733 + sub.bc.persist_order.store(0, .release); 734 + return; 735 + }; 737 736 sub.bc.broadcast_queue.push(relay_seq, owned, &sub.bc.stats); 737 + sub.bc.persist_order.store(0, .release); 738 738 739 739 // update per-DID state outside the ordering lock 
(Postgres round-trip) 740 740 if ((is_commit or is_sync) and uid > 0) {