atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: count events at receipt, not broadcast, to match Go relay semantics

relay_frames_received_total was only incremented in broadcast(), missing
events dropped by inactive-account filtering or validation failure.
move the counter to subscriber's serverMessage after frame decode, matching
the Go relay's events_received_counter which counts at processRepoEvent entry.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz e480f358 9e1a8e83

+5 -2
+2 -2
src/broadcaster.zig
··· 344 344 /// broadcast a frame to all consumers. non-blocking — just enqueues. 345 345 /// drops slow consumers whose buffers are full after sending ConsumerTooSlow. 346 346 pub fn broadcast(self: *Broadcaster, seq: u64, data: []const u8) void { 347 - _ = self.stats.frames_in.fetchAdd(1, .monotonic); 348 347 self.stats.seq.store(seq, .release); 349 348 350 349 // add to history for cursor replay ··· 612 611 b.broadcast(3, "frame3"); 613 612 614 613 try std.testing.expectEqual(@as(u64, 3), b.stats.seq.load(.acquire)); 615 - try std.testing.expectEqual(@as(u64, 3), b.stats.frames_in.load(.acquire)); 614 + // frames_in is now incremented in subscriber, not broadcast 615 + try std.testing.expectEqual(@as(u64, 0), b.stats.frames_in.load(.acquire)); 616 616 try std.testing.expectEqual(@as(u64, 1), b.history.oldestSeq().?); 617 617 try std.testing.expectEqual(@as(u64, 3), b.history.newestSeq().?); 618 618 }
+3
src/subscriber.zig
··· 216 216 return; 217 217 }; 218 218 219 + // count every successfully decoded event (matches Go relay's events_received_counter) 220 + _ = sub.bc.stats.frames_in.fetchAdd(1, .monotonic); 221 + 219 222 // extract seq for cursor tracking (all event types have seq) 220 223 const upstream_seq = payload.getUint("seq"); 221 224 if (upstream_seq) |s| {