atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix host retention: dormant hosts stop their thread, not just relabel

the previous change kept dormant subscriber threads running forever,
meaning thread count could only go up. dormant now correctly stops
the worker thread (freeing resources) while preserving the DB row
for discovery to re-activate later. reconciliation loop queries only
active hosts — dormant hosts wait for requestCrawl.

separated two concerns the previous change conflated: "don't forget
the host" (kept — the DB row persists) and "don't stop the thread"
(dropped — the thread now exits on dormancy). removed unused
listReconnectableHosts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+25 -38
+10 -10
docs/design.md
··· 54 54 | flush thread | 1 | 8 MB | batched fsync of event log (100ms or 400 events) | 55 55 | GC thread | 1 | 8 MB | event log file cleanup every 10 minutes | 56 56 | crawl queue thread | 1 | 8 MB | process `requestCrawl` — validate hostname, describeServer, spawn worker | 57 - | reconcile thread | 1 | 8 MB | every 5 min: respawn workers for active/dormant hosts missing from worker map | 57 + | reconcile thread | 1 | 8 MB | every 5 min: respawn workers for active hosts missing from worker map | 58 58 | metrics server | 1 | 8 MB | HTTP on internal port, prometheus scrape | 59 59 | main thread | 1 | default | signal handling, shutdown coordination | 60 60 ··· 128 128 - `account` — uid, did, status, upstream_status, host_id 129 129 - `account_repo` — uid, rev, commit_data_cid (latest repo state) 130 130 - `host` — id, hostname, status, last_seq, failed_attempts 131 - - status lifecycle: `active` → `dormant` (after sustained failures, subscriber keeps retrying) → `active` (on successful reconnect). `blocked`/`banned` via admin. `idle` on FutureCursor. 131 + - status lifecycle: `active` → `dormant` (after sustained failures, thread exits, DB row persists) → `active` (when discovery re-activates via requestCrawl). `blocked`/`banned` via admin. `idle` on FutureCursor. 132 132 - `log_file_refs` — seq→file mapping for cursor binary search 133 133 - `domain_ban` — banned domain suffixes 134 134 - `backfill_progress` — collection backfill cursor tracking ··· 228 228 229 229 indigo gives up on a host after 15 consecutive dial failures (~3 min) 230 230 and marks it `offline`, relying on external `requestCrawl` to recover. 231 - zlay never gives up on a known host — subscribers retry with exponential 232 - backoff capped at 30 minutes, and backoff resets to 1 second on any 233 - successful connection. after sustained failure (15+ consecutive 234 - disconnects), the host is marked `dormant` for observability but the 235 - subscriber keeps retrying. 
a reconciliation loop (every 5 min) respawns 236 - any active/dormant host missing from the worker map, catching hosts lost 237 - to any gap. this eliminates the need for an external reconnect cron — 238 - the cron can be reduced to discovery-only (new hosts not yet in the DB). 231 + zlay retries with exponential backoff capped at 30 minutes (vs indigo's 232 + 60s cap). backoff resets to 1 second on any successful connection. after 233 + 15 consecutive failures, the host is marked `dormant` — the worker 234 + thread exits (freeing resources) but the DB row persists. a 235 + reconciliation loop (every 5 min) respawns any active host that lost 236 + its worker to a bug or restart. dormant hosts are re-activated by the 237 + discovery cron (requestCrawl), which can now be lightweight and frequent 238 + since it only needs to handle genuinely new hosts + dormant recovery. 239 239 240 240 ### consumer buffer sizing 241 241
+1 -15
src/event_log.zig
··· 702 702 } 703 703 704 704 fn listActiveHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { 705 - return listHostsByStatusImpl(allocator, db, "status = 'active'"); 706 - } 707 - 708 - /// list hosts eligible for reconnection: active + dormant (Threaded pool). 709 - /// used by the reconciliation loop to respawn missing workers. 710 - pub fn listReconnectableHosts(self: *DiskPersist, allocator: Allocator) ![]Host { 711 - return listReconnectableHostsImpl(allocator, self.db); 712 - } 713 - 714 - fn listReconnectableHostsImpl(allocator: Allocator, db: *pg.Pool) ![]Host { 715 - return listHostsByStatusImpl(allocator, db, "status IN ('active', 'dormant')"); 716 - } 717 - 718 - fn listHostsByStatusImpl(allocator: Allocator, db: *pg.Pool, comptime where: []const u8) ![]Host { 719 705 var hosts: std.ArrayListUnmanaged(Host) = .empty; 720 706 errdefer { 721 707 for (hosts.items) |host| { ··· 726 712 } 727 713 728 714 var result = try db.query( 729 - "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE " ++ where ++ " ORDER BY id ASC", 715 + "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE status = 'active' ORDER BY id ASC", 730 716 .{}, 731 717 ); 732 718 defer result.deinit();
+4 -3
src/host_ops.zig
··· 281 281 .increment_failures => { 282 282 const failures = self.persist.incrementHostFailures(op.host_id) catch 0; 283 283 if (failures >= self.max_consecutive_failures) { 284 - // mark dormant for observability — subscriber keeps retrying. 285 - // reconciliation loop will also periodically re-check dormant hosts. 286 - log.warn("host_ops: host_id={d} marked dormant after {d} failures", .{ op.host_id, failures }); 284 + // mark dormant and stop the subscriber thread — free resources. 285 + // the DB row persists so discovery can re-activate the host later. 286 + log.warn("host_ops: host_id={d} dormant after {d} failures, stopping worker", .{ op.host_id, failures }); 287 287 self.persist.updateHostStatus(op.host_id, "dormant") catch {}; 288 + op.payload.host_shutdown.store(true, .release); 288 289 } 289 290 }, 290 291 .reset_failures => {
+7 -6
src/slurper.zig
··· 713 713 } 714 714 715 715 /// background fiber: periodically reconcile DB host table with running workers. 716 - /// respawns any active/dormant host that doesn't have a running subscriber. 717 - /// catches hosts lost to prior exhaustion, bugs, or any other gap. 716 + /// respawns any active host that doesn't have a running subscriber — catches 717 + /// hosts lost to bugs, restarts, or other gaps. dormant hosts are excluded; 718 + /// they wait for discovery (requestCrawl) to flip them back to active. 718 719 fn reconcileHosts(self: *Slurper) void { 719 720 const interval_secs: u64 = 5 * 60; // 5 minutes 720 721 ··· 729 730 730 731 const db_queue = self.db_queue orelse continue; 731 732 732 - // query reconnectable hosts (active + dormant) via DbRequestQueue 733 - const ListReconnectableReq = struct { 733 + // query active hosts only — dormant hosts wait for discovery 734 + const ListActiveReq = struct { 734 735 base: event_log_mod.DbRequest = .{ .callback = &execute }, 735 736 alloc: Allocator, 736 737 result: ?[]event_log_mod.DiskPersist.Host = null, 737 738 738 739 fn execute(b: *event_log_mod.DbRequest, dp: *event_log_mod.DiskPersist) void { 739 740 const s: *@This() = @fieldParentPtr("base", b); 740 - s.result = dp.listReconnectableHosts(s.alloc) catch |e| { 741 + s.result = dp.listActiveHosts(s.alloc) catch |e| { 741 742 b.err = e; 742 743 return; 743 744 }; 744 745 } 745 746 }; 746 - var list_req: ListReconnectableReq = .{ .alloc = self.allocator }; 747 + var list_req: ListActiveReq = .{ .alloc = self.allocator }; 747 748 db_queue.push(&list_req.base); 748 749 list_req.base.wait(self.io, self.shutdown); 749 750
+3 -4
src/subscriber.zig
··· 258 258 } 259 259 260 260 /// run the subscriber loop. reconnects with exponential backoff (cap 30 min). 261 - /// blocks until shutdown flag is set. never gives up on a known host — 262 - /// host_ops marks hosts dormant after sustained failure for observability, 263 - /// but the subscriber keeps retrying. 261 + /// blocks until shutdown or host marked dormant. on dormancy the thread exits 262 + /// and frees resources — the DB row persists so discovery can re-activate later. 264 263 pub fn run(self: *Subscriber) void { 265 264 const max_backoff: u64 = 1800; // 30 minutes 266 265 ··· 285 284 hq.push(.{ 286 285 .host_id = self.options.host_id, 287 286 .kind = .increment_failures, 288 - .payload = .{ .none = {} }, 287 + .payload = .{ .host_shutdown = &self.host_shutdown }, 289 288 }); 290 289 } 291 290 }