atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

clean up stale Evented-era comments across codebase

now that Backend is Io.Threaded, remove references to cross-Io
constraints, Evented fiber context requirements, and Uring thread
warnings that no longer apply. the historical context is preserved
in docs/evented-attempt.md and docs/notes.md.

no behavioral changes — comments only.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+33 -39
+2 -2
src/api/admin.zig
··· 3 3 //! all handlers require Bearer token auth against RELAY_ADMIN_PASSWORD. 4 4 //! includes host blocking/unblocking, account bans, and backfill control. 5 5 //! 6 - //! DB-accessing handlers use DbRequest + DbRequestQueue to route queries through 7 - //! pool_io (Threaded) workers. 6 + //! DB-accessing handlers use DbRequest + DbRequestQueue to route queries 7 + //! through pool_io workers. 8 8 9 9 const std = @import("std"); 10 10 const Io = std.Io;
+1 -1
src/api/xrpc.zig
··· 5 5 //! listHosts, getHostStatus, requestCrawl 6 6 //! 7 7 //! DB-accessing handlers use DbRequest + DbRequestQueue to route queries through 8 - //! pool_io (Threaded) workers, avoiding the broken Evented pg.Pool. 8 + //! pool_io workers. 9 9 10 10 const std = @import("std"); 11 11 const Io = std.Io;
+3 -3
src/broadcaster.zig
··· 646 646 /// disk playback runs on pool_io (Threaded) via request/reply — playback() 647 647 /// holds the DiskPersist mutex and reads files, all of which require Threaded Io. 648 648 pub fn replayTo(self: *Broadcaster, consumer: *Consumer, cursor: u64) void { 649 - // phase 1: disk replay from diskpersist via cross-Io request/reply 649 + // phase 1: disk replay from diskpersist via request/reply 650 650 if (self.persist) |dp| { 651 651 var req: event_log_mod.PlaybackRequest = .{ .since = cursor, .allocator = self.allocator }; 652 652 dp.enqueuePlayback(&req); 653 653 654 - // poll until pool_io worker completes the request (yields to Evented scheduler). 654 + // poll until pool_io worker completes the request. 655 655 // SAFETY: req is stack-local — we MUST wait for the worker to finish before 656 656 // returning, otherwise the stack frame unwinds while the worker still holds &req. 657 657 // if sleep fails (shutdown/io error), fall back to spin-wait. the host_ops worker ··· 720 720 return self.consumers.items.len; 721 721 } 722 722 723 - /// broadcast loop — runs as an Evented fiber, drains the broadcast queue 723 + /// broadcast loop — drains the broadcast queue, fans out to consumers 724 724 /// and calls broadcast() for each item. this is the only path that touches 725 725 /// consumers_mutex / consumer.mutex / consumer.cond. 726 726 pub fn runBroadcastLoop(self: *Broadcaster) void {
+5 -5
src/event_log.zig
··· 77 77 78 78 // --- disk persistence --- 79 79 80 - /// cross-Io playback request — Evented fiber posts, pool_io worker executes 80 + /// playback request — caller posts, pool_io worker executes 81 81 pub const PlaybackRequest = struct { 82 82 since: u64, 83 83 allocator: Allocator, ··· 87 87 next: std.atomic.Value(?*PlaybackRequest) = .{ .raw = null }, 88 88 }; 89 89 90 - /// cross-Io DB request — Evented fiber posts, pool_io worker executes. 90 + /// DB request — caller posts, pool_io worker executes. 91 91 /// callers define typed structs embedding DbRequest + @fieldParentPtr. 92 92 pub const DbRequest = struct { 93 93 callback: *const fn (*DbRequest, *DiskPersist) void, ··· 114 114 }; 115 115 116 116 /// MPSC FIFO ring buffer for general DB traffic. 117 - /// multiple producers (Evented fibers) push via spinlock, 117 + /// multiple producers push via spinlock, 118 118 /// multiple consumers (pool_io worker threads) pop via CAS. 119 119 pub const DbRequestQueue = struct { 120 120 const CAPACITY = 4096; ··· 210 210 io: Io, 211 211 212 212 /// last successful DB interaction (epoch seconds, set by Threaded workers). 213 - /// read by metrics server to report health without cross-Io pg.Pool access. 213 + /// read by metrics server to report health without direct pg.Pool access. 214 214 last_db_success: std.atomic.Value(i64) = .{ .raw = 0 }, 215 215 216 - /// MPSC queue for cross-Io playback requests (Evented → pool_io) 216 + /// MPSC queue for playback requests (caller → pool_io worker) 217 217 playback_head: std.atomic.Value(?*PlaybackRequest) = .{ .raw = null }, 218 218 219 219 /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended)
+2 -2
src/host_ops.zig
··· 1 - //! host ops — cross-Io host bookkeeping without pg.Pool from Evented fibers 1 + //! host ops — background thread for host bookkeeping (cursor flush, failure tracking) 2 2 //! 3 3 //! two mechanisms: 4 4 //! ··· 236 236 drained += 1; 237 237 } 238 238 239 - // drain playback requests (cross-Io: Evented fibers post, we execute) 239 + // drain playback requests 240 240 drained += self.drainPlaybackRequests(); 241 241 242 242 // periodic cursor sweep
+15 -19
src/main.zig
··· 182 182 const io = backend.io(); 183 183 184 184 // dedicated Threaded runtime for the frame worker pool. 185 - // worker threads are plain std.Thread — they cannot use Evented io 186 - // (Evented futex calls ev.yield() which requires fiber context). 187 - // this io is used for: persist ordering mutex, timestamps, validator cache, 185 + // historically needed to isolate Threaded work from Evented fibers (cross-Io 186 + // crash class). now redundant since Backend is also Threaded, but harmless. 187 + // used for: persist ordering mutex, timestamps, validator cache, 188 188 // DID resolution HTTP, and thread pool internal sync. 189 189 var pool_io_backend = Io.Threaded.init(allocator, .{ 190 190 .stack_size = default_stack_size, ··· 227 227 dp.retention_hours = retention_hours; 228 228 dp.max_dir_bytes = max_events_gb * 1024 * 1024 * 1024; 229 229 230 - // DbRequestQueue — general DB traffic from Evented fibers routed to pool_io workers. 231 - // replaces the broken ev_db (Evented pg.Pool) approach. 230 + // DbRequestQueue — routes DB requests through pool_io worker threads. 231 + // originally needed to bridge Evented fibers to Threaded pg.Pool; now 232 + // redundant under all-Threaded but still functional. cleanup candidate. 232 233 var db_queue: event_log_mod.DbRequestQueue = .{ 233 234 .shutdown = &shutdown_flag, 234 235 .persist = &dp, ··· 273 274 var cleaner = cleaner_mod.Cleaner.init(allocator, pool_io, &ci, &dp, &shutdown_flag); 274 275 275 276 // init resyncer (updates collection index on #sync events) 276 - // runs entirely on pool_io (Threaded) — enqueue() is called from frame worker 277 - // threads, and the background worker is a plain std.Thread. MUST NOT use 278 - // Evented io (Threaded futex on Evented fiber blocks Uring thread → deadlock; 279 - // Evented futex on plain thread → NULL threadlocal → SIGSEGV). 277 + // runs on pool_io — enqueue() is called from frame worker threads, 278 + // background worker is a plain std.Thread. 280 279 var resyncer = resync_mod.Resyncer.init(allocator, pool_io, &ci); 281 280 try resyncer.start(); 282 281 defer resyncer.deinit(); 283 282 284 - // init cursor map + host ops queue — subscriber fibers (Evented) write 285 - // cursor seqs to the coalescing map (atomic store, no lock) and push rare 286 - // ops (failures, status) to the MPSC queue. a background thread (Threaded) 287 - // sweeps cursors every 5s and drains rare ops immediately. 283 + // init cursor map + host ops queue — subscriber threads write cursor seqs 284 + // to the coalescing map (atomic store, no lock) and push rare ops (failures, 285 + // status) to the MPSC queue. a background thread sweeps cursors every 5s 286 + // and drains rare ops immediately. 288 287 var cursor_map: host_ops_mod.CursorMap = .{}; 289 288 var host_ops_queue: host_ops_mod.HostOpsQueue = .{ 290 289 .persist = &dp, ··· 339 338 break :blk null; 340 339 }; 341 340 342 - // start broadcaster fiber — drains broadcast queue, owns all consumer state. 343 - // this is the Evented-side sequencer: frame workers push results to the queue, 344 - // this fiber does the actual fan-out to downstream consumers. 341 + // start broadcast loop — drains broadcast queue, owns all consumer state. 342 + // frame workers push results to the queue, this thread does the fan-out. 345 343 var broadcast_future = try io.concurrent(broadcaster.Broadcaster.runBroadcastLoop, .{&bc}); 346 344 defer _ = broadcast_future.cancel(io); 347 345 348 - // start GC loop on a plain thread — dp.gc() uses pool_io (Threaded) mutex 349 - // and pg.Pool. MUST NOT run as Evented fiber: Threaded futex on Evented 350 - // fiber dereferences NULL Thread.current() threadlocal → heap corruption. 346 + // start GC loop on a plain thread — dp.gc() uses pool_io mutex and pg.Pool. 351 347 const gc_thread = std.Thread.spawn(.{}, gcLoop, .{ &dp, pool_io }) catch |err| { 352 348 log.err("failed to start GC thread: {s}", .{@errorName(err)}); 353 349 return err;
+2 -4
src/resync.zig
··· 5 5 //! describeRepo from the PDS to get the current collection list, then 6 6 //! replaces the index entries for that DID. 7 7 //! 8 - //! runs entirely on pool_io (Threaded) — enqueue() is called from frame worker 9 - //! threads (plain std.Thread), and the background worker is also a plain thread. 10 - //! MUST NOT use Evented io: Threaded futex on an Evented fiber blocks the Uring 11 - //! thread, and Evented futex on a plain thread dereferences a NULL threadlocal. 8 + //! runs on pool_io — enqueue() is called from frame worker threads, 9 + //! background worker is a plain std.Thread. 12 10 13 11 const std = @import("std"); 14 12 const Io = std.Io;
+1 -1
src/slurper.zig
··· 494 494 log.warn("host {s}: failed to spawn check thread", .{hostname}); 495 495 return; 496 496 }; 497 - // wait for check to complete (Evented fiber yields) 497 + // wait for check to complete 498 498 while (!check_req.done.load(.acquire)) { 499 499 if (self.shutdown.load(.acquire)) return; 500 500 self.io.sleep(Io.Duration.fromMicroseconds(100), .awake) catch {
+2 -2
src/subscriber.zig
··· 211 211 pool: ?*frame_worker_mod.FramePool = null, 212 212 /// dedicated Threaded io for frame workers — safe from plain OS threads 213 213 pool_io: ?Io = null, 214 - /// host ops queue — pushes rare DB ops to a background thread (avoids cross-Io pg.Pool access) 214 + /// host ops queue — pushes rare DB ops to a background thread 215 215 host_ops: ?*host_ops_mod.HostOpsQueue = null, 216 216 /// coalescing cursor map — subscriber writes latest seq atomically, worker sweeps every 5s 217 217 cursor_map: ?*host_ops_mod.CursorMap = null, ··· 262 262 var backoff: u64 = 1; 263 263 const max_backoff: u64 = 60; 264 264 265 - // cursor is set at spawn time by slurper (avoids cross-Io pg.Pool access) 265 + // cursor is set at spawn time by slurper 266 266 if (self.last_upstream_seq) |seq| { 267 267 log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, seq }); 268 268 }