atproto relay implementation in zig zlay.waow.tech

fix socket-after-close panic in consumer drop path

ReleaseSafe caught an unreachable panic in posix.setsockopt when a
ConsumerTooSlow kick raced with the websocket server's readLoop thread.

the race: dropSlowConsumer called conn.close() from the broadcast thread
while readLoop (server thread) was about to call setsockopt on the same
socket fd. setsockopt got EBADF, zig stdlib hit unreachable → panic.
under ReleaseFast this was silent undefined behavior — likely existed on
every prior build.

fix: move conn.close() from dropSlowConsumer (broadcast thread) to the
end of writeLoop (consumer's own thread). once alive is set to false,
writeLoop leaves its send loop, drains remaining frames, then closes the
connection. a failed write now breaks out of the loop instead of
returning, so it reaches the same close. this unblocks readLoop's pending
read without racing on socket state.
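
a minimal sketch of the ownership rule described above, not the actual
broadcaster code: the broadcast side only flips alive and signals, and the
writer thread is the only place that closes. FakeConn, drop, and the
simplified writeLoop body are hypothetical stand-ins for illustration, and
the sketch assumes the std.Thread.Mutex / std.Thread.Condition primitives
as found in Zig 0.14/0.15 rather than the Io-based APIs this repo uses.

```zig
const std = @import("std");

/// Hypothetical stand-in for websocket.Conn: it only records that close ran.
const FakeConn = struct {
    closed: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

    fn close(self: *FakeConn) void {
        self.closed.store(true, .release);
    }
};

const Consumer = struct {
    conn: *FakeConn,
    alive: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},

    /// Runs on the consumer's own thread. It is the only code that closes conn.
    fn writeLoop(self: *Consumer) void {
        self.mutex.lock();
        // Sleep until the broadcast side clears alive (frame sending omitted).
        while (self.alive.load(.acquire)) {
            self.cond.wait(&self.mutex);
        }
        self.mutex.unlock();
        // Draining of queued frames would go here in a real consumer.
        self.conn.close();
    }

    /// Called from the broadcast thread: signal only, never touch the socket.
    fn drop(self: *Consumer) void {
        self.mutex.lock();
        self.alive.store(false, .release);
        self.mutex.unlock();
        self.cond.signal();
    }
};
```

the flag is stored while holding the mutex so the writer cannot miss the
wakeup between checking alive and going to sleep.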

also bump BUFFER_CAP 8192 → 65536 to reduce ConsumerTooSlow frequency
(cherry-pick of the ee4e368 change onto the current tree).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

zzstoatzz 4735725c e6cdf844

+13 -7
src/broadcaster.zig
@@ -353,7 +353,7 @@
 const ping_timeout_ns: u64 = 5 * std.time.ns_per_s;

 pub const Consumer = struct {
-    const BUFFER_CAP = 8192;
+    const BUFFER_CAP = 65536;

     conn: *websocket.Conn,
     allocator: Allocator,
@@ -427,7 +427,7 @@
                 defer f.release();
                 self.conn.writeBin(f.data) catch {
                     self.alive.store(false, .release);
-                    return;
+                    break;
                 };
                 self.last_send_time = Io.Timestamp.now(self.io, .real).nanoseconds;
             } else {
@@ -444,6 +444,11 @@
                 f.release();
             } else break;
         }
+        // close the connection from the writeLoop thread to unblock readLoop.
+        // this MUST happen here, not from the broadcast thread — closing from
+        // broadcast races with readLoop's setsockopt/read calls on the socket,
+        // causing EBADF → unreachable panic under ReleaseSafe.
+        self.conn.close(.{}) catch {};
     }

     fn maybePing(self: *Consumer) void {
@@ -622,13 +627,14 @@
         if (self.error_frame) |ef| {
             consumer.conn.writeBin(ef) catch {};
         }
-        // signal consumer to stop — writeLoop will exit, then Handler.close →
-        // removeConsumer handles actual shutdown + destroy.
-        // IMPORTANT: do NOT spawn a plain std.Thread for cleanup — it would call
-        // Evented future cancel / mutex ops from a non-Uring thread → SIGSEGV.
+        // signal consumer to stop — writeLoop checks alive and exits, then
+        // Handler.close → removeConsumer handles shutdown + destroy.
+        // do NOT close the socket here: readLoop (websocket server thread) may
+        // be calling setsockopt/read on it concurrently. closing from a third
+        // thread causes EBADF → unreachable panic in posix.setsockopt under
+        // ReleaseSafe. let writeLoop's exit trigger the natural close sequence.
         consumer.alive.store(false, .release);
         consumer.cond.signal(consumer.io);
-        consumer.conn.close(.{}) catch {};
         // consumer is removed from list by broadcast() caller (swapRemove).
         // cleanup deferred to removeConsumer to avoid double-destroy.
     }