atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

add connect gate to limit concurrent DNS/TLS handshakes

a shared semaphore (ConnectGate) bounds concurrent connection attempts
across all subscriber threads. this prevents the DNS/TLS handshake storms,
at startup and during mass reconnects, that starve the HTTP health probe.

configurable via MAX_CONCURRENT_CONNECTS env var (default 50). each
subscriber acquires a permit before connectAndRead() and releases it
on return (success or failure) via defer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+31
+2
src/main.zig
··· 203 203 const frame_workers = parseEnvInt(u16, "FRAME_WORKERS", 16); 204 204 const frame_queue_capacity = parseEnvInt(u16, "FRAME_QUEUE_CAPACITY", 4096); 205 205 const startup_batch_size = parseEnvInt(u16, "STARTUP_BATCH_SIZE", 50); 206 + const max_concurrent_connects = parseEnvInt(u32, "MAX_CONCURRENT_CONNECTS", 50); 206 207 const db_pool_size = parseEnvInt(u16, "DB_POOL_SIZE", 20); 207 208 208 209 // install signal handlers (including SIGPIPE ignore) ··· 310 311 .frame_workers = frame_workers, 311 312 .frame_queue_capacity = frame_queue_capacity, 312 313 .startup_batch_size = startup_batch_size, 314 + .max_concurrent_connects = max_concurrent_connects, 313 315 }, 314 316 io, 315 317 pool_io,
+8
src/slurper.zig
··· 34 34 /// prevents TLS handshake storm from starving the event loop. 35 35 /// 0 = unlimited (legacy behavior). 36 36 startup_batch_size: u16 = 50, 37 + /// max concurrent DNS/TLS connection attempts across all subscriber threads. 38 + /// bounds both the initial startup ramp and reconnect storms. 39 + max_concurrent_connects: u32 = 50, 37 40 }; 38 41 39 42 // --- host validation --- ··· 241 244 // shared TLS CA bundle — loaded once, used by all subscriber connections 242 245 ca_bundle: ?std.crypto.Certificate.Bundle = null, 243 246 247 + // connect gate — limits concurrent DNS/TLS handshakes across all subscribers 248 + connect_gate: subscriber_mod.ConnectGate = .{ .limit = 50 }, 249 + 244 250 // active subscriber threads, keyed by host_id 245 251 workers: std.AutoHashMapUnmanaged(u64, WorkerEntry) = .empty, 246 252 workers_mutex: Io.Mutex = Io.Mutex.init, ··· 277 283 .options = options, 278 284 .io = io, 279 285 .pool_io = pool_io, 286 + .connect_gate = .{ .limit = options.max_concurrent_connects }, 280 287 }; 281 288 } 282 289 ··· 578 585 if (self.frame_pool) |*fp| sub.pool = fp; 579 586 sub.pool_io = self.pool_io; 580 587 sub.host_ops = self.host_ops; 588 + sub.connect_gate = &self.connect_gate; 581 589 if (self.cursor_map) |cm| { 582 590 sub.cursor_map = cm; 583 591 sub.cursor_slot = cm.register(host_id, last_seq);
+21
src/subscriber.zig
··· 199 199 } 200 200 }; 201 201 202 + /// semaphore that limits concurrent connection attempts across all subscriber threads. 203 + /// bounds the DNS/TLS storm during startup and reconnect storms. 204 + pub const ConnectGate = struct { 205 + active: std.atomic.Value(u32) = .{ .raw = 0 }, 206 + limit: u32, 207 + }; 208 + 202 209 pub const Subscriber = struct { 203 210 allocator: Allocator, 204 211 io: Io, ··· 216 223 /// coalescing cursor map — subscriber writes latest seq atomically, worker sweeps every 5s 217 224 cursor_map: ?*host_ops_mod.CursorMap = null, 218 225 cursor_slot: ?u32 = null, 226 + /// connect gate — limits concurrent DNS/TLS handshakes across all subscribers 227 + connect_gate: ?*ConnectGate = null, 219 228 shutdown: *std.atomic.Value(bool), 220 229 last_upstream_seq: ?u64 = null, 221 230 last_cursor_flush: i64 = 0, ··· 268 277 } 269 278 270 279 while (!self.shouldStop()) { 280 + // wait for connect permit — limits concurrent DNS/TLS handshakes 281 + if (self.connect_gate) |gate| { 282 + while (gate.active.load(.acquire) >= gate.limit) { 283 + if (self.shouldStop()) return; 284 + self.io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch {}; 285 + } 286 + _ = gate.active.fetchAdd(1, .monotonic); 287 + } 288 + defer if (self.connect_gate) |gate| { 289 + _ = gate.active.fetchSub(1, .monotonic); 290 + }; 291 + 271 292 log.info("host {s}: connecting...", .{self.options.hostname}); 272 293 273 294 self.connectAndRead() catch |err| {