atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: serialize persist + broadcast for monotonic firehose sequences

zlay's thread-per-PDS architecture (2,700 concurrent subscribers) allowed
interleaving between seq assignment (under persist lock) and broadcast
(unlocked), delivering frames out of order to consumers.

adds broadcast_order mutex to Broadcaster — subscriber threads now hold
it across persist → resequence → broadcast, matching Indigo's pattern of
serializing the entire pipeline through one lock.

also fixes the error fallback, which on persist failure broadcast the frame
with the upstream seq (mixing sequence domains). the frame is now dropped instead.

includes regression test that spawns 8 threads doing concurrent
broadcast and verifies monotonic output. confirmed: test catches the
bug when the lock is removed (seq 3 after 4).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz e42ba3b9 66187087

+74 -10
+56
src/broadcaster.zig
··· 288 288 allocator: Allocator, 289 289 consumers: std.ArrayListUnmanaged(*Consumer) = .{}, 290 290 consumers_mutex: std.Thread.Mutex = .{}, 291 + broadcast_order: std.Thread.Mutex = .{}, 291 292 history: FrameHistory, 292 293 persist: ?*event_log_mod.DiskPersist = null, 293 294 stats: Stats = .{}, ··· 912 913 try std.testing.expectEqualStrings("did:plc:alice", p.getString("did").?); 913 914 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", p.getString("time").?); 914 915 } 916 + 917 + test "concurrent broadcast through ordering mutex produces monotonic sequences" { 918 + // regression test: without broadcast_order serialization, concurrent 919 + // subscriber threads can interleave persist (seq assignment) and broadcast, 920 + // delivering frames out of order to consumers. 921 + // 922 + // this simulates the subscriber pattern: N threads each acquire the 923 + // ordering lock, assign a seq (atomic increment, like persist), and 924 + // broadcast. the ring buffer history must be strictly monotonic. 
925 + 926 + var bc = Broadcaster.init(std.testing.allocator); 927 + defer bc.deinit(); 928 + 929 + const num_threads = 8; 930 + const frames_per_thread = 500; 931 + const total_frames = num_threads * frames_per_thread; 932 + var seq_counter = std.atomic.Value(u64).init(0); 933 + 934 + const Worker = struct { 935 + fn run(broadcaster: *Broadcaster, counter: *std.atomic.Value(u64)) void { 936 + for (0..frames_per_thread) |_| { 937 + broadcaster.broadcast_order.lock(); 938 + defer broadcaster.broadcast_order.unlock(); 939 + 940 + const seq = counter.fetchAdd(1, .monotonic) + 1; 941 + broadcaster.broadcast(seq, "x"); 942 + } 943 + } 944 + }; 945 + 946 + var threads: [num_threads]std.Thread = undefined; 947 + for (&threads) |*t| { 948 + t.* = std.Thread.spawn(.{}, Worker.run, .{ &bc, &seq_counter }) catch unreachable; 949 + } 950 + for (&threads) |*t| t.join(); 951 + 952 + // verify: ring buffer history has strictly monotonic sequences 953 + const frames = try bc.history.framesSince(std.testing.allocator, 0); 954 + defer { 955 + for (frames) |f| std.testing.allocator.free(f.data); 956 + std.testing.allocator.free(frames); 957 + } 958 + 959 + // history is capped at 50K, we pushed 4K — all should be present 960 + try std.testing.expectEqual(total_frames, frames.len); 961 + 962 + var prev_seq: u64 = 0; 963 + for (frames) |f| { 964 + if (f.seq <= prev_seq) { 965 + std.debug.print("monotonicity violation: seq {d} after {d}\n", .{ f.seq, prev_seq }); 966 + return error.NonMonotonicSequence; 967 + } 968 + prev_seq = f.seq; 969 + } 970 + }
+18 -10
src/subscriber.zig
··· 425 425 else 426 426 .identity; 427 427 428 - // persist and get relay-assigned seq, broadcast raw bytes 428 + // persist and get relay-assigned seq, broadcast raw bytes. 429 + // ordering mutex ensures frames are broadcast in seq order — 430 + // without it, concurrent subscriber threads can interleave 431 + // persist (seq assignment) and broadcast, delivering out-of-order. 429 432 if (sub.persist) |dp| { 430 - const relay_seq = dp.persist(kind, uid, data) catch |err| { 431 - log.warn("persist failed: {s}", .{@errorName(err)}); 432 - sub.bc.broadcast(upstream_seq orelse 0, data); 433 - return; 433 + const relay_seq = blk: { 434 + sub.bc.broadcast_order.lock(); 435 + defer sub.bc.broadcast_order.unlock(); 436 + 437 + const seq = dp.persist(kind, uid, data) catch |err| { 438 + log.warn("persist failed: {s}", .{@errorName(err)}); 439 + return; 440 + }; 441 + sub.bc.stats.relay_seq.store(seq, .release); 442 + const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data; 443 + sub.bc.broadcast(seq, broadcast_data); 444 + break :blk seq; 434 445 }; 446 + _ = relay_seq; 435 447 436 - // update per-DID state after successful commit/sync validation 448 + // update per-DID state outside the ordering lock (Postgres round-trip) 437 449 if ((is_commit or is_sync) and uid > 0) { 438 450 if (commit_rev) |rev| { 439 451 const cid_str: []const u8 = if (commit_data_cid) |cid_raw| ··· 445 457 }; 446 458 } 447 459 } 448 - 449 - sub.bc.stats.relay_seq.store(relay_seq, .release); 450 - const broadcast_data = broadcaster.resequenceFrame(alloc, data, relay_seq) orelse data; 451 - sub.bc.broadcast(relay_seq, broadcast_data); 452 460 } else { 453 461 sub.bc.broadcast(upstream_seq orelse 0, data); 454 462 }