atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

lazy-init Evented pg.Pool: defer creation until first fiber runs

pg.Pool.initUri does TCP connects via io_uring, which requires the Evented
event loop to be running. creating it during main() init (before the
scheduler starts) fails with NetworkDown — io_uring can't submit ops yet.

fix: store ev_io/db_url/pool_size config on DiskPersist, create the pool
on first use via ensureEvDb(). uses CAS-based init-once — first fiber to
call it creates the pool, concurrent fibers yield-wait via ev_io.sleep().

also changes backfiller/cleaner from storing *pg.Pool to *DiskPersist,
accessing the pool lazily via self.persist.ensureEvDb().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 92a15ba4 949e9a7f

+111 -52
+7 -7
src/api/xrpc.zig
··· 37 37 38 38 // query accounts with repo state, paginated by UID 39 39 // includes both local status and upstream_status for combined active check 40 - var result = persist.ev_db.?.query( 40 + var result = persist.ensureEvDb().query( 41 41 \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 42 42 \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 43 43 \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 ··· 122 122 } 123 123 124 124 // look up account (includes both local and upstream status) 125 - var row = (persist.ev_db.?.rowUnsafe( 125 + var row = (persist.ensureEvDb().rowUnsafe( 126 126 "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 127 127 .{did}, 128 128 ) catch { ··· 181 181 } 182 182 183 183 // look up the PDS hostname for this account 184 - var row = (persist.ev_db.?.rowUnsafe( 184 + var row = (persist.ensureEvDb().rowUnsafe( 185 185 "SELECT h.hostname FROM account a JOIN host h ON a.host_id = h.id WHERE a.did = $1 AND a.host_id > 0", 186 186 .{did}, 187 187 ) catch { ··· 215 215 } 216 216 217 217 // look up account + repo state (includes both local and upstream status) 218 - var row = (persist.ev_db.?.rowUnsafe( 218 + var row = (persist.ensureEvDb().rowUnsafe( 219 219 "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 220 220 .{did}, 221 221 ) catch { ··· 345 345 return; 346 346 } 347 347 348 - var result = persist.ev_db.?.query( 348 + var result = persist.ensureEvDb().query( 349 349 "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 350 350 .{ cursor_val, limit }, 351 351 ) catch { ··· 400 400 }; 401 401 402 402 // look up host 403 - var row = (persist.ev_db.?.rowUnsafe( 403 + var row = (persist.ensureEvDb().rowUnsafe( 404 404 "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 405 405 .{hostname}, 406 406 ) catch { ··· 426 426 raw_status; // active, idle pass through 427 427 428 428 // count accounts on this host 429 - const account_count: i64 = if (persist.ev_db.?.rowUnsafe( 429 + const account_count: i64 = if (persist.ensureEvDb().rowUnsafe( 430 430 "SELECT COUNT(*) FROM account WHERE host_id = $1", 431 431 .{host_id}, 432 432 ) catch null) |cnt_row| blk: {
+14 -9
src/backfill.zig
··· 12 12 const http = std.http; 13 13 const pg = @import("pg"); 14 14 const collection_index_mod = @import("collection_index.zig"); 15 + const event_log_mod = @import("event_log.zig"); 15 16 16 17 const Allocator = std.mem.Allocator; 17 18 const log = std.log.scoped(.backfill); ··· 24 25 pub const Backfiller = struct { 25 26 allocator: Allocator, 26 27 collection_index: *collection_index_mod.CollectionIndex, 27 - db: *pg.Pool, 28 + persist: *event_log_mod.DiskPersist, 28 29 running: std.atomic.Value(bool), 29 30 future: ?Io.Future(void), 30 31 source: []const u8, 31 32 io: Io, 32 33 34 + fn db(self: *Backfiller) *pg.Pool { 35 + return self.persist.ensureEvDb(); 36 + } 37 + 33 38 pub fn init( 34 39 allocator: Allocator, 35 40 collection_index: *collection_index_mod.CollectionIndex, 36 - db: *pg.Pool, 41 + persist: *event_log_mod.DiskPersist, 37 42 io: Io, 38 43 ) Backfiller { 39 44 return .{ 40 45 .allocator = allocator, 41 46 .collection_index = collection_index, 42 - .db = db, 47 + .persist = persist, 43 48 .running = .{ .raw = false }, 44 49 .future = null, 45 50 .source = "", ··· 88 93 89 94 // insert progress rows (skip existing) 90 95 for (collections) |collection| { 91 - _ = self.db.exec( 96 + _ = self.db().exec( 92 97 "INSERT INTO backfill_progress (collection, source) VALUES ($1, $2) ON CONFLICT (collection, source) DO NOTHING", 93 98 .{ collection, self.source }, 94 99 ) catch |err| { ··· 207 212 defer if (cursor) |c| self.allocator.free(c); 208 213 var imported: i64 = 0; 209 214 { 210 - var row = (self.db.rowUnsafe( 215 + var row = (self.db().rowUnsafe( 211 216 "SELECT completed_at IS NOT NULL, cursor, imported_count FROM backfill_progress WHERE collection = $1 AND source = $2", 212 217 .{ collection, self.source }, 213 218 ) catch return error.DatabaseError) orelse return; ··· 250 255 251 256 // update cursor in progress table 252 257 const new_cursor = fetch_result.next_cursor orelse ""; 253 - _ = self.db.exec( 258 + _ = self.db().exec( 254 259 "UPDATE backfill_progress SET cursor = $1, imported_count = $2 WHERE collection = $3 AND source = $4", 255 260 .{ new_cursor, imported, collection, self.source }, 256 261 ) catch {}; ··· 264 269 self.io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch {}; 265 270 } else { 266 271 // no more pages — mark complete 267 - _ = self.db.exec( 272 + _ = self.db().exec( 268 273 "UPDATE backfill_progress SET completed_at = now(), cursor = '', imported_count = $1 WHERE collection = $2 AND source = $3", 269 274 .{ imported, collection, self.source }, 270 275 ) catch {}; ··· 346 351 var completed: i64 = 0; 347 352 var total_imported: i64 = 0; 348 353 { 349 - var row = (self.db.rowUnsafe( 354 + var row = (self.db().rowUnsafe( 350 355 "SELECT COUNT(*)::bigint, COUNT(completed_at)::bigint, COALESCE(SUM(imported_count), 0)::bigint FROM backfill_progress", 351 356 .{}, 352 357 ) catch null) orelse null; ··· 367 372 }) catch return error.FormatError; 368 373 369 374 // per-collection detail 370 - var result = self.db.query( 375 + var result = self.db().query( 371 376 "SELECT collection, source, cursor, imported_count, completed_at IS NOT NULL FROM backfill_progress ORDER BY collection, source", 372 377 .{}, 373 378 ) catch return error.DatabaseError;
+9 -4
src/cleaner.zig
··· 8 8 const Io = std.Io; 9 9 const pg = @import("pg"); 10 10 const collection_index_mod = @import("collection_index.zig"); 11 + const event_log_mod = @import("event_log.zig"); 11 12 12 13 const Allocator = std.mem.Allocator; 13 14 const log = std.log.scoped(.cleaner); ··· 16 17 allocator: Allocator, 17 18 io: Io, 18 19 collection_index: *collection_index_mod.CollectionIndex, 19 - db: *pg.Pool, 20 + persist: *event_log_mod.DiskPersist, 20 21 running: std.atomic.Value(bool), 21 22 future: ?Io.Future(void), 22 23 scanned: std.atomic.Value(u64), 23 24 removed: std.atomic.Value(u64), 24 25 26 + fn db(self: *Cleaner) *pg.Pool { 27 + return self.persist.ensureEvDb(); 28 + } 29 + 25 30 pub fn init( 26 31 allocator: Allocator, 27 32 io: Io, 28 33 collection_index: *collection_index_mod.CollectionIndex, 29 - db: *pg.Pool, 34 + persist: *event_log_mod.DiskPersist, 30 35 ) Cleaner { 31 36 return .{ 32 37 .allocator = allocator, 33 38 .io = io, 34 39 .collection_index = collection_index, 35 - .db = db, 40 + .persist = persist, 36 41 .running = .{ .raw = false }, 37 42 .future = null, 38 43 .scanned = .{ .raw = 0 }, ··· 69 74 while (true) { 70 75 var batch_count: u64 = 0; 71 76 { 72 - var result = self.db.query( 77 + var result = self.db().query( 73 78 "SELECT uid, did FROM account WHERE (status != 'active' OR upstream_status != 'active') AND uid > $1 ORDER BY uid LIMIT 500", 74 79 .{last_uid}, 75 80 ) catch |err| {
+73 -16
src/event_log.zig
··· 92 92 dir_path: []const u8, 93 93 dir: Io.Dir, 94 94 db: *pg.Pool, 95 - /// Evented-safe pg.Pool — created on the Evented Io backend. 96 - /// Evented callers (slurper, API handlers, broadcaster) use this pool 97 - /// for pure DB reads/writes. pool_io callers keep using self.db. 95 + /// Evented-safe pg.Pool — lazy-initialized on first use from an Evented fiber. 96 + /// pg.Pool.initUri requires the event loop to be running (TCP connect goes through 97 + /// io_uring), so we can't create it during main() init. Evented callers access it 98 + /// via ensureEvDb(). pool_io callers keep using self.db. 98 99 ev_db: ?*pg.Pool = null, 100 + ev_db_state: std.atomic.Value(u8) = .{ .raw = 0 }, 101 + /// Evented Io — set by main before any fibers run. used for lazy ev_db init. 102 + ev_io: ?Io = null, 103 + /// database URL for lazy ev_db init (stable ref to process env string) 104 + db_url: []const u8 = "", 105 + /// pool size for lazy ev_db init 106 + ev_db_pool_size: u16 = 0, 99 107 current_file: ?Io.File = null, 100 108 current_file_path: ?[]const u8 = null, 101 109 current_file_pos: u64 = 0, ··· 130 138 /// MPSC queue for cross-Io playback requests (Evented → pool_io) 131 139 playback_head: std.atomic.Value(?*PlaybackRequest) = .{ .raw = null }, 132 140 141 + const EvDbInit = enum(u8) { uninit = 0, initializing = 1, ready = 2 }; 142 + 143 + /// returns the Evented pg.Pool, lazy-initializing on first call. 144 + /// pg.Pool.initUri does TCP connects via io_uring, so it can only run inside 145 + /// an Evented fiber (after the event loop is spinning). callers from main() 146 + /// init set ev_io/db_url/ev_db_pool_size; the actual pool is created here. 147 + pub fn ensureEvDb(self: *DiskPersist) *pg.Pool { 148 + // fast path — already initialized (single atomic load) 149 + if (self.ev_db) |db| return db; 150 + 151 + const ev_io = self.ev_io orelse @panic("ensureEvDb: ev_io not set"); 152 + 153 + while (true) { 154 + const state: EvDbInit = @enumFromInt(self.ev_db_state.load(.acquire)); 155 + switch (state) { 156 + .ready => return self.ensureEvDb(), 157 + .uninit => { 158 + if (self.ev_db_state.cmpxchgWeak( 159 + @intFromEnum(EvDbInit.uninit), 160 + @intFromEnum(EvDbInit.initializing), 161 + .acquire, 162 + .monotonic, 163 + ) == null) { 164 + // won the race — create the pool 165 + const uri = std.Uri.parse(self.db_url) catch 166 + @panic("ensureEvDb: invalid DATABASE_URL"); 167 + self.ev_db = pg.Pool.initUri( 168 + self.allocator, 169 + ev_io, 170 + uri, 171 + .{ .size = self.ev_db_pool_size }, 172 + ) catch |err| { 173 + log.err("ensureEvDb: initUri failed: {s}", .{@errorName(err)}); 174 + @panic("ensureEvDb: failed to create Evented pg.Pool"); 175 + }; 176 + self.ev_db_state.store(@intFromEnum(EvDbInit.ready), .release); 177 + log.info("lazy-initialized Evented pg.Pool (size={d})", .{self.ev_db_pool_size}); 178 + return self.ensureEvDb(); 179 + } 180 + // lost CAS — another fiber is initializing, fall through 181 + }, 182 + .initializing => { 183 + // yield to let the initializing fiber complete 184 + ev_io.sleep(Io.Duration.fromMilliseconds(1), .awake) catch {}; 185 + }, 186 + } 187 + } 188 + } 189 + 133 190 /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended) 134 191 pub fn evtbufLen(self: *DiskPersist) usize { 135 192 if (!self.mutex.tryLock()) return 0; ··· 443 500 444 501 /// count accounts on a host (Evented pool) 445 502 pub fn getHostAccountCountEv(self: *DiskPersist, host_id: u64) u64 { 446 - return getHostAccountCountImpl(host_id, self.ev_db.?); 503 + return getHostAccountCountImpl(host_id, self.ensureEvDb()); 447 504 } 448 505 449 506 fn getHostAccountCountImpl(host_id: u64, db: *pg.Pool) u64 { ··· 463 520 464 521 /// effective account count (Evented pool) 465 522 pub fn getEffectiveAccountCountEv(self: *DiskPersist, host_id: u64) u64 { 466 - return getEffectiveAccountCountImpl(host_id, self.ev_db.?); 523 + return getEffectiveAccountCountImpl(host_id, self.ensureEvDb()); 467 524 } 468 525 469 526 /// uses admin-configured limit if set, otherwise actual COUNT(*). ··· 484 541 485 542 /// set host account limit (Evented pool) 486 543 pub fn setHostAccountLimitEv(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 487 - return setHostAccountLimitImpl(host_id, limit, self.ev_db.?); 544 + return setHostAccountLimitImpl(host_id, limit, self.ensureEvDb()); 488 545 } 489 546 490 547 /// pass null to clear the override and revert to actual COUNT(*). ··· 556 613 557 614 /// get or create a host row (Evented pool) 558 615 pub fn getOrCreateHostEv(self: *DiskPersist, hostname: []const u8) !HostResult { 559 - return getOrCreateHostImpl(hostname, self.ev_db.?); 616 + return getOrCreateHostImpl(hostname, self.ensureEvDb()); 560 617 } 561 618 562 619 fn getOrCreateHostImpl(hostname: []const u8, db: *pg.Pool) !HostResult { ··· 586 643 587 644 /// check if a host is banned or blocked by status (Evented pool) 588 645 pub fn isHostBannedEv(self: *DiskPersist, hostname: []const u8) bool { 589 - return isHostBannedImpl(hostname, self.ev_db.?); 646 + return isHostBannedImpl(hostname, self.ensureEvDb()); 590 647 } 591 648 592 649 fn isHostBannedImpl(hostname: []const u8, db: *pg.Pool) bool { ··· 617 674 618 675 /// look up host ID by hostname (Evented pool) 619 676 pub fn getHostIdForHostnameEv(self: *DiskPersist, hostname: []const u8) !?u64 { 620 - return getHostIdForHostnameImpl(hostname, self.ev_db.?); 677 + return getHostIdForHostnameImpl(hostname, self.ensureEvDb()); 621 678 } 622 679 623 680 fn getHostIdForHostnameImpl(hostname: []const u8, db: *pg.Pool) !?u64 { ··· 636 693 637 694 /// update host status (Evented pool) 638 695 pub fn updateHostStatusEv(self: *DiskPersist, host_id: u64, status: []const u8) !void { 639 - return updateHostStatusImpl(host_id, status, self.ev_db.?); 696 + return updateHostStatusImpl(host_id, status, self.ensureEvDb()); 640 697 } 641 698 642 699 fn updateHostStatusImpl(host_id: u64, status: []const u8, db: *pg.Pool) !void { ··· 653 710 654 711 /// list all active hosts (Evented pool) 655 712 pub fn listActiveHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 656 - return listActiveHostsImpl(allocator, self.ev_db.?); 713 + return listActiveHostsImpl(allocator, self.ensureEvDb()); 657 714 } 658 715 659 716 fn listActiveHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { ··· 693 750 694 751 /// list all hosts (Evented pool) 695 752 pub fn listAllHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 696 - return listAllHostsImpl(allocator, self.ev_db.?); 753 + return listAllHostsImpl(allocator, self.ensureEvDb()); 697 754 } 698 755 699 756 fn listAllHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { ··· 747 804 748 805 /// check if a hostname (or any parent domain) is banned (Evented pool). 749 806 pub fn isDomainBannedEv(self: *DiskPersist, hostname: []const u8) bool { 750 - return self.isDomainBannedImpl(hostname, self.ev_db.?); 807 + return self.isDomainBannedImpl(hostname, self.ensureEvDb()); 751 808 } 752 809 753 810 /// Go relay: domain_ban.go DomainIsBanned — suffix-based check. ··· 779 836 780 837 /// reset failure count (Evented pool) 781 838 pub fn resetHostFailuresEv(self: *DiskPersist, host_id: u64) !void { 782 - return resetHostFailuresImpl(host_id, self.ev_db.?); 839 + return resetHostFailuresImpl(host_id, self.ensureEvDb()); 783 840 } 784 841 785 842 fn resetHostFailuresImpl(host_id: u64, db: *pg.Pool) !void { ··· 792 849 /// resolve a DID to UID using the Evented pool. skips the DID cache 793 850 /// (which uses pool_io mutex). only used from admin ban (rare path). 794 851 pub fn uidForDidEv(self: *DiskPersist, did: []const u8) !u64 { 795 - const db = self.ev_db.?; 852 + const db = self.ensureEvDb(); 796 853 // check database 797 854 if (try db.rowUnsafe( 798 855 "SELECT uid FROM account WHERE did = $1", ··· 942 999 943 1000 /// oldest available sequence number (Evented pool) 944 1001 pub fn firstSeqEv(self: *DiskPersist) ?u64 { 945 - return firstSeqImpl(self.ev_db.?); 1002 + return firstSeqImpl(self.ensureEvDb()); 946 1003 } 947 1004 948 1005 fn firstSeqImpl(db: *pg.Pool) ?u64 {
+8 -16
src/main.zig
··· 227 227 dp.retention_hours = retention_hours; 228 228 dp.max_dir_bytes = max_events_gb * 1024 * 1024 * 1024; 229 229 230 - // create Evented pg.Pool — safe for use from Evented fibers (slurper, API, broadcaster). 231 - // the Threaded pool (dp.db) is only safe from pool_io threads. 232 - { 233 - const uri = std.Uri.parse(database_url) catch { 234 - log.err("failed to parse DATABASE_URL for ev_db", .{}); 235 - return error.InvalidDatabaseUrl; 236 - }; 237 - const pg_mod = @import("pg"); 238 - dp.ev_db = pg_mod.Pool.initUri(allocator, io, uri, .{ .size = db_pool_size }) catch |err| { 239 - log.err("failed to init Evented pg.Pool: {s}", .{@errorName(err)}); 240 - return err; 241 - }; 242 - } 243 - log.info("created Evented pg.Pool (size={d})", .{db_pool_size}); 230 + // configure lazy Evented pg.Pool — can't create it here because pg.Pool.initUri 231 + // does TCP connects via io_uring, which requires the event loop to be running. 232 + // the pool is created on first use from an Evented fiber (via dp.ensureEvDb()). 233 + dp.ev_io = io; 234 + dp.db_url = database_url; 235 + dp.ev_db_pool_size = db_pool_size; 244 236 245 237 if (dp.lastSeq()) |last| { 246 238 log.info("event log recovered: last_seq={d}", .{last}); ··· 263 255 264 256 // init backfiller (collection index backfill from source relay) 265 257 // uses ev_db (Evented pool) — backfiller spawns Evented fibers via io.concurrent() 266 - var backfiller = backfill_mod.Backfiller.init(allocator, &ci, dp.ev_db.?, io); 258 + var backfiller = backfill_mod.Backfiller.init(allocator, &ci, &dp, io); 267 259 268 260 // init cleaner (removes stale entries from collection index) 269 261 // uses ev_db (Evented pool) — cleaner spawns Evented fibers via io.concurrent() 270 - var cleaner = cleaner_mod.Cleaner.init(allocator, io, &ci, dp.ev_db.?); 262 + var cleaner = cleaner_mod.Cleaner.init(allocator, io, &ci, &dp); 271 263 272 264 // init resyncer (updates collection index on #sync events) 273 265 // runs entirely on pool_io (Threaded) — enqueue() is called from frame worker