atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix startup deadlock: run resyncer on plain thread, not Evented fiber

the previous fix (6674812) correctly identified that using an Evented
Io.Mutex from a plain thread causes SIGSEGV, but it then used a Threaded
futex from within an Evented fiber. Threaded futexWait blocks the Uring OS
thread, preventing it from processing io_uring completions for other
fibers — which deadlocks the event loop during the CA bundle scan.

fix: the resyncer now runs entirely on pool_io (Threaded) via a plain
std.Thread. no Evented io involvement at all. this is correct because:
- enqueue() from frame workers: Threaded futex on plain thread ✓
- dequeue() in worker: Threaded futex on plain thread ✓
- HTTP client: blocking I/O on plain thread ✓

the fundamental constraint: an Io.Mutex cannot be shared across Io types.
Threaded futex on an Evented fiber → blocks the Uring thread → deadlock.
Evented futex on a plain thread → NULL threadlocal → SIGSEGV.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 439c678a 66748127

+29 -33
+5 -3
src/main.zig
··· 251 251 var cleaner = cleaner_mod.Cleaner.init(allocator, io, &ci, dp.db); 252 252 253 253 // init resyncer (updates collection index on #sync events) 254 - // queue_io = pool_io: enqueue() is called from frame worker threads (plain std.Thread), 255 - // so mutex/cond operations must use Threaded io, not Evented. 256 - var resyncer = resync_mod.Resyncer.init(allocator, io, pool_io, &ci); 254 + // runs entirely on pool_io (Threaded) — enqueue() is called from frame worker 255 + // threads, and the background worker is a plain std.Thread. MUST NOT use 256 + // Evented io (Threaded futex on Evented fiber blocks Uring thread → deadlock; 257 + // Evented futex on plain thread → NULL threadlocal → SIGSEGV). 258 + var resyncer = resync_mod.Resyncer.init(allocator, pool_io, &ci); 257 259 try resyncer.start(); 258 260 defer resyncer.deinit(); 259 261
+24 -30
src/resync.zig
··· 5 5 //! describeRepo from the PDS to get the current collection list, then 6 6 //! replaces the index entries for that DID. 7 7 //! 8 - //! modeled on cleaner.zig — bounded queue, single background worker thread. 8 + //! runs entirely on pool_io (Threaded) — enqueue() is called from frame worker 9 + //! threads (plain std.Thread), and the background worker is also a plain thread. 10 + //! MUST NOT use Evented io: Threaded futex on an Evented fiber blocks the Uring 11 + //! thread, and Evented futex on a plain thread dereferences a NULL threadlocal. 9 12 10 13 const std = @import("std"); 11 14 const Io = std.Io; ··· 34 37 35 38 pub const Resyncer = struct { 36 39 allocator: Allocator, 40 + /// pool_io (Threaded) — used for all synchronization AND the HTTP client. 41 + /// this struct must never touch the Evented io. 37 42 io: Io, 38 - /// pool_io (Threaded) for queue synchronization — enqueue() is called from 39 - /// frame worker threads (plain std.Thread), so mutex/cond ops must use an Io 40 - /// whose futexWait/futexWake work outside of Io.Uring fibers. 
41 - queue_io: Io, 42 43 collection_index: *collection_index_mod.CollectionIndex, 43 44 44 45 // bounded ring buffer queue ··· 50 51 cond: Io.Condition, 51 52 52 53 running: std.atomic.Value(bool), 53 - future: ?Io.Future(void), 54 + thread: ?std.Thread, 54 55 55 56 // stats 56 57 processed: std.atomic.Value(u64), ··· 60 61 pub fn init( 61 62 allocator: Allocator, 62 63 io: Io, 63 - queue_io: Io, 64 64 collection_index: *collection_index_mod.CollectionIndex, 65 65 ) Resyncer { 66 66 return .{ 67 67 .allocator = allocator, 68 68 .io = io, 69 - .queue_io = queue_io, 70 69 .collection_index = collection_index, 71 70 .queue = undefined, 72 71 .head = 0, ··· 75 74 .mutex = Io.Mutex.init, 76 75 .cond = Io.Condition.init, 77 76 .running = .{ .raw = false }, 78 - .future = null, 77 + .thread = null, 79 78 .processed = .{ .raw = 0 }, 80 79 .failed = .{ .raw = 0 }, 81 80 .dropped = .{ .raw = 0 }, ··· 85 84 /// start the background worker thread. 86 85 pub fn start(self: *Resyncer) !void { 87 86 self.running.store(true, .release); 88 - self.future = try self.io.concurrent(run, .{self}); 87 + self.thread = try std.Thread.spawn(.{}, run, .{self}); 89 88 } 90 89 91 90 /// enqueue a DID for resync. non-blocking, drops if queue full or inputs too long. 92 - /// called from frame worker threads (plain std.Thread) — uses queue_io (Threaded). 91 + /// called from frame worker threads (plain std.Thread). 
93 92 pub fn enqueue(self: *Resyncer, did: []const u8, hostname: []const u8) void { 94 93 if (did.len == 0 or did.len > 128 or hostname.len == 0 or hostname.len > 256) return; 95 94 96 - self.mutex.lockUncancelable(self.queue_io); 97 - defer self.mutex.unlock(self.queue_io); 95 + self.mutex.lockUncancelable(self.io); 96 + defer self.mutex.unlock(self.io); 98 97 99 98 if (self.len >= queue_capacity) { 100 99 _ = self.dropped.fetchAdd(1, .monotonic); ··· 113 112 self.queue[self.tail] = item; 114 113 self.tail = (self.tail + 1) % queue_capacity; 115 114 self.len += 1; 116 - self.cond.signal(self.queue_io); 115 + self.cond.signal(self.io); 117 116 } 118 117 119 118 /// dequeue one item. blocks until available or shutdown. 120 - /// runs in the Evented fiber (run()), but uses queue_io for mutex/cond 121 - /// since the mutex is shared with enqueue() which runs on plain threads. 122 119 fn dequeue(self: *Resyncer) ?ResyncItem { 123 - self.mutex.lockUncancelable(self.queue_io); 124 - defer self.mutex.unlock(self.queue_io); 120 + self.mutex.lockUncancelable(self.io); 121 + defer self.mutex.unlock(self.io); 125 122 126 123 while (self.len == 0 and self.running.load(.acquire)) { 127 - self.cond.waitUncancelable(self.queue_io, &self.mutex); 124 + self.cond.waitUncancelable(self.io, &self.mutex); 128 125 } 129 126 130 127 if (self.len == 0) return null; ··· 225 222 } 226 223 227 224 pub fn queueDepth(self: *Resyncer) usize { 228 - self.mutex.lockUncancelable(self.queue_io); 229 - defer self.mutex.unlock(self.queue_io); 225 + self.mutex.lockUncancelable(self.io); 226 + defer self.mutex.unlock(self.io); 230 227 return self.len; 231 228 } 232 229 233 230 pub fn stop(self: *Resyncer) void { 234 231 self.running.store(false, .release); 235 - self.cond.signal(self.queue_io); 232 + self.cond.signal(self.io); 236 233 } 237 234 238 235 pub fn deinit(self: *Resyncer) void { 239 236 self.stop(); 240 - if (self.future) |*f| f.cancel(self.io); 241 - self.future = null; 237 + if 
(self.thread) |t| t.join(); 238 + self.thread = null; 242 239 } 243 240 244 241 const DescribeRepoResponse = struct { ··· 253 250 var r: Resyncer = .{ 254 251 .allocator = std.testing.allocator, 255 252 .io = std.testing.io, 256 - .queue_io = std.testing.io, 257 253 .collection_index = undefined, // not used in this test 258 254 .queue = undefined, 259 255 .head = 0, ··· 262 258 .mutex = Io.Mutex.init, 263 259 .cond = Io.Condition.init, 264 260 .running = .{ .raw = true }, 265 - .future = null, 261 + .thread = null, 266 262 .processed = .{ .raw = 0 }, 267 263 .failed = .{ .raw = 0 }, 268 264 .dropped = .{ .raw = 0 }, ··· 282 278 var r: Resyncer = .{ 283 279 .allocator = std.testing.allocator, 284 280 .io = std.testing.io, 285 - .queue_io = std.testing.io, 286 281 .collection_index = undefined, 287 282 .queue = undefined, 288 283 .head = 0, ··· 291 286 .mutex = Io.Mutex.init, 292 287 .cond = Io.Condition.init, 293 288 .running = .{ .raw = true }, 294 - .future = null, 289 + .thread = null, 295 290 .processed = .{ .raw = 0 }, 296 291 .failed = .{ .raw = 0 }, 297 292 .dropped = .{ .raw = 0 }, ··· 305 300 var r: Resyncer = .{ 306 301 .allocator = std.testing.allocator, 307 302 .io = std.testing.io, 308 - .queue_io = std.testing.io, 309 303 .collection_index = undefined, 310 304 .queue = undefined, 311 305 .head = 0, ··· 314 308 .mutex = Io.Mutex.init, 315 309 .cond = Io.Condition.init, 316 310 .running = .{ .raw = true }, 317 - .future = null, 311 + .thread = null, 318 312 .processed = .{ .raw = 0 }, 319 313 .failed = .{ .raw = 0 }, 320 314 .dropped = .{ .raw = 0 },