atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

add host retention: dormant state, higher backoff, reconciliation

hosts that exhaust max failures are now marked "dormant" (not
"exhausted") and their thread stops. a reconciliation fiber runs every
5 minutes, queries active hosts from DB, and respawns any missing
workers. on successful reconnect, reset_failures also flips the host
back to "active", so recovered PDS instances are picked up automatically.

other changes:
- backoff is now a struct field on Subscriber, reset to 1 on connect
- max_backoff raised from 60s to 1800s (30 min) for dormant-eligible hosts
- "dormant" mapped to "offline" in getHostStatus API responses
- reconcile thread added to design.md threading table
- host status lifecycle documented in postgres tables section

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+119 -10
+22
docs/design.md
··· 54 54 | flush thread | 1 | 8 MB | batched fsync of event log (100ms or 400 events) | 55 55 | GC thread | 1 | 8 MB | event log file cleanup every 10 minutes | 56 56 | crawl queue thread | 1 | 8 MB | process `requestCrawl` — validate hostname, describeServer, spawn worker | 57 + | reconcile thread | 1 | 8 MB | every 5 min, query active hosts from DB, respawn missing workers | 57 58 | metrics server | 1 | 8 MB | HTTP on internal port, prometheus scrape | 58 59 | main thread | 1 | default | signal handling, shutdown coordination | 59 60 ··· 127 128 - `account` — uid, did, status, upstream_status, host_id 128 129 - `account_repo` — uid, rev, commit_data_cid (latest repo state) 129 130 - `host` — id, hostname, status, last_seq, failed_attempts 131 + - status lifecycle: `active` → `dormant` (after max failures, thread stops) 132 + → `active` (on successful reconnect via reconciliation or crawl request). 133 + `blocked` is permanent (admin action). `idle` is set by FutureCursor. 130 134 - `log_file_refs` — seq→file mapping for cursor binary search 131 135 - `domain_ban` — banned domain suffixes 132 136 - `backfill_progress` — collection backfill cursor tracking ··· 228 232 can be tuned independently based on observed `ConsumerTooSlow` disconnect rate. 229 233 the ring buffer is lock-free (atomic read/write indices), so the bottleneck is 230 234 consumer write throughput, not buffer contention. 235 + 236 + ### host retention model 237 + 238 + indigo marks hosts as "exhausted" after max failures and never retries. zlay 239 + uses a dormant/reconciliation model instead: 240 + 241 + - after max consecutive failures (15), the host is marked `dormant` and its 242 + thread stops. backoff grows to 30 min max (vs indigo's permanent death). 243 + - every 5 minutes, a reconciliation thread queries active hosts from the DB 244 + and respawns workers for any missing from the workers map. 245 + - on successful reconnect, `reset_failures` also sets the host back to 246 + `active`, so dormant hosts that come back online are automatically recovered. 247 + - a connect gate (`MAX_CONCURRENT_CONNECTS`, default 50) bounds the DNS/TLS 248 + storm during startup and reconnect waves, preventing health probe starvation. 249 + 250 + the net effect: hosts that go down temporarily are retried with increasing 251 + backoff, and hosts that come back are picked up within 5 minutes. no manual 252 + intervention needed unless the host is permanently dead (admin can block it).
+2
src/api/xrpc.zig
··· 584 584 "banned" 585 585 else if (std.mem.eql(u8, raw_status, "exhausted")) 586 586 "offline" 587 + else if (std.mem.eql(u8, raw_status, "dormant")) 588 + "offline" 587 589 else 588 590 raw_status; 589 591
+6 -2
src/host_ops.zig
··· 281 281 .increment_failures => { 282 282 const failures = self.persist.incrementHostFailures(op.host_id) catch 0; 283 283 if (failures >= self.max_consecutive_failures) { 284 - log.warn("host_ops: host_id={d} exhausted after {d} failures", .{ op.host_id, failures }); 285 - self.persist.updateHostStatus(op.host_id, "exhausted") catch {}; 284 + log.warn("host_ops: host_id={d} dormant after {d} failures", .{ op.host_id, failures }); 285 + self.persist.updateHostStatus(op.host_id, "dormant") catch {}; 286 286 op.payload.host_shutdown.store(true, .release); 287 287 } 288 288 }, 289 289 .reset_failures => { 290 290 self.persist.resetHostFailures(op.host_id) catch |err| { 291 291 log.debug("host_ops: reset failures failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) }); 292 + }; 293 + // flip dormant hosts back to active on successful reconnect 294 + self.persist.updateHostStatus(op.host_id, "active") catch |err| { 295 + log.debug("host_ops: reset status failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) }); 292 296 }; 293 297 }, 294 298 .update_status => {
+79 -1
src/slurper.zig
··· 5 5 //! - spawning/stopping subscriber workers 6 6 //! - processing crawl requests (adding new hosts) 7 7 //! - host validation (format, domain ban, describeServer, relay loop detection) 8 - //! - tracking host lifecycle (active → exhausted → blocked) 8 + //! - host lifecycle: active → dormant (after max failures) → active (on reconnect) 9 + //! - reconciliation: every 5 min, respawns workers for active hosts missing from the map 9 10 //! 10 11 //! all downstream components (Broadcaster, DiskPersist, Validator) are 11 12 //! thread-safe for N concurrent producers, so this just orchestrates. ··· 259 260 // background tasks 260 261 startup_future: ?Io.Future(void) = null, 261 262 crawl_future: ?Io.Future(void) = null, 263 + reconcile_future: ?Io.Future(void) = null, 262 264 263 265 io: Io, 264 266 /// dedicated Threaded io for the frame worker pool — safe from plain OS threads ··· 309 311 // pullHosts + listActiveHosts + spawnWorker all happen in the background thread. 310 312 self.startup_future = try self.io.concurrent(spawnWorkers, .{self}); 311 313 self.crawl_future = try self.io.concurrent(processCrawlQueue, .{self}); 314 + self.reconcile_future = try self.io.concurrent(reconcileHosts, .{self}); 312 315 } 313 316 314 317 /// pull PDS host list from the seed relay's com.atproto.sync.listHosts endpoint. ··· 693 696 log.info("startup complete: {d} host(s) spawned", .{hosts.len}); 694 697 } 695 698 699 + /// background fiber: every 5 minutes, query active hosts from DB and respawn 700 + /// any that are missing from the workers map. handles hosts that exited due to 701 + /// transient errors or were marked dormant and later flipped back to active. 702 + fn reconcileHosts(self: *Slurper) void { 703 + const reconcile_interval: u64 = 5 * 60; // 5 minutes 704 + 705 + while (!self.shutdown.load(.acquire)) { 706 + // sleep in 1-second increments so we can check shutdown 707 + var remaining: u64 = reconcile_interval; 708 + while (remaining > 0 and !self.shutdown.load(.acquire)) { 709 + self.io.sleep(Io.Duration.fromSeconds(1), .awake) catch return; 710 + remaining -= 1; 711 + } 712 + if (self.shutdown.load(.acquire)) return; 713 + 714 + const db_queue = self.db_queue orelse continue; 715 + 716 + // load active hosts via DbRequestQueue 717 + const ListActiveHostsReq = struct { 718 + base: event_log_mod.DbRequest = .{ .callback = &execute }, 719 + alloc: Allocator, 720 + result: ?[]event_log_mod.DiskPersist.Host = null, 721 + 722 + fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 723 + const s: *@This() = @fieldParentPtr("base", b); 724 + s.result = dp.listActiveHosts(s.alloc) catch |e| { 725 + b.err = e; 726 + return; 727 + }; 728 + } 729 + }; 730 + var list_req: ListActiveHostsReq = .{ .alloc = self.allocator }; 731 + db_queue.push(&list_req.base); 732 + list_req.base.wait(self.io, self.shutdown); 733 + 734 + if (list_req.base.err != null or list_req.result == null) { 735 + log.warn("reconcile: failed to load active hosts", .{}); 736 + continue; 737 + } 738 + 739 + const hosts = list_req.result.?; 740 + defer { 741 + for (hosts) |h| { 742 + self.allocator.free(h.hostname); 743 + self.allocator.free(h.status); 744 + } 745 + self.allocator.free(hosts); 746 + } 747 + 748 + var respawned: usize = 0; 749 + for (hosts) |host| { 750 + if (self.shutdown.load(.acquire)) break; 751 + 752 + // check if already running 753 + const has_worker = blk: { 754 + self.workers_mutex.lockUncancelable(self.io); 755 + defer self.workers_mutex.unlock(self.io); 756 + break :blk self.workers.contains(host.id); 757 + }; 758 + if (has_worker) continue; 759 + 760 + self.spawnWorker(host.id, host.hostname, host.last_seq) catch |err| { 761 + log.warn("reconcile: failed to respawn {s}: {s}", .{ host.hostname, @errorName(err) }); 762 + continue; 763 + }; 764 + respawned += 1; 765 + } 766 + 767 + if (respawned > 0) { 768 + log.info("reconcile: respawned {d} host(s)", .{respawned}); 769 + } 770 + } 771 + } 772 + 696 773 /// background thread: process crawl requests 697 774 fn processCrawlQueue(self: *Slurper) void { 698 775 while (!self.shutdown.load(.acquire)) { ··· 745 822 if (self.startup_future) |*f| f.cancel(self.io); 746 823 self.crawl_cond.signal(self.io); 747 824 if (self.crawl_future) |*f| f.cancel(self.io); 825 + if (self.reconcile_future) |*f| f.cancel(self.io); 748 826 749 827 // collect futures to cancel (can't cancel while holding workers_mutex) 750 828 var futures_to_cancel: std.ArrayListUnmanaged(Io.Future(void)) = .empty;
+10 -7
src/subscriber.zig
··· 229 229 last_upstream_seq: ?u64 = null, 230 230 last_cursor_flush: i64 = 0, 231 231 rate_limiter: RateLimiter = .{}, 232 + /// exponential backoff (seconds) between reconnect attempts — persists across retries, 233 + /// reset to 1 on successful connect. max 1800s (30 min) for dormant-eligible hosts. 234 + backoff: u64 = 1, 232 235 233 236 // per-host shutdown (e.g. FutureCursor — stops only this subscriber) 234 237 host_shutdown: std.atomic.Value(bool) = .{ .raw = false }, ··· 266 269 } 267 270 268 271 /// run the subscriber loop. reconnects with exponential backoff. 269 - /// blocks until shutdown flag is set or host is exhausted. 272 + /// blocks until shutdown or host marked dormant. 270 273 pub fn run(self: *Subscriber) void { 271 - var backoff: u64 = 1; 272 - const max_backoff: u64 = 60; 274 + const max_backoff: u64 = 1800; 273 275 274 276 // cursor is set at spawn time by slurper 275 277 if (self.last_upstream_seq) |seq| { ··· 293 295 294 296 self.connectAndRead() catch |err| { 295 297 if (self.shouldStop()) return; 296 - log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), backoff }); 298 + log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), self.backoff }); 297 299 }; 298 300 299 301 if (self.shouldStop()) return; ··· 310 312 } 311 313 312 314 // backoff sleep in small increments so we can check shutdown 313 - var remaining: u64 = backoff; 315 + var remaining: u64 = self.backoff; 314 316 while (remaining > 0 and !self.shouldStop()) { 315 317 const chunk = @min(remaining, 1); 316 318 self.io.sleep(Io.Duration.fromSeconds(@intCast(chunk)), .awake) catch {}; 317 319 remaining -= chunk; 318 320 } 319 - backoff = @min(backoff * 2, max_backoff); 321 + self.backoff = @min(self.backoff * 2, max_backoff); 320 322 } 321 323 } 322 324 ··· 366 368 try client.handshake(path, .{ .headers = host_header }); 367 369 log.info("host {s}: connected", .{self.options.hostname}); 368 370 369 - // reset failures on successful connect (via host_ops queue) 371 + // reset failures + backoff on successful connect (via host_ops queue) 372 + self.backoff = 1; 370 373 if (self.options.host_id > 0) { 371 374 if (self.host_ops) |hq| { 372 375 hq.push(.{