atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix connect gate: release permit after connectAndRead, not at loop end

the defer was scoped to the while loop body, holding the permit during
backoff sleep (up to 30 min). if 50 hosts fail simultaneously, all
permits lock for the backoff duration and no other host can connect.

now releases the permit immediately after connectAndRead returns
(success or failure). connected subscribers reading frames don't
count against the handshake limit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+16 -5
+16 -5
src/subscriber.zig
··· 268 268 return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire); 269 269 } 270 270 271 + /// release connect gate permit (idempotent-safe: only decrements if gate exists) 272 + fn releaseConnectGate(self: *Subscriber) void { 273 + if (self.connect_gate) |gate| _ = gate.active.fetchSub(1, .monotonic); 274 + } 275 + 271 276 /// run the subscriber loop. reconnects with exponential backoff. 272 277 /// blocks until shutdown or host marked dormant. 273 278 pub fn run(self: *Subscriber) void { ··· 279 284 } 280 285 281 286 while (!self.shouldStop()) { 282 - // wait for connect permit — limits concurrent DNS/TLS handshakes 287 + // acquire connect permit — limits concurrent DNS/TLS handshakes. 288 + // released immediately after connectAndRead returns (success or failure) 289 + // so the permit is not held during backoff sleep. 283 290 if (self.connect_gate) |gate| { 284 291 while (gate.active.load(.acquire) >= gate.limit) { 285 292 if (self.shouldStop()) return; ··· 287 294 } 288 295 _ = gate.active.fetchAdd(1, .monotonic); 289 296 } 290 - defer if (self.connect_gate) |gate| { 291 - _ = gate.active.fetchSub(1, .monotonic); 292 - }; 293 297 294 298 log.info("host {s}: connecting...", .{self.options.hostname}); 295 299 296 300 self.connectAndRead() catch |err| { 297 - if (self.shouldStop()) return; 301 + if (self.shouldStop()) { 302 + self.releaseConnectGate(); 303 + return; 304 + } 298 305 log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), self.backoff }); 299 306 }; 307 + // release permit after connectAndRead completes (success or failure) — 308 + // connected subscribers reading frames don't count against the limit, 309 + // and failed subscribers shouldn't hold permits during backoff sleep 310 + self.releaseConnectGate(); 300 311 301 312 if (self.shouldStop()) return; 302 313