atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

persistent host reconnect: never give up on known hosts

subscribers now retry with exponential backoff capped at 30 min
(previously a 60s cap with a hard kill at 15 failures). on successful
connect, backoff resets to 1s and the host flips back to active. hosts
that fail 15+ consecutive times are marked dormant for observability,
but the subscriber keeps retrying. a reconciliation loop every 5 min
respawns any active/dormant host missing from the worker map.

this eliminates dependence on the external reconnect cron for host
retention — the cron can be reduced to discovery-only (finding new hosts
not yet in the DB).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+125 -15
+15
docs/design.md
··· 54 54 | flush thread | 1 | 8 MB | batched fsync of event log (100ms or 400 events) | 55 55 | GC thread | 1 | 8 MB | event log file cleanup every 10 minutes | 56 56 | crawl queue thread | 1 | 8 MB | process `requestCrawl` — validate hostname, describeServer, spawn worker | 57 + | reconcile thread | 1 | 8 MB | every 5 min: respawn workers for active/dormant hosts missing from worker map | 57 58 | metrics server | 1 | 8 MB | HTTP on internal port, prometheus scrape | 58 59 | main thread | 1 | default | signal handling, shutdown coordination | 59 60 ··· 127 128 - `account` — uid, did, status, upstream_status, host_id 128 129 - `account_repo` — uid, rev, commit_data_cid (latest repo state) 129 130 - `host` — id, hostname, status, last_seq, failed_attempts 131 + - status lifecycle: `active` → `dormant` (after sustained failures, subscriber keeps retrying) → `active` (on successful reconnect). `blocked`/`banned` via admin. `idle` on FutureCursor. 130 132 - `log_file_refs` — seq→file mapping for cursor binary search 131 133 - `domain_ban` — banned domain suffixes 132 134 - `backfill_progress` — collection backfill cursor tracking ··· 221 223 - new accounts trigger async DID doc verification; on mismatch → rejected 222 224 - signature failures trigger key eviction + re-resolution (sync spec guidance) 223 225 - next commit from the same DID hits the refreshed cache 226 + 227 + ### host retention model 228 + 229 + indigo gives up on a host after 15 consecutive dial failures (~3 min) 230 + and marks it `offline`, relying on external `requestCrawl` to recover. 231 + zlay never gives up on a known host — subscribers retry with exponential 232 + backoff capped at 30 minutes, and backoff resets to 1 second on any 233 + successful connection. after sustained failure (15+ consecutive 234 + disconnects), the host is marked `dormant` for observability but the 235 + subscriber keeps retrying. 
a reconciliation loop (every 5 min) respawns 236 + any active/dormant host missing from the worker map, catching hosts lost 237 + to any gap. this eliminates the need for an external reconnect cron — 238 + the cron can be reduced to discovery-only (new hosts not yet in the DB). 224 239 225 240 ### consumer buffer sizing 226 241
+1 -1
src/api/xrpc.zig
··· 582 582 583 583 const status = if (std.mem.eql(u8, raw_status, "blocked")) 584 584 "banned" 585 - else if (std.mem.eql(u8, raw_status, "exhausted")) 585 + else if (std.mem.eql(u8, raw_status, "exhausted") or std.mem.eql(u8, raw_status, "dormant")) 586 586 "offline" 587 587 else 588 588 raw_status;
+15 -1
src/event_log.zig
··· 702 702 } 703 703 704 704 fn listActiveHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { 705 + return listHostsByStatusImpl(allocator, db, "status = 'active'"); 706 + } 707 + 708 + /// list hosts eligible for reconnection: active + dormant (Threaded pool). 709 + /// used by the reconciliation loop to respawn missing workers. 710 + pub fn listReconnectableHosts(self: *DiskPersist, allocator: Allocator) ![]Host { 711 + return listReconnectableHostsImpl(allocator, self.db); 712 + } 713 + 714 + fn listReconnectableHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { 715 + return listHostsByStatusImpl(allocator, db, "status IN ('active', 'dormant')"); 716 + } 717 + 718 + fn listHostsByStatusImpl(allocator: Allocator, db: *pg.Pool, comptime where: []const u8) ![]Host { 705 719 var hosts: std.ArrayListUnmanaged(Host) = .empty; 706 720 errdefer { 707 721 for (hosts.items) |host| { ··· 712 726 } 713 727 714 728 var result = try db.query( 715 - "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE status = 'active' ORDER BY id ASC", 729 + "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE " ++ where ++ " ORDER BY id ASC", 716 730 .{}, 717 731 ); 718 732 defer result.deinit();
+6 -3
src/host_ops.zig
··· 281 281 .increment_failures => { 282 282 const failures = self.persist.incrementHostFailures(op.host_id) catch 0; 283 283 if (failures >= self.max_consecutive_failures) { 284 - log.warn("host_ops: host_id={d} exhausted after {d} failures", .{ op.host_id, failures }); 285 - self.persist.updateHostStatus(op.host_id, "exhausted") catch {}; 286 - op.payload.host_shutdown.store(true, .release); 284 + // mark dormant for observability — subscriber keeps retrying. 285 + // reconciliation loop will also periodically re-check dormant hosts. 286 + log.warn("host_ops: host_id={d} marked dormant after {d} failures", .{ op.host_id, failures }); 287 + self.persist.updateHostStatus(op.host_id, "dormant") catch {}; 287 288 } 288 289 }, 289 290 .reset_failures => { 290 291 self.persist.resetHostFailures(op.host_id) catch |err| { 291 292 log.debug("host_ops: reset failures failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) }); 292 293 }; 294 + // if host was dormant, flip back to active on successful reconnect 295 + self.persist.updateHostStatus(op.host_id, "active") catch {}; 293 296 }, 294 297 .update_status => { 295 298 self.persist.updateHostStatus(op.host_id, op.payload.status.slice()) catch |err| {
+76 -1
src/slurper.zig
··· 5 5 //! - spawning/stopping subscriber workers 6 6 //! - processing crawl requests (adding new hosts) 7 7 //! - host validation (format, domain ban, describeServer, relay loop detection) 8 - //! - tracking host lifecycle (active → exhausted → blocked) 8 + //! - tracking host lifecycle (active → dormant, with persistent retry) 9 + //! - periodic reconciliation of DB host table vs running workers 9 10 //! 10 11 //! all downstream components (Broadcaster, DiskPersist, Validator) are 11 12 //! thread-safe for N concurrent producers, so this just orchestrates. ··· 253 254 // background tasks 254 255 startup_future: ?Io.Future(void) = null, 255 256 crawl_future: ?Io.Future(void) = null, 257 + reconcile_future: ?Io.Future(void) = null, 256 258 257 259 io: Io, 258 260 /// dedicated Threaded io for the frame worker pool — safe from plain OS threads ··· 302 304 // pullHosts + listActiveHosts + spawnWorker all happen in the background thread. 303 305 self.startup_future = try self.io.concurrent(spawnWorkers, .{self}); 304 306 self.crawl_future = try self.io.concurrent(processCrawlQueue, .{self}); 307 + self.reconcile_future = try self.io.concurrent(reconcileHosts, .{self}); 305 308 } 306 309 307 310 /// pull PDS host list from the seed relay's com.atproto.sync.listHosts endpoint. ··· 709 712 } 710 713 } 711 714 715 + /// background fiber: periodically reconcile DB host table with running workers. 716 + /// respawns any active/dormant host that doesn't have a running subscriber. 717 + /// catches hosts lost to prior exhaustion, bugs, or any other gap. 
718 + fn reconcileHosts(self: *Slurper) void { 719 + const interval = Io.Duration.fromSeconds(5 * 60); // 5 minutes 720 + 721 + // wait for initial startup to finish before first reconciliation 722 + if (self.startup_future) |*f| { 723 + f.wait(self.io) catch {}; 724 + } 725 + 726 + while (!self.shutdown.load(.acquire)) { 727 + self.io.sleep(interval, .awake) catch return; 728 + if (self.shutdown.load(.acquire)) return; 729 + 730 + const db_queue = self.db_queue orelse continue; 731 + 732 + // query reconnectable hosts (active + dormant) via DbRequestQueue 733 + const ListReconnectableReq = struct { 734 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 735 + alloc: Allocator, 736 + result: ?[]event_log_mod.DiskPersist.Host = null, 737 + 738 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 739 + const s: *@This() = @fieldParentPtr("base", b); 740 + s.result = dp.listReconnectableHosts(s.alloc) catch |e| { 741 + b.err = e; 742 + return; 743 + }; 744 + } 745 + }; 746 + var list_req: ListReconnectableReq = .{ .alloc = self.allocator }; 747 + db_queue.push(&list_req.base); 748 + list_req.base.wait(self.io, self.shutdown); 749 + 750 + if (list_req.base.err != null or list_req.result == null) continue; 751 + 752 + const hosts = list_req.result.?; 753 + defer { 754 + for (hosts) |h| { 755 + self.allocator.free(h.hostname); 756 + self.allocator.free(h.status); 757 + } 758 + self.allocator.free(hosts); 759 + } 760 + 761 + var respawned: usize = 0; 762 + for (hosts) |host| { 763 + if (self.shutdown.load(.acquire)) break; 764 + 765 + const missing = blk: { 766 + self.workers_mutex.lockUncancelable(self.io); 767 + defer self.workers_mutex.unlock(self.io); 768 + break :blk !self.workers.contains(host.id); 769 + }; 770 + 771 + if (missing) { 772 + self.spawnWorker(host.id, host.hostname, host.last_seq) catch |err| { 773 + log.warn("reconcile: failed to respawn {s}: {s}", .{ host.hostname, @errorName(err) }); 774 + continue; 775 + }; 776 
+ respawned += 1; 777 + } 778 + } 779 + 780 + if (respawned > 0) { 781 + log.info("reconcile: respawned {d} host(s)", .{respawned}); 782 + } 783 + } 784 + } 785 + 712 786 /// number of active workers 713 787 pub fn workerCount(self: *Slurper) usize { 714 788 self.workers_mutex.lockUncancelable(self.io); ··· 737 811 if (self.startup_future) |*f| f.cancel(self.io); 738 812 self.crawl_cond.signal(self.io); 739 813 if (self.crawl_future) |*f| f.cancel(self.io); 814 + if (self.reconcile_future) |*f| f.cancel(self.io); 740 815 741 816 // collect futures to cancel (can't cancel while holding workers_mutex) 742 817 var futures_to_cancel: std.ArrayListUnmanaged(Io.Future(void)) = .empty;
+12 -9
src/subscriber.zig
··· 219 219 shutdown: *std.atomic.Value(bool), 220 220 last_upstream_seq: ?u64 = null, 221 221 last_cursor_flush: i64 = 0, 222 + backoff: u64 = 1, 222 223 rate_limiter: RateLimiter = .{}, 223 224 224 225 // per-host shutdown (e.g. FutureCursor — stops only this subscriber) ··· 256 257 return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire); 257 258 } 258 259 259 - /// run the subscriber loop. reconnects with exponential backoff. 260 - /// blocks until shutdown flag is set or host is exhausted. 260 + /// run the subscriber loop. reconnects with exponential backoff (cap 30 min). 261 + /// blocks until shutdown flag is set. never gives up on a known host — 262 + /// host_ops marks hosts dormant after sustained failure for observability, 263 + /// but the subscriber keeps retrying. 261 264 pub fn run(self: *Subscriber) void { 262 - var backoff: u64 = 1; 263 - const max_backoff: u64 = 60; 265 + const max_backoff: u64 = 1800; // 30 minutes 264 266 265 267 // cursor is set at spawn time by slurper 266 268 if (self.last_upstream_seq) |seq| { ··· 272 274 273 275 self.connectAndRead() catch |err| { 274 276 if (self.shouldStop()) return; 275 - log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), backoff }); 277 + log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), self.backoff }); 276 278 }; 277 279 278 280 if (self.shouldStop()) return; ··· 283 285 hq.push(.{ 284 286 .host_id = self.options.host_id, 285 287 .kind = .increment_failures, 286 - .payload = .{ .host_shutdown = &self.host_shutdown }, 288 + .payload = .{ .none = {} }, 287 289 }); 288 290 } 289 291 } 290 292 291 293 // backoff sleep in small increments so we can check shutdown 292 - var remaining: u64 = backoff; 294 + var remaining: u64 = self.backoff; 293 295 while (remaining > 0 and !self.shouldStop()) { 294 296 const chunk = @min(remaining, 1); 295 297 
self.io.sleep(Io.Duration.fromSeconds(@intCast(chunk)), .awake) catch {}; 296 298 remaining -= chunk; 297 299 } 298 - backoff = @min(backoff * 2, max_backoff); 300 + self.backoff = @min(self.backoff * 2, max_backoff); 299 301 } 300 302 } 301 303 ··· 345 347 try client.handshake(path, .{ .headers = host_header }); 346 348 log.info("host {s}: connected", .{self.options.hostname}); 347 349 348 - // reset failures on successful connect (via host_ops queue) 350 + // reset failures + backoff on successful connect 351 + self.backoff = 1; 349 352 if (self.options.host_id > 0) { 350 353 if (self.host_ops) |hq| { 351 354 hq.push(.{