atproto relay implementation in zig (zlay.waow.tech)

revert host retention changes (143605c..e5a2a14)

reverts all four host retention commits that caused production
restart loops. the interaction between reconciliation, dormancy
logic, startup jitter, and the cold-start ramp across ~2,800 hosts
was not testable via unit tests, and each deploy regressed
relay-eval coverage (alternating between 0% and 97% as kubelet
killed the pod).

back to 80eca78 behavior: exhausted hosts stop after 15 failures,
cron handles re-discovery. stable baseline for 24+ hours at 97-99%.

the feature needs a local test harness that validates startup ramp
behavior against a realistic host table before any production deploy.
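
a minimal sketch of one assertion such a harness could make, reusing the
deterministic jitter computation from the reverted spawnWorkers path (shown
in the slurper diff below). the sequential IDs and the 200/s threshold are
illustrative assumptions, not values from the codebase:

```zig
const std = @import("std");

// copy of the reverted jitter: Knuth multiplicative hash of the host ID,
// reduced into the 30-second startup window
fn startupJitterMs(host_id: u64) u32 {
    return @intCast((host_id *% 2654435761) % 30_000);
}

test "a 2,800-row host table spreads connects across the startup window" {
    var per_second = [_]u32{0} ** 30; // connects scheduled in each 1s slice
    var id: u64 = 1;
    while (id <= 2_800) : (id += 1) {
        per_second[startupJitterMs(id) / 1000] += 1;
    }
    // an even spread is ~93 connects/s; fail on any front-loaded slice
    for (per_second) |count| {
        try std.testing.expect(count < 200);
    }
}
```

a real harness would also have to drive the dormancy and reconciliation
paths against fake connections; those interactions are what the commit
message says unit tests couldn't cover.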

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+16 -128
+0 -15
docs/design.md
```diff
@@ -54,7 +54,6 @@
 | flush thread | 1 | 8 MB | batched fsync of event log (100ms or 400 events) |
 | GC thread | 1 | 8 MB | event log file cleanup every 10 minutes |
 | crawl queue thread | 1 | 8 MB | process `requestCrawl` — validate hostname, describeServer, spawn worker |
-| reconcile thread | 1 | 8 MB | every 5 min: respawn workers for active hosts missing from worker map |
 | metrics server | 1 | 8 MB | HTTP on internal port, prometheus scrape |
 | main thread | 1 | default | signal handling, shutdown coordination |
 
@@ -128,7 +127,6 @@
 - `account` — uid, did, status, upstream_status, host_id
 - `account_repo` — uid, rev, commit_data_cid (latest repo state)
 - `host` — id, hostname, status, last_seq, failed_attempts
-  - status lifecycle: `active` → `dormant` (after sustained failures, thread exits, DB row persists) → `active` (when discovery re-activates via requestCrawl). `blocked`/`banned` via admin. `idle` on FutureCursor.
 - `log_file_refs` — seq→file mapping for cursor binary search
 - `domain_ban` — banned domain suffixes
 - `backfill_progress` — collection backfill cursor tracking
@@ -223,19 +221,6 @@
 - new accounts trigger async DID doc verification; on mismatch → rejected
 - signature failures trigger key eviction + re-resolution (sync spec guidance)
 - next commit from the same DID hits the refreshed cache
-
-### host retention model
-
-indigo gives up on a host after 15 consecutive dial failures (~3 min)
-and marks it `offline`, relying on external `requestCrawl` to recover.
-zlay retries with exponential backoff capped at 30 minutes (vs indigo's
-60s cap). backoff resets to 1 second on any successful connection. after
-15 consecutive failures, the host is marked `dormant` — the worker
-thread exits (freeing resources) but the DB row persists. a
-reconciliation loop (every 5 min) respawns any active host that lost
-its worker to a bug or restart. dormant hosts are re-activated by the
-discovery cron (requestCrawl), which can now be lightweight and frequent
-since it only needs to handle genuinely new hosts + dormant recovery.
 
 ### consumer buffer sizing
 
```
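
for reference, the arithmetic in the reverted retention model: doubling from
1 second reaches the 30-minute cap on the 11th doubling, so most of a failing
host's elapsed time sits at the cap. a small illustrative check (the constants
mirror the reverted subscriber code):

```zig
const std = @import("std");

test "reverted backoff: 1s doubling hits the 30-minute cap in 11 steps" {
    const max_backoff: u64 = 1800; // the reverted 30-minute cap, in seconds
    var backoff: u64 = 1;
    var doublings: usize = 0;
    while (backoff < max_backoff) : (doublings += 1) {
        backoff = @min(backoff * 2, max_backoff);
    }
    // 1, 2, 4, ..., 1024, then clamped to 1800
    try std.testing.expectEqual(@as(usize, 11), doublings);
}
```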
+1 -1
src/api/xrpc.zig
```diff
@@ -582,7 +582,7 @@
 
     const status = if (std.mem.eql(u8, raw_status, "blocked"))
         "banned"
-    else if (std.mem.eql(u8, raw_status, "exhausted") or std.mem.eql(u8, raw_status, "dormant"))
+    else if (std.mem.eql(u8, raw_status, "exhausted"))
         "offline"
     else
         raw_status;
```
+2 -6
src/host_ops.zig
```diff
@@ -281,10 +281,8 @@
         .increment_failures => {
             const failures = self.persist.incrementHostFailures(op.host_id) catch 0;
             if (failures >= self.max_consecutive_failures) {
-                // mark dormant and stop the subscriber thread — free resources.
-                // the DB row persists so discovery can re-activate the host later.
-                log.warn("host_ops: host_id={d} dormant after {d} failures, stopping worker", .{ op.host_id, failures });
-                self.persist.updateHostStatus(op.host_id, "dormant") catch {};
+                log.warn("host_ops: host_id={d} exhausted after {d} failures", .{ op.host_id, failures });
+                self.persist.updateHostStatus(op.host_id, "exhausted") catch {};
                 op.payload.host_shutdown.store(true, .release);
             }
         },
@@ -292,8 +290,6 @@
             self.persist.resetHostFailures(op.host_id) catch |err| {
                 log.debug("host_ops: reset failures failed for host_id={d}: {s}", .{ op.host_id, @errorName(err) });
             };
-            // if host was dormant, flip back to active on successful reconnect
-            self.persist.updateHostStatus(op.host_id, "active") catch {};
         },
         .update_status => {
             self.persist.updateHostStatus(op.host_id, op.payload.status.slice()) catch |err| {
```
+5 -89
src/slurper.zig
```diff
@@ -5,8 +5,7 @@
 //! - spawning/stopping subscriber workers
 //! - processing crawl requests (adding new hosts)
 //! - host validation (format, domain ban, describeServer, relay loop detection)
-//! - tracking host lifecycle (active → dormant, with persistent retry)
-//! - periodic reconciliation of DB host table vs running workers
+//! - tracking host lifecycle (active → exhausted → blocked)
 //!
 //! all downstream components (Broadcaster, DiskPersist, Validator) are
 //! thread-safe for N concurrent producers, so this just orchestrates.
@@ -254,7 +253,6 @@
 // background tasks
 startup_future: ?Io.Future(void) = null,
 crawl_future: ?Io.Future(void) = null,
-reconcile_future: ?Io.Future(void) = null,
 
 io: Io,
 /// dedicated Threaded io for the frame worker pool — safe from plain OS threads
@@ -304,7 +302,6 @@
     // pullHosts + listActiveHosts + spawnWorker all happen in the background thread.
     self.startup_future = try self.io.concurrent(spawnWorkers, .{self});
     self.crawl_future = try self.io.concurrent(processCrawlQueue, .{self});
-    self.reconcile_future = try self.io.concurrent(reconcileHosts, .{self});
 }
 
 /// pull PDS host list from the seed relay's com.atproto.sync.listHosts endpoint.
@@ -530,12 +527,11 @@
     reset_req.base.wait(self.io, self.shutdown);
 
     // phase 5: spawn worker (Evented)
-    try self.spawnWorker(db_req.host_id, hostname, db_req.last_seq, 0);
+    try self.spawnWorker(db_req.host_id, hostname, db_req.last_seq);
     log.info("added host {s} (id={d})", .{ hostname, db_req.host_id });
 }
 
-/// spawn a subscriber thread for a host.
-/// startup_jitter_ms: random delay before first connect (0 = no jitter).
-fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8, last_seq: u64, startup_jitter_ms: u32) !void {
+/// spawn a subscriber thread for a host
+fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8, last_seq: u64) !void {
     const hostname_duped = try self.allocator.dupe(u8, hostname);
     errdefer self.allocator.free(hostname_duped);
@@ -587,7 +583,6 @@
         sub.cursor_slot = cm.register(host_id, last_seq);
     }
     if (last_seq > 0) sub.last_upstream_seq = last_seq;
-    sub.startup_jitter_ms = startup_jitter_ms;
 
     const future = try self.io.concurrent(runWorker, .{ self, host_id, sub });
 
@@ -672,16 +667,10 @@
     else
         hosts.len; // 0 = unlimited
 
-    // spread initial connections across a 30-second window to avoid
-    // DNS/TLS handshake storm that starves health probes during startup.
-    // each host gets a deterministic jitter based on its ID.
-    const jitter_window_ms: u32 = 30_000;
-
     var spawned: usize = 0;
     for (hosts) |host| {
         if (self.shutdown.load(.acquire)) break;
-        const jitter: u32 = @intCast((host.id *% 2654435761) % jitter_window_ms);
-        self.spawnWorker(host.id, host.hostname, host.last_seq, jitter) catch |err| {
+        self.spawnWorker(host.id, host.hostname, host.last_seq) catch |err| {
             log.warn("failed to spawn worker for {s}: {s}", .{ host.hostname, @errorName(err) });
         };
         spawned += 1;
@@ -720,78 +709,6 @@
     }
 }
 
-/// background fiber: periodically reconcile DB host table with running workers.
-/// respawns any active host that doesn't have a running subscriber — catches
-/// hosts lost to bugs, restarts, or other gaps. dormant hosts are excluded;
-/// they wait for discovery (requestCrawl) to flip them back to active.
-fn reconcileHosts(self: *Slurper) void {
-    const interval_secs: u64 = 5 * 60; // 5 minutes
-
-    while (!self.shutdown.load(.acquire)) {
-        // sleep in 1-second increments so we can check shutdown
-        var remaining: u64 = interval_secs;
-        while (remaining > 0 and !self.shutdown.load(.acquire)) {
-            self.io.sleep(Io.Duration.fromSeconds(1), .awake) catch return;
-            remaining -= 1;
-        }
-        if (self.shutdown.load(.acquire)) return;
-
-        const db_queue = self.db_queue orelse continue;
-
-        // query active hosts only — dormant hosts wait for discovery
-        const ListActiveReq = struct {
-            base: event_log_mod.DbRequest = .{ .callback = &execute },
-            alloc: Allocator,
-            result: ?[]event_log_mod.DiskPersist.Host = null,
-
-            fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void {
-                const s: *@This() = @fieldParentPtr("base", b);
-                s.result = dp.listActiveHosts(s.alloc) catch |e| {
-                    b.err = e;
-                    return;
-                };
-            }
-        };
-        var list_req: ListActiveReq = .{ .alloc = self.allocator };
-        db_queue.push(&list_req.base);
-        list_req.base.wait(self.io, self.shutdown);
-
-        if (list_req.base.err != null or list_req.result == null) continue;
-
-        const hosts = list_req.result.?;
-        defer {
-            for (hosts) |h| {
-                self.allocator.free(h.hostname);
-                self.allocator.free(h.status);
-            }
-            self.allocator.free(hosts);
-        }
-
-        var respawned: usize = 0;
-        for (hosts) |host| {
-            if (self.shutdown.load(.acquire)) break;
-
-            const missing = blk: {
-                self.workers_mutex.lockUncancelable(self.io);
-                defer self.workers_mutex.unlock(self.io);
-                break :blk !self.workers.contains(host.id);
-            };
-
-            if (missing) {
-                self.spawnWorker(host.id, host.hostname, host.last_seq, 0) catch |err| {
-                    log.warn("reconcile: failed to respawn {s}: {s}", .{ host.hostname, @errorName(err) });
-                    continue;
-                };
-                respawned += 1;
-            }
-        }
-
-        if (respawned > 0) {
-            log.info("reconcile: respawned {d} host(s)", .{respawned});
-        }
-    }
-}
-
 /// number of active workers
 pub fn workerCount(self: *Slurper) usize {
     self.workers_mutex.lockUncancelable(self.io);
@@ -820,7 +737,6 @@
     if (self.startup_future) |*f| f.cancel(self.io);
     self.crawl_cond.signal(self.io);
     if (self.crawl_future) |*f| f.cancel(self.io);
-    if (self.reconcile_future) |*f| f.cancel(self.io);
 
     // collect futures to cancel (can't cancel while holding workers_mutex)
     var futures_to_cancel: std.ArrayListUnmanaged(Io.Future(void)) = .empty;
```
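
one detail of the removed reconcile loop worth noting: the DB round-trip uses
an intrusive request struct whose callback recovers the concrete request from
the embedded base via @fieldParentPtr. a stripped-down illustration of the
pattern (the types here are stand-ins, not the codebase's DbRequest):

```zig
const std = @import("std");

// the intrusive-request pattern from the removed reconcile loop: a generic
// node carries only a callback; the concrete request recovers itself from
// the embedded base pointer with @fieldParentPtr.
const Request = struct {
    callback: *const fn (*Request) void,
};

const ListReq = struct {
    base: Request = .{ .callback = &execute },
    result: u32 = 0,

    fn execute(b: *Request) void {
        const self: *ListReq = @fieldParentPtr("base", b);
        self.result = 42; // stand-in for the listActiveHosts query
    }
};

test "callback recovers the concrete request via @fieldParentPtr" {
    var req: ListReq = .{};
    req.base.callback(&req.base);
    try std.testing.expectEqual(@as(u32, 42), req.result);
}
```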
+8 -17
src/subscriber.zig
```diff
@@ -219,9 +219,6 @@
 shutdown: *std.atomic.Value(bool),
 last_upstream_seq: ?u64 = null,
 last_cursor_flush: i64 = 0,
-backoff: u64 = 1,
-/// one-time random delay (ms) before first connect — spreads TLS storm at startup
-startup_jitter_ms: u32 = 0,
 rate_limiter: RateLimiter = .{},
 
 // per-host shutdown (e.g. FutureCursor — stops only this subscriber)
@@ -259,28 +256,23 @@
     return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire);
 }
 
-/// run the subscriber loop. reconnects with exponential backoff (cap 30 min).
-/// blocks until shutdown or host marked dormant. on dormancy the thread exits
-/// and frees resources — the DB row persists so discovery can re-activate later.
+/// run the subscriber loop. reconnects with exponential backoff.
+/// blocks until shutdown flag is set or host is exhausted.
 pub fn run(self: *Subscriber) void {
-    const max_backoff: u64 = 1800; // 30 minutes
+    var backoff: u64 = 1;
+    const max_backoff: u64 = 60;
 
     // cursor is set at spawn time by slurper
     if (self.last_upstream_seq) |seq| {
         log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, seq });
     }
 
-    // one-time startup jitter — spread DNS/TLS storm across a window
-    if (self.startup_jitter_ms > 0 and !self.shouldStop()) {
-        self.io.sleep(Io.Duration.fromMilliseconds(self.startup_jitter_ms), .awake) catch {};
-    }
-
     while (!self.shouldStop()) {
         log.info("host {s}: connecting...", .{self.options.hostname});
 
         self.connectAndRead() catch |err| {
             if (self.shouldStop()) return;
-            log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), self.backoff });
+            log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), backoff });
         };
 
         if (self.shouldStop()) return;
@@ -297,13 +289,13 @@
         }
 
         // backoff sleep in small increments so we can check shutdown
-        var remaining: u64 = self.backoff;
+        var remaining: u64 = backoff;
         while (remaining > 0 and !self.shouldStop()) {
             const chunk = @min(remaining, 1);
             self.io.sleep(Io.Duration.fromSeconds(@intCast(chunk)), .awake) catch {};
             remaining -= chunk;
         }
-        self.backoff = @min(self.backoff * 2, max_backoff);
+        backoff = @min(backoff * 2, max_backoff);
     }
 }
 
@@ -353,8 +345,7 @@
     try client.handshake(path, .{ .headers = host_header });
     log.info("host {s}: connected", .{self.options.hostname});
 
-    // reset failures + backoff on successful connect
-    self.backoff = 1;
+    // reset failures on successful connect (via host_ops queue)
     if (self.options.host_id > 0) {
         if (self.host_ops) |hq| {
             hq.push(.{
```
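
with the restored 60-second cap, the sleep budget before exhaustion is short:
assuming the failure counter trips at 15 (per the commit message) and a sleep
follows each of the first 14 failures, the sleeps total 543 seconds, roughly
nine minutes, excluding dial and handshake time. an illustrative check:

```zig
const std = @import("std");

test "restored backoff: 15 failures sleep for about 9 minutes total" {
    var backoff: u64 = 1;
    var total_sleep: u64 = 0;
    var failures: usize = 1;
    while (failures < 15) : (failures += 1) {
        total_sleep += backoff; // one sleep after each failed attempt
        backoff = @min(backoff * 2, 60); // the restored 60s cap
    }
    // 1 + 2 + 4 + 8 + 16 + 32 + 8 * 60 = 543 seconds
    try std.testing.expectEqual(@as(u64, 543), total_sleep);
}
```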