atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

make ensureEvDb() fallible: recover from transient init failures

ensureEvDb() was @panic on initUri failure, meaning a transient postgres
hiccup during lazy init crashes the entire relay. now returns !*pg.Pool
and resets state to uninit on failure so the next call retries.

callers handle the error gracefully:
- xrpc handlers: respond 503 ServiceUnavailable
- slurper: skip host on db error (safe default)
- broadcaster firstSeq: fall back to memory history
- admin: use 0 for account count on error
- backfiller/cleaner: log and bail from current run

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz e9c7b961 92a15ba4

+94 -47
+1 -1
src/api/admin.zig
··· 218 218 }; 219 219 220 220 // update running subscriber's rate limits immediately 221 - const effective = if (parsed.value.account_limit) |l| l else ctx.persist.getHostAccountCountEv(host_id); 221 + const effective = if (parsed.value.account_limit) |l| l else ctx.persist.getHostAccountCountEv(host_id) catch 0; 222 222 ctx.slurper.updateHostLimits(host_id, effective); 223 223 224 224 if (parsed.value.account_limit) |limit| {
+38 -8
src/api/xrpc.zig
··· 35 35 return; 36 36 } 37 37 38 + const ev_db = persist.ensureEvDb() catch { 39 + h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 40 + return; 41 + }; 42 + 38 43 // query accounts with repo state, paginated by UID 39 44 // includes both local status and upstream_status for combined active check 40 - var result = persist.ensureEvDb().query( 45 + var result = ev_db.query( 41 46 \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 42 47 \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 43 48 \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 ··· 120 125 h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 121 126 return; 122 127 } 128 + 129 + const ev_db = persist.ensureEvDb() catch { 130 + h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 131 + return; 132 + }; 123 133 124 134 // look up account (includes both local and upstream status) 125 - var row = (persist.ensureEvDb().rowUnsafe( 135 + var row = (ev_db.rowUnsafe( 126 136 "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 127 137 .{did}, 128 138 ) catch { ··· 180 190 return; 181 191 } 182 192 193 + const ev_db = persist.ensureEvDb() catch { 194 + h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 195 + return; 196 + }; 197 + 183 198 // look up the PDS hostname for this account 184 - var row = (persist.ensureEvDb().rowUnsafe( 199 + var row = (ev_db.rowUnsafe( 185 200 "SELECT h.hostname FROM account a JOIN host h ON a.host_id = h.id WHERE a.did = $1 AND a.host_id > 0", 186 201 .{did}, 187 202 ) catch { ··· 214 229 return; 215 230 } 216 231 232 + const ev_db = persist.ensureEvDb() catch { 233 + h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 234 + return; 235 + }; 236 + 217 237 // look up account + repo state (includes both local and upstream status) 218 - var row = (persist.ensureEvDb().rowUnsafe( 238 + var row = (ev_db.rowUnsafe( 219 239 "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 220 240 .{did}, 221 241 ) catch { ··· 345 365 return; 346 366 } 347 367 348 - var result = persist.ensureEvDb().query( 368 + const ev_db = persist.ensureEvDb() catch { 369 + h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 370 + return; 371 + }; 372 + 373 + var result = ev_db.query( 349 374 "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 350 375 .{ cursor_val, limit }, 351 376 ) catch { ··· 399 424 return; 400 425 }; 401 426 427 + const ev_db = persist.ensureEvDb() catch { 428 + h.respondJson(conn, .service_unavailable, "{\"error\":\"ServiceUnavailable\",\"message\":\"database unavailable\"}"); 429 + return; 430 + }; 431 + 402 432 // look up host 403 - var row = (persist.ensureEvDb().rowUnsafe( 433 + var row = (ev_db.rowUnsafe( 404 434 "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 405 435 .{hostname}, 406 436 ) catch { ··· 426 456 raw_status; // active, idle pass through 427 457 428 458 // count accounts on this host 429 - const account_count: i64 = if (persist.ensureEvDb().rowUnsafe( 459 + const account_count: i64 = if (ev_db.rowUnsafe( 430 460 "SELECT COUNT(*) FROM account WHERE host_id = $1", 431 461 .{host_id}, 432 462 ) catch null) |cnt_row| blk: { ··· 474 504 defer slurper.allocator.free(hostname); 475 505 476 506 // fast validation: domain ban check (Evented fiber — use Ev pool) 477 - if (slurper.persist.isDomainBannedEv(hostname)) { 507 + if (slurper.persist.isDomainBannedEv(hostname) catch false) { 478 508 log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); 479 509 h.respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"domain is banned\"}"); 480 510 return;
+16 -7
src/backfill.zig
··· 31 31 source: []const u8, 32 32 io: Io, 33 33 34 - fn db(self: *Backfiller) *pg.Pool { 34 + fn db(self: *Backfiller) !*pg.Pool { 35 35 return self.persist.ensureEvDb(); 36 36 } 37 37 ··· 79 79 self.running.store(false, .release); 80 80 } 81 81 82 + const pool = self.db() catch |err| { 83 + log.err("backfill: database unavailable: {s}", .{@errorName(err)}); 84 + return; 85 + }; 86 + 82 87 // discover collections 83 88 const collections = self.discoverCollections() catch |err| { 84 89 log.err("collection discovery failed: {s}", .{@errorName(err)}); ··· 93 98 94 99 // insert progress rows (skip existing) 95 100 for (collections) |collection| { 96 - _ = self.db().exec( 101 + _ = pool.exec( 97 102 "INSERT INTO backfill_progress (collection, source) VALUES ($1, $2) ON CONFLICT (collection, source) DO NOTHING", 98 103 .{ collection, self.source }, 99 104 ) catch |err| { ··· 207 212 } 208 213 209 214 fn backfillCollection(self: *Backfiller, collection: []const u8) !void { 215 + const pool = try self.db(); 216 + 210 217 // single query: check completion, get cursor + count for resume 211 218 var cursor: ?[]const u8 = null; 212 219 defer if (cursor) |c| self.allocator.free(c); 213 220 var imported: i64 = 0; 214 221 { 215 - var row = (self.db().rowUnsafe( 222 + var row = (pool.rowUnsafe( 216 223 "SELECT completed_at IS NOT NULL, cursor, imported_count FROM backfill_progress WHERE collection = $1 AND source = $2", 217 224 .{ collection, self.source }, 218 225 ) catch return error.DatabaseError) orelse return; ··· 255 262 256 263 // update cursor in progress table 257 264 const new_cursor = fetch_result.next_cursor orelse ""; 258 - _ = self.db().exec( 265 + _ = pool.exec( 259 266 "UPDATE backfill_progress SET cursor = $1, imported_count = $2 WHERE collection = $3 AND source = $4", 260 267 .{ new_cursor, imported, collection, self.source }, 261 268 ) catch {}; ··· 269 276 self.io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch {}; 270 277 } else { 271 278 // no more pages — mark complete 272 - _ = self.db().exec( 279 + _ = pool.exec( 273 280 "UPDATE backfill_progress SET completed_at = now(), cursor = '', imported_count = $1 WHERE collection = $2 AND source = $3", 274 281 .{ imported, collection, self.source }, 275 282 ) catch {}; ··· 342 349 343 350 /// return status summary for the admin endpoint 344 351 pub fn getStatus(self: *Backfiller, allocator: Allocator) ![]u8 { 352 + const pool = try self.db(); 353 + 345 354 var aw: Io.Writer.Allocating = .init(allocator); 346 355 defer aw.deinit(); 347 356 const w = &aw.writer; ··· 351 360 var completed: i64 = 0; 352 361 var total_imported: i64 = 0; 353 362 { 354 - var row = (self.db().rowUnsafe( 363 + var row = (pool.rowUnsafe( 355 364 "SELECT COUNT(*)::bigint, COUNT(completed_at)::bigint, COALESCE(SUM(imported_count), 0)::bigint FROM backfill_progress", 356 365 .{}, 357 366 ) catch null) orelse null; ··· 372 381 }) catch return error.FormatError; 373 382 374 383 // per-collection detail 375 - var result = self.db().query( 384 + var result = pool.query( 376 385 "SELECT collection, source, cursor, imported_count, completed_at IS NOT NULL FROM backfill_progress ORDER BY collection, source", 377 386 .{}, 378 387 ) catch return error.DatabaseError;
+1 -1
src/broadcaster.zig
··· 782 782 // OutdatedCursor: cursor older than oldest available — info, continue 783 783 const oldest = blk: { 784 784 if (ctx.persist) |dp| { 785 - if (dp.firstSeqEv()) |s| break :blk s; 785 + if (dp.firstSeqEv() catch null) |s| break :blk s; 786 786 } 787 787 break :blk ctx.history.oldestSeq() orelse 0; 788 788 };
+7 -2
src/cleaner.zig
··· 23 23 scanned: std.atomic.Value(u64), 24 24 removed: std.atomic.Value(u64), 25 25 26 - fn db(self: *Cleaner) *pg.Pool { 26 + fn db(self: *Cleaner) !*pg.Pool { 27 27 return self.persist.ensureEvDb(); 28 28 } 29 29 ··· 69 69 70 70 log.info("cleanup started", .{}); 71 71 72 + const pool = self.db() catch |err| { 73 + log.err("cleanup: database unavailable: {s}", .{@errorName(err)}); 74 + return; 75 + }; 76 + 72 77 // page through inactive accounts by uid 73 78 var last_uid: i64 = 0; 74 79 while (true) { 75 80 var batch_count: u64 = 0; 76 81 { 77 - var result = self.db().query( 82 + var result = pool.query( 78 83 "SELECT uid, did FROM account WHERE (status != 'active' OR upstream_status != 'active') AND uid > $1 ORDER BY uid LIMIT 500", 79 84 .{last_uid}, 80 85 ) catch |err| {
+27 -24
src/event_log.zig
··· 144 144 /// pg.Pool.initUri does TCP connects via io_uring, so it can only run inside 145 145 /// an Evented fiber (after the event loop is spinning). callers from main() 146 146 /// init set ev_io/db_url/ev_db_pool_size; the actual pool is created here. 147 - pub fn ensureEvDb(self: *DiskPersist) *pg.Pool { 147 + /// on failure, resets state to uninit so the next call retries. 148 + pub fn ensureEvDb(self: *DiskPersist) !*pg.Pool { 148 149 // fast path — already initialized (single atomic load) 149 150 if (self.ev_db) |db| return db; 150 151 151 - const ev_io = self.ev_io orelse @panic("ensureEvDb: ev_io not set"); 152 + const ev_io = self.ev_io orelse return error.EvDbNotConfigured; 152 153 153 154 while (true) { 154 155 const state: EvDbInit = @enumFromInt(self.ev_db_state.load(.acquire)); 155 156 switch (state) { 156 - .ready => return self.ensureEvDb(), 157 + .ready => return self.ev_db orelse error.EvDbNotConfigured, 157 158 .uninit => { 158 159 if (self.ev_db_state.cmpxchgWeak( 159 160 @intFromEnum(EvDbInit.uninit), ··· 163 164 ) == null) { 164 165 // won the race — create the pool 165 166 const uri = std.Uri.parse(self.db_url) catch 166 - @panic("ensureEvDb: invalid DATABASE_URL"); 167 + return error.InvalidDatabaseUrl; 167 168 self.ev_db = pg.Pool.initUri( 168 169 self.allocator, 169 170 ev_io, ··· 171 172 .{ .size = self.ev_db_pool_size }, 172 173 ) catch |err| { 173 174 log.err("ensureEvDb: initUri failed: {s}", .{@errorName(err)}); 174 - @panic("ensureEvDb: failed to create Evented pg.Pool"); 175 + // reset to uninit so next call retries 176 + self.ev_db_state.store(@intFromEnum(EvDbInit.uninit), .release); 177 + return error.EvDbInitFailed; 175 178 }; 176 179 self.ev_db_state.store(@intFromEnum(EvDbInit.ready), .release); 177 180 log.info("lazy-initialized Evented pg.Pool (size={d})", .{self.ev_db_pool_size}); 178 - return self.ensureEvDb(); 181 + return self.ev_db.?; 179 182 } 180 183 // lost CAS — another fiber is initializing, fall through 181 184 }, ··· 499 502 } 500 503 501 504 /// count accounts on a host (Evented pool) 502 - pub fn getHostAccountCountEv(self: *DiskPersist, host_id: u64) u64 { 503 - return getHostAccountCountImpl(host_id, self.ensureEvDb()); 505 + pub fn getHostAccountCountEv(self: *DiskPersist, host_id: u64) !u64 { 506 + return getHostAccountCountImpl(host_id, try self.ensureEvDb()); 504 507 } 505 508 506 509 fn getHostAccountCountImpl(host_id: u64, db: *pg.Pool) u64 { ··· 519 522 } 520 523 521 524 /// effective account count (Evented pool) 522 - pub fn getEffectiveAccountCountEv(self: *DiskPersist, host_id: u64) u64 { 523 - return getEffectiveAccountCountImpl(host_id, self.ensureEvDb()); 525 + pub fn getEffectiveAccountCountEv(self: *DiskPersist, host_id: u64) !u64 { 526 + return getEffectiveAccountCountImpl(host_id, try self.ensureEvDb()); 524 527 } 525 528 526 529 /// uses admin-configured limit if set, otherwise actual COUNT(*). ··· 541 544 542 545 /// set host account limit (Evented pool) 543 546 pub fn setHostAccountLimitEv(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 544 - return setHostAccountLimitImpl(host_id, limit, self.ensureEvDb()); 547 + return setHostAccountLimitImpl(host_id, limit, try self.ensureEvDb()); 545 548 } 546 549 547 550 /// pass null to clear the override and revert to actual COUNT(*). ··· 613 616 614 617 /// get or create a host row (Evented pool) 615 618 pub fn getOrCreateHostEv(self: *DiskPersist, hostname: []const u8) !HostResult { 616 - return getOrCreateHostImpl(hostname, self.ensureEvDb()); 619 + return getOrCreateHostImpl(hostname, try self.ensureEvDb()); 617 620 } 618 621 619 622 fn getOrCreateHostImpl(hostname: []const u8, db: *pg.Pool) !HostResult { ··· 642 645 } 643 646 644 647 /// check if a host is banned or blocked by status (Evented pool) 645 - pub fn isHostBannedEv(self: *DiskPersist, hostname: []const u8) bool { 646 - return isHostBannedImpl(hostname, self.ensureEvDb()); 648 + pub fn isHostBannedEv(self: *DiskPersist, hostname: []const u8) !bool { 649 + return isHostBannedImpl(hostname, try self.ensureEvDb()); 647 650 } 648 651 649 652 fn isHostBannedImpl(hostname: []const u8, db: *pg.Pool) bool { ··· 674 677 675 678 /// look up host ID by hostname (Evented pool) 676 679 pub fn getHostIdForHostnameEv(self: *DiskPersist, hostname: []const u8) !?u64 { 677 - return getHostIdForHostnameImpl(hostname, self.ensureEvDb()); 680 + return getHostIdForHostnameImpl(hostname, try self.ensureEvDb()); 678 681 } 679 682 680 683 fn getHostIdForHostnameImpl(hostname: []const u8, db: *pg.Pool) !?u64 { ··· 693 696 694 697 /// update host status (Evented pool) 695 698 pub fn updateHostStatusEv(self: *DiskPersist, host_id: u64, status: []const u8) !void { 696 - return updateHostStatusImpl(host_id, status, self.ensureEvDb()); 699 + return updateHostStatusImpl(host_id, status, try self.ensureEvDb()); 697 700 } 698 701 699 702 fn updateHostStatusImpl(host_id: u64, status: []const u8, db: *pg.Pool) !void { ··· 710 713 711 714 /// list all active hosts (Evented pool) 712 715 pub fn listActiveHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 713 - return listActiveHostsImpl(allocator, self.ensureEvDb()); 716 + return listActiveHostsImpl(allocator, try self.ensureEvDb()); 714 717 } 715 718 716 719 fn listActiveHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { ··· 750 753 751 754 /// list all hosts (Evented pool) 752 755 pub fn listAllHostsEv(self: *DiskPersist, allocator: Allocator) ![]Host { 753 - return listAllHostsImpl(allocator, self.ensureEvDb()); 756 + return listAllHostsImpl(allocator, try self.ensureEvDb()); 754 757 } 755 758 756 759 fn listAllHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { ··· 803 806 } 804 807 805 808 /// check if a hostname (or any parent domain) is banned (Evented pool). 806 - pub fn isDomainBannedEv(self: *DiskPersist, hostname: []const u8) bool { 807 - return self.isDomainBannedImpl(hostname, self.ensureEvDb()); 809 + pub fn isDomainBannedEv(self: *DiskPersist, hostname: []const u8) !bool { 810 + return self.isDomainBannedImpl(hostname, try self.ensureEvDb()); 808 811 } 809 812 810 813 /// Go relay: domain_ban.go DomainIsBanned — suffix-based check. ··· 836 839 837 840 /// reset failure count (Evented pool) 838 841 pub fn resetHostFailuresEv(self: *DiskPersist, host_id: u64) !void { 839 - return resetHostFailuresImpl(host_id, self.ensureEvDb()); 842 + return resetHostFailuresImpl(host_id, try self.ensureEvDb()); 840 843 } 841 844 842 845 fn resetHostFailuresImpl(host_id: u64, db: *pg.Pool) !void { ··· 849 852 /// resolve a DID to UID using the Evented pool. skips the DID cache 850 853 /// (which uses pool_io mutex). only used from admin ban (rare path). 851 854 pub fn uidForDidEv(self: *DiskPersist, did: []const u8) !u64 { 852 - const db = self.ensureEvDb(); 855 + const db = try self.ensureEvDb(); 853 856 // check database 854 857 if (try db.rowUnsafe( 855 858 "SELECT uid FROM account WHERE did = $1", ··· 998 1001 } 999 1002 1000 1003 /// oldest available sequence number (Evented pool) 1001 - pub fn firstSeqEv(self: *DiskPersist) ?u64 { 1002 - return firstSeqImpl(self.ensureEvDb()); 1004 + pub fn firstSeqEv(self: *DiskPersist) !?u64 { 1005 + return firstSeqImpl(try self.ensureEvDb()); 1003 1006 } 1004 1007 1005 1008 fn firstSeqImpl(db: *pg.Pool) ?u64 {
+4 -4
src/slurper.zig
··· 363 363 defer self.allocator.free(normalized); 364 364 365 365 // skip banned domains (Evented fiber — use Ev pool) 366 - if (self.persist.isDomainBannedEv(normalized)) continue; 366 + if (self.persist.isDomainBannedEv(normalized) catch true) continue; 367 367 368 368 // insert into DB (no describeServer check — the seed relay already vetted them) 369 369 _ = self.persist.getOrCreateHostEv(normalized) catch continue; ··· 419 419 420 420 // step 2: domain ban check (Evented fiber — use Ev pool) 421 421 // Go relay: domain_ban.go DomainIsBanned 422 - if (self.persist.isDomainBannedEv(hostname)) { 422 + if (self.persist.isDomainBannedEv(hostname) catch true) { 423 423 log.warn("host {s}: domain is banned, rejecting", .{hostname}); 424 424 return; 425 425 } 426 426 427 427 // step 3: check if host is banned/blocked in DB (Evented pool) 428 428 // Go relay: crawl.go checks host.Status == HostStatusBanned 429 - if (self.persist.isHostBannedEv(hostname)) { 429 + if (self.persist.isHostBannedEv(hostname) catch true) { 430 430 log.warn("host {s}: banned/blocked in DB, rejecting", .{hostname}); 431 431 return; 432 432 } ··· 468 468 const sub = try self.allocator.create(subscriber_mod.Subscriber); 469 469 errdefer self.allocator.destroy(sub); 470 470 471 - const account_count: u64 = self.persist.getEffectiveAccountCountEv(host_id); 471 + const account_count: u64 = self.persist.getEffectiveAccountCountEv(host_id) catch 0; 472 472 473 473 sub.* = subscriber_mod.Subscriber.init( 474 474 self.allocator,