atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

subscriber: add 30s keepalive pings to upstream WebSocket connections

without periodic pings, intermediate proxies (e.g. Cloudflare) kill
idle WebSocket connections after ~100s. this causes the subscriber to
disconnect after catching up on backlog, then reconnect via backoff —
matching the behavior reported by PDS operators behind Cloudflare.

spawns a ping thread per connection (same pattern as indigo relay's
consumer.go goroutine): sends a ping every 30s and closes the connection
after 4 consecutive ping failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+42
+42
src/subscriber.zig
··· 21 21 22 22 const max_consecutive_failures = 15; 23 23 const cursor_flush_interval_sec = 4; // flush cursor to DB every N seconds (Go relay: 4s) 24 + const ping_interval_sec = 30; // keepalive ping interval (Go relay: 30s) 25 + const max_ping_failures = 4; // close connection after N consecutive ping failures (Go relay: 4) 24 26 25 27 // per-host rate limits — backpressure thresholds (matches indigo: 50/s, 2500/hr, 20k/day) 26 28 // blocking instead of dropping means these trigger TCP backpressure on the upstream PDS ··· 328 330 var handler = FrameHandler{ 329 331 .subscriber = self, 330 332 }; 333 + 334 + // spawn keepalive ping thread (Go relay: 30s ticker goroutine in consumer.go) 335 + // prevents intermediate proxies (e.g. Cloudflare) from killing idle connections 336 + const ping_thread = std.Thread.spawn( 337 + .{ .stack_size = @import("main.zig").default_stack_size }, 338 + pingLoop, 339 + .{ &client, self }, 340 + ) catch |err| { 341 + log.warn("host {s}: failed to spawn ping thread: {s}", .{ self.options.hostname, @errorName(err) }); 342 + return err; 343 + }; 344 + 345 + defer ping_thread.join(); 331 346 try client.readLoop(&handler); 347 + } 348 + 349 + /// periodic WebSocket ping to keep the connection alive. 350 + /// matches indigo relay: 30s interval, close after 4 consecutive failures. 
351 + fn pingLoop(client: *websocket.Client, self: *Subscriber) void { 352 + var fail_count: u32 = 0; 353 + while (!self.shouldStop()) { 354 + // sleep in 1s increments so we can check shutdown 355 + var elapsed: u32 = 0; 356 + while (elapsed < ping_interval_sec and !self.shouldStop()) { 357 + std.posix.nanosleep(1, 0); 358 + elapsed += 1; 359 + } 360 + if (self.shouldStop()) return; 361 + 362 + client.writeFrame(.ping, &.{}) catch { 363 + fail_count += 1; 364 + log.warn("host {s}: ping failed ({d}/{d})", .{ self.options.hostname, fail_count, max_ping_failures }); 365 + if (fail_count >= max_ping_failures) { 366 + log.err("host {s}: too many ping failures, closing connection", .{self.options.hostname}); 367 + client.close(.{}) catch {}; 368 + return; 369 + } 370 + continue; 371 + }; 372 + fail_count = 0; 373 + } 332 374 } 333 375 }; 334 376