atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

stagger startup connections across 30s window to survive health probes

all ~2,800 subscriber threads were starting DNS+TLS handshakes
simultaneously during startup, starving the HTTP server thread and
causing kubelet to kill the pod on liveness probe timeout.

each subscriber now sleeps a deterministic jitter (0-30s, based on
host_id hash) before its first connection attempt. threads still
spawn quickly (50/batch, 100ms yield) but actual handshakes are
spread across a 30-second window instead of hitting all at once.

jitter only applies to startup — requestCrawl and reconciliation
spawn workers with zero jitter since they're one-at-a-time.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

zzstoatzz e5a2a14f 5d3a7b54

+20 -5
+13 -5
src/slurper.zig
··· 530 530 reset_req.base.wait(self.io, self.shutdown); 531 531 532 532 // phase 5: spawn worker (Evented) 533 - try self.spawnWorker(db_req.host_id, hostname, db_req.last_seq); 533 + try self.spawnWorker(db_req.host_id, hostname, db_req.last_seq, 0); 534 534 log.info("added host {s} (id={d})", .{ hostname, db_req.host_id }); 535 535 } 536 536 537 - /// spawn a subscriber thread for a host 538 - fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8, last_seq: u64) !void { 537 + /// spawn a subscriber thread for a host. 538 + /// startup_jitter_ms: random delay before first connect (0 = no jitter). 539 + fn spawnWorker(self: *Slurper, host_id: u64, hostname: []const u8, last_seq: u64, startup_jitter_ms: u32) !void { 539 540 const hostname_duped = try self.allocator.dupe(u8, hostname); 540 541 errdefer self.allocator.free(hostname_duped); 541 542 ··· 586 587 sub.cursor_slot = cm.register(host_id, last_seq); 587 588 } 588 589 if (last_seq > 0) sub.last_upstream_seq = last_seq; 590 + sub.startup_jitter_ms = startup_jitter_ms; 589 591 590 592 const future = try self.io.concurrent(runWorker, .{ self, host_id, sub }); 591 593 ··· 670 672 else 671 673 hosts.len; // 0 = unlimited 672 674 675 + // spread initial connections across a 30-second window to avoid 676 + // DNS/TLS handshake storm that starves health probes during startup. 677 + // each host gets a deterministic jitter based on its ID. 678 + const jitter_window_ms: u32 = 30_000; 679 + 673 680 var spawned: usize = 0; 674 681 for (hosts) |host| { 675 682 if (self.shutdown.load(.acquire)) break; 676 - self.spawnWorker(host.id, host.hostname, host.last_seq) catch |err| { 683 + const jitter: u32 = @intCast((host.id *% 2654435761) % jitter_window_ms); 684 + self.spawnWorker(host.id, host.hostname, host.last_seq, jitter) catch |err| { 677 685 log.warn("failed to spawn worker for {s}: {s}", .{ host.hostname, @errorName(err) }); 678 686 }; 679 687 spawned += 1; ··· 770 778 }; 771 779 772 780 if (missing) { 773 - self.spawnWorker(host.id, host.hostname, host.last_seq) catch |err| { 781 + self.spawnWorker(host.id, host.hostname, host.last_seq, 0) catch |err| { 774 782 log.warn("reconcile: failed to respawn {s}: {s}", .{ host.hostname, @errorName(err) }); 775 783 continue; 776 784 };
+7
src/subscriber.zig
··· 220 220 last_upstream_seq: ?u64 = null, 221 221 last_cursor_flush: i64 = 0, 222 222 backoff: u64 = 1, 223 + /// one-time random delay (ms) before first connect — spreads TLS storm at startup 224 + startup_jitter_ms: u32 = 0, 223 225 rate_limiter: RateLimiter = .{}, 224 226 225 227 // per-host shutdown (e.g. FutureCursor — stops only this subscriber) ··· 266 268 // cursor is set at spawn time by slurper 267 269 if (self.last_upstream_seq) |seq| { 268 270 log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, seq }); 271 + } 272 + 273 + // one-time startup jitter — spread DNS/TLS storm across a window 274 + if (self.startup_jitter_ms > 0 and !self.shouldStop()) { 275 + self.io.sleep(Io.Duration.fromMilliseconds(self.startup_jitter_ms), .awake) catch {}; 269 276 } 270 277 271 278 while (!self.shouldStop()) {