atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix SIGSEGV: plain threads calling Evented Io.Mutex → Uring NULL deref

root cause: Resyncer.enqueue() is called from frame worker threads (plain
std.Thread) but used the Evented Io for mutex/cond ops. when the mutex is
contended, futexWaitUncancelable enters the Uring fiber scheduler, which
calls Thread.current() — a threadlocal that is only set on Uring threads.
on plain threads it is null, and ReleaseFast silently dereferences NULL at
struct field offsets (0x28, 0x30, 0x38) → SIGSEGV.

fix: add queue_io (pool_io/Threaded) to Resyncer for cross-thread
synchronization. the Evented io is kept for the HTTP client and fiber spawning.

also fixes two consumer bugs:
- dropSlowConsumer spawned a plain std.Thread for cleanup, which called
Evented future cancel → same NULL deref class. removed the cleanup thread;
destruction is now deferred to Handler.close → removeConsumer.
- removeConsumer unconditionally decremented the consumer count even when
dropSlowConsumer had already decremented it → double-decrement. it now
decrements only when the consumer is found in the list.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+33 -20
+9 -10
src/broadcaster.zig
··· 502 502 for (self.consumers.items, 0..) |c, i| { 503 503 if (c == consumer) { 504 504 _ = self.consumers.swapRemove(i); 505 + // only decrement if we found it — dropSlowConsumer already 506 + // decremented for slow consumers removed during broadcast() 507 + _ = self.stats.consumer_count.fetchSub(1, .monotonic); 505 508 break; 506 509 } 507 510 } 508 511 } 509 512 consumer.shutdown(); 510 - _ = self.stats.consumer_count.fetchSub(1, .monotonic); 511 513 log.info("consumer disconnected ({d} remaining)", .{self.consumers.items.len}); 512 514 self.allocator.destroy(consumer); 513 515 } ··· 559 561 if (self.error_frame) |ef| { 560 562 consumer.conn.writeBin(ef) catch {}; 561 563 } 564 + // signal consumer to stop — writeLoop will exit, then Handler.close → 565 + // removeConsumer handles actual shutdown + destroy. 566 + // IMPORTANT: do NOT spawn a plain std.Thread for cleanup — it would call 567 + // Evented future cancel / mutex ops from a non-Uring thread → SIGSEGV. 562 568 consumer.alive.store(false, .release); 563 569 consumer.cond.signal(consumer.io); 564 570 consumer.conn.close(.{}) catch {}; 565 - // clean up asynchronously to avoid joining thread while holding mutex 566 - const alloc = self.allocator; 567 - const cleanup_thread = std.Thread.spawn(.{ .stack_size = 512 * 1024 }, struct { 568 - fn run(c: *Consumer, a: Allocator) void { 569 - c.shutdown(); 570 - a.destroy(c); 571 - } 572 - }.run, .{ consumer, alloc }) catch null; 573 - if (cleanup_thread) |t| t.detach(); 571 + // consumer is removed from list by broadcast() caller (swapRemove). 572 + // cleanup deferred to removeConsumer to avoid double-destroy. 574 573 } 575 574 576 575 /// two-phase cursor replay: disk (diskpersist) first, then in-memory ring buffer.
+3 -1
src/main.zig
··· 251 251 var cleaner = cleaner_mod.Cleaner.init(allocator, io, &ci, dp.db); 252 252 253 253 // init resyncer (updates collection index on #sync events) 254 - var resyncer = resync_mod.Resyncer.init(allocator, io, &ci); 254 + // queue_io = pool_io: enqueue() is called from frame worker threads (plain std.Thread), 255 + // so mutex/cond operations must use Threaded io, not Evented. 256 + var resyncer = resync_mod.Resyncer.init(allocator, io, pool_io, &ci); 255 257 try resyncer.start(); 256 258 defer resyncer.deinit(); 257 259
+21 -9
src/resync.zig
··· 35 35 pub const Resyncer = struct { 36 36 allocator: Allocator, 37 37 io: Io, 38 + /// pool_io (Threaded) for queue synchronization — enqueue() is called from 39 + /// frame worker threads (plain std.Thread), so mutex/cond ops must use an Io 40 + /// whose futexWait/futexWake work outside of Io.Uring fibers. 41 + queue_io: Io, 38 42 collection_index: *collection_index_mod.CollectionIndex, 39 43 40 44 // bounded ring buffer queue ··· 56 60 pub fn init( 57 61 allocator: Allocator, 58 62 io: Io, 63 + queue_io: Io, 59 64 collection_index: *collection_index_mod.CollectionIndex, 60 65 ) Resyncer { 61 66 return .{ 62 67 .allocator = allocator, 63 68 .io = io, 69 + .queue_io = queue_io, 64 70 .collection_index = collection_index, 65 71 .queue = undefined, 66 72 .head = 0, ··· 83 89 } 84 90 85 91 /// enqueue a DID for resync. non-blocking, drops if queue full or inputs too long. 92 + /// called from frame worker threads (plain std.Thread) — uses queue_io (Threaded). 86 93 pub fn enqueue(self: *Resyncer, did: []const u8, hostname: []const u8) void { 87 94 if (did.len == 0 or did.len > 128 or hostname.len == 0 or hostname.len > 256) return; 88 95 89 - self.mutex.lockUncancelable(self.io); 90 - defer self.mutex.unlock(self.io); 96 + self.mutex.lockUncancelable(self.queue_io); 97 + defer self.mutex.unlock(self.queue_io); 91 98 92 99 if (self.len >= queue_capacity) { 93 100 _ = self.dropped.fetchAdd(1, .monotonic); ··· 106 113 self.queue[self.tail] = item; 107 114 self.tail = (self.tail + 1) % queue_capacity; 108 115 self.len += 1; 109 - self.cond.signal(self.io); 116 + self.cond.signal(self.queue_io); 110 117 } 111 118 112 119 /// dequeue one item. blocks until available or shutdown. 120 + /// runs in the Evented fiber (run()), but uses queue_io for mutex/cond 121 + /// since the mutex is shared with enqueue() which runs on plain threads. 
113 122 fn dequeue(self: *Resyncer) ?ResyncItem { 114 - self.mutex.lockUncancelable(self.io); 115 - defer self.mutex.unlock(self.io); 123 + self.mutex.lockUncancelable(self.queue_io); 124 + defer self.mutex.unlock(self.queue_io); 116 125 117 126 while (self.len == 0 and self.running.load(.acquire)) { 118 - self.cond.waitUncancelable(self.io, &self.mutex); 127 + self.cond.waitUncancelable(self.queue_io, &self.mutex); 119 128 } 120 129 121 130 if (self.len == 0) return null; ··· 216 225 } 217 226 218 227 pub fn queueDepth(self: *Resyncer) usize { 219 - self.mutex.lockUncancelable(self.io); 220 - defer self.mutex.unlock(self.io); 228 + self.mutex.lockUncancelable(self.queue_io); 229 + defer self.mutex.unlock(self.queue_io); 221 230 return self.len; 222 231 } 223 232 224 233 pub fn stop(self: *Resyncer) void { 225 234 self.running.store(false, .release); 226 - self.cond.signal(self.io); 235 + self.cond.signal(self.queue_io); 227 236 } 228 237 229 238 pub fn deinit(self: *Resyncer) void { ··· 244 253 var r: Resyncer = .{ 245 254 .allocator = std.testing.allocator, 246 255 .io = std.testing.io, 256 + .queue_io = std.testing.io, 247 257 .collection_index = undefined, // not used in this test 248 258 .queue = undefined, 249 259 .head = 0, ··· 272 282 var r: Resyncer = .{ 273 283 .allocator = std.testing.allocator, 274 284 .io = std.testing.io, 285 + .queue_io = std.testing.io, 275 286 .collection_index = undefined, 276 287 .queue = undefined, 277 288 .head = 0, ··· 294 305 var r: Resyncer = .{ 295 306 .allocator = std.testing.allocator, 296 307 .io = std.testing.io, 308 + .queue_io = std.testing.io, 297 309 .collection_index = undefined, 298 310 .queue = undefined, 299 311 .head = 0,