atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

throttle startup connection ramp to prevent event loop starvation

spawning ~2,800 TLS handshakes simultaneously starves the single
Evented event loop — health checks on both :3000 and :3001 time out,
liveness probe kills the pod. batch-spawn with io.sleep yields between
batches so the event loop stays responsive during ramp.

STARTUP_BATCH_SIZE env var (default 50), 100ms yield between batches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+23 -4
+2
src/main.zig
··· 200 200 const max_events_gb = parseEnvInt(u64, "RELAY_MAX_EVENTS_GB", 100); 201 201 const frame_workers = parseEnvInt(u16, "FRAME_WORKERS", 16); 202 202 const frame_queue_capacity = parseEnvInt(u16, "FRAME_QUEUE_CAPACITY", 4096); 203 + const startup_batch_size = parseEnvInt(u16, "STARTUP_BATCH_SIZE", 50); 203 204 const db_pool_size = parseEnvInt(u16, "DB_POOL_SIZE", 20); 204 205 205 206 // install signal handlers (including SIGPIPE ignore) ··· 266 267 .max_message_size = 5 * 1024 * 1024, 267 268 .frame_workers = frame_workers, 268 269 .frame_queue_capacity = frame_queue_capacity, 270 + .startup_batch_size = startup_batch_size, 269 271 }, 270 272 io, 271 273 pool_io,
+21 -4
src/slurper.zig
··· 1 1 //! slurper — multi-host PDS crawl manager 2 2 //! 3 - //! manages one Subscriber thread per tracked PDS host. handles: 3 + //! manages one Subscriber fiber per tracked PDS host. handles: 4 4 //! - loading known hosts from DB on startup 5 5 //! - spawning/stopping subscriber workers 6 6 //! - processing crawl requests (adding new hosts) ··· 29 29 max_message_size: usize = 5 * 1024 * 1024, 30 30 frame_workers: u16 = 16, 31 31 frame_queue_capacity: u16 = 4096, 32 + /// max hosts to connect concurrently during initial startup ramp. 33 + /// prevents TLS handshake storm from starving the event loop. 34 + /// 0 = unlimited (legacy behavior). 35 + startup_batch_size: u16 = 50, 32 36 }; 33 37 34 38 // --- host validation --- ··· 510 514 self.allocator.destroy(sub); 511 515 } 512 516 513 - /// background thread: load hosts from DB and spawn all workers. 517 + /// background fiber: load hosts from DB and spawn all workers. 514 518 /// runs in background so HTTP server + probes come up immediately. 515 519 /// Go relay: ResubscribeAllHosts loops with 1ms sleep per host (goroutines). 516 - /// we spawn all at once — the brief memory spike from concurrent TLS handshakes 517 - /// is shorter than a throttled ramp (many hosts fail-fast, freeing memory quickly). 520 + /// we batch-spawn with yields between batches to keep the event loop responsive 521 + /// for health checks and metrics during the initial TLS handshake ramp. 518 522 fn spawnWorkers(self: *Slurper) void { 519 523 // pull hosts from seed relay first — idempotent (getOrCreateHost skips existing) 520 524 if (self.options.seed_host.len > 0) { ··· 538 542 self.allocator.free(hosts); 539 543 } 540 544 545 + const batch: usize = if (self.options.startup_batch_size > 0) 546 + self.options.startup_batch_size 547 + else 548 + hosts.len; // 0 = unlimited 549 + 550 + var spawned: usize = 0; 541 551 for (hosts) |host| { 542 552 if (self.shutdown.load(.acquire)) break; 543 553 self.spawnWorker(host.id, host.hostname) catch |err| { 544 554 log.warn("failed to spawn worker for {s}: {s}", .{ host.hostname, @errorName(err) }); 545 555 }; 556 + spawned += 1; 557 + 558 + // yield between batches so the event loop can service health checks 559 + if (spawned % batch == 0 and spawned < hosts.len) { 560 + log.info("startup: spawned {d}/{d} hosts, yielding...", .{ spawned, hosts.len }); 561 + self.io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch break; 562 + } 546 563 } 547 564 548 565 log.info("startup complete: {d} host(s) spawned", .{hosts.len});