atproto relay implementation in zig
zlay.waow.tech
//! fixed-size ring buffer for firehose frames
//!
//! used for per-consumer send buffers (non-blocking broadcast)
//! and global frame history (cursor replay).
5
6const std = @import("std");
7const Io = std.Io;
8const Allocator = std.mem.Allocator;
9
/// one firehose frame: a sequence number plus its payload bytes.
pub const Frame = struct {
    seq: u64,
    /// payload bytes. owned by the ring buffer while the frame is stored there;
    /// frames handed out by `pop` and `framesSince` carry data the caller frees.
    data: []const u8,

    /// filler for unoccupied slots: seq 0 and a zero-length payload.
    pub const empty: Frame = .{ .seq = 0, .data = &.{} };
};

/// thread-safe fixed-size ring buffer of frames.
/// push overwrites oldest entries when full.
/// data is duped into the buffer and freed on overwrite/deinit.
///
/// every public method except `init` and `deinit` takes the internal mutex.
/// `deinit` does not lock, so call it only once nothing else uses the buffer.
pub fn RingBuffer(comptime capacity: usize) type {
    return struct {
        /// frame slots; every slot outside the live window holds `Frame.empty`.
        entries: [capacity]Frame = [_]Frame{Frame.empty} ** capacity,
        write_pos: usize = 0, // next write position
        read_pos: usize = 0, // next read position (for pop)
        len: usize = 0, // number of live frames
        allocator: Allocator, // allocates and frees the stored frame data
        mutex: Io.Mutex = Io.Mutex.init,
        io: Io,

        const Self = @This();

        /// create an empty buffer. `allocator` backs every stored payload and
        /// `io` is handed to the mutex on lock/unlock.
        pub fn init(allocator: Allocator, io: Io) Self {
            return .{ .allocator = allocator, .io = io };
        }

        /// free the payload of every live frame and invalidate the buffer.
        /// does not take the mutex.
        pub fn deinit(self: *Self) void {
            // free all live entries
            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const idx = (self.read_pos + i) % capacity;
                const entry = self.entries[idx];
                if (entry.data.len > 0) {
                    self.allocator.free(entry.data);
                }
            }
            self.* = undefined;
        }

        /// push a frame. if full, overwrites oldest. returns false if alloc failed.
        /// `data` is copied, so the caller keeps ownership of its own slice.
        pub fn push(self: *Self, seq: u64, data: []const u8) bool {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            return self.pushUnlocked(seq, data);
        }

        /// `push` without locking; the caller must hold the mutex.
        fn pushUnlocked(self: *Self, seq: u64, data: []const u8) bool {
            // copy first so a failed allocation leaves the buffer untouched
            const duped = self.allocator.dupe(u8, data) catch return false;

            // free old entry if overwriting
            if (self.len == capacity) {
                const old = self.entries[self.write_pos];
                if (old.data.len > 0) {
                    self.allocator.free(old.data);
                }
                // advance read_pos since we're overwriting the oldest
                self.read_pos = (self.read_pos + 1) % capacity;
            } else {
                self.len += 1;
            }

            self.entries[self.write_pos] = .{ .seq = seq, .data = duped };
            self.write_pos = (self.write_pos + 1) % capacity;
            return true;
        }

        /// pop the oldest frame, or null if the buffer is empty.
        /// caller owns the returned data, which was allocated with the
        /// buffer's allocator.
        pub fn pop(self: *Self) ?Frame {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            return self.popUnlocked();
        }

        /// `pop` without locking; the caller must hold the mutex.
        fn popUnlocked(self: *Self) ?Frame {
            if (self.len == 0) return null;
            const frame = self.entries[self.read_pos];
            // reset the slot without freeing: the payload now belongs to the caller
            self.entries[self.read_pos] = Frame.empty;
            self.read_pos = (self.read_pos + 1) % capacity;
            self.len -= 1;
            return frame;
        }

        /// number of frames currently buffered (non-blocking — returns 0 if lock is contended)
        pub fn count(self: *Self) usize {
            if (!self.mutex.tryLock()) return 0;
            defer self.mutex.unlock(self.io);
            return self.len;
        }

        /// check if buffer is full
        pub fn isFull(self: *Self) bool {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            return self.len == capacity;
        }

        /// get all frames with seq > cursor, in buffer order (oldest first),
        /// which is seq order when frames were pushed with increasing seq.
        /// caller owns the returned slice AND frame data, both allocated with
        /// the `allocator` argument (not the buffer's own allocator).
        /// on allocation failure everything allocated so far is released and
        /// the error is returned.
        pub fn framesSince(self: *Self, allocator: Allocator, cursor: u64) ![]const Frame {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);

            var result: std.ArrayList(Frame) = .empty;
            errdefer {
                for (result.items) |f| allocator.free(f.data);
                result.deinit(allocator);
            }

            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const idx = (self.read_pos + i) % capacity;
                const entry = self.entries[idx];
                if (entry.seq > cursor) {
                    const duped = try allocator.dupe(u8, entry.data);
                    // until the append succeeds the copy is not part of
                    // `result.items`, so the cleanup above cannot see it;
                    // release it here if the append fails.
                    errdefer allocator.free(duped);
                    try result.append(allocator, .{ .seq = entry.seq, .data = duped });
                }
            }
            return try result.toOwnedSlice(allocator);
        }

        /// oldest seq in the buffer, or null if empty
        pub fn oldestSeq(self: *Self) ?u64 {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            if (self.len == 0) return null;
            return self.entries[self.read_pos].seq;
        }

        /// newest seq in the buffer, or null if empty
        pub fn newestSeq(self: *Self) ?u64 {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            if (self.len == 0) return null;
            // write_pos points one past the newest frame, wrapping at slot 0
            const newest_idx = if (self.write_pos == 0) capacity - 1 else self.write_pos - 1;
            return self.entries[newest_idx].seq;
        }
    };
}
148
149// === tests ===
150
test "push and pop" {
    const testing = std.testing;
    const gpa = testing.allocator;

    var ring = RingBuffer(4).init(gpa, testing.io);
    defer ring.deinit();

    // queue two frames and confirm both are counted
    try testing.expect(ring.push(1, "hello"));
    try testing.expect(ring.push(2, "world"));
    try testing.expectEqual(@as(usize, 2), ring.count());

    // frames come back oldest-first; popped payloads belong to the caller
    const first = ring.pop().?;
    defer gpa.free(first.data);
    try testing.expectEqual(@as(u64, 1), first.seq);
    try testing.expectEqualStrings("hello", first.data);

    const second = ring.pop().?;
    defer gpa.free(second.data);
    try testing.expectEqual(@as(u64, 2), second.seq);

    // the buffer is drained now
    try testing.expect(ring.pop() == null);
}
170
test "overwrite when full" {
    const testing = std.testing;

    var ring = RingBuffer(3).init(testing.allocator, testing.io);
    defer ring.deinit();

    // occupy every slot
    try testing.expect(ring.push(1, "a"));
    try testing.expect(ring.push(2, "b"));
    try testing.expect(ring.push(3, "c"));
    try testing.expectEqual(@as(usize, 3), ring.count());

    // one more push evicts seq=1; the count stays at capacity
    try testing.expect(ring.push(4, "d"));
    try testing.expectEqual(@as(usize, 3), ring.count());

    // the oldest surviving frame is seq=2
    const oldest = ring.pop().?;
    defer testing.allocator.free(oldest.data);
    try testing.expectEqual(@as(u64, 2), oldest.seq);
}
188
test "framesSince" {
    const testing = std.testing;
    const gpa = testing.allocator;

    var ring = RingBuffer(8).init(gpa, testing.io);
    defer ring.deinit();

    // store seq 1 through 5, all with the same payload
    var seq: u64 = 1;
    while (seq <= 5) : (seq += 1) {
        try testing.expect(ring.push(seq, "data"));
    }

    // replay everything after cursor 3; both the slice and payloads are ours to free
    const replay = try ring.framesSince(gpa, 3);
    defer {
        for (replay) |frame| gpa.free(frame.data);
        gpa.free(replay);
    }

    try testing.expectEqual(@as(usize, 2), replay.len);
    const expected_seqs = [_]u64{ 4, 5 };
    for (expected_seqs, replay) |want, frame| {
        try testing.expectEqual(want, frame.seq);
    }
}
207
test "oldestSeq and newestSeq" {
    const testing = std.testing;

    var ring = RingBuffer(4).init(testing.allocator, testing.io);
    defer ring.deinit();

    // an empty buffer has neither end
    try testing.expect(ring.oldestSeq() == null);
    try testing.expect(ring.newestSeq() == null);

    try testing.expect(ring.push(10, "x"));
    try testing.expect(ring.push(20, "y"));
    try testing.expect(ring.push(30, "z"));

    // first pushed is oldest, last pushed is newest
    const oldest = ring.oldestSeq().?;
    try testing.expectEqual(@as(u64, 10), oldest);
    const newest = ring.newestSeq().?;
    try testing.expectEqual(@as(u64, 30), newest);
}
222
test "empty buffer operations" {
    const testing = std.testing;
    const gpa = testing.allocator;

    var ring = RingBuffer(4).init(gpa, testing.io);
    defer ring.deinit();

    // a fresh buffer holds nothing, is not full, and has nothing to pop
    try testing.expectEqual(@as(usize, 0), ring.count());
    try testing.expect(!ring.isFull());
    try testing.expect(ring.pop() == null);

    // replay from cursor 0 yields an empty slice that is still ours to free
    const replay = try ring.framesSince(gpa, 0);
    defer gpa.free(replay);
    try testing.expectEqual(@as(usize, 0), replay.len);
}
235
test "wrap-around with pop and push" {
    const testing = std.testing;
    const gpa = testing.allocator;

    var ring = RingBuffer(3).init(gpa, testing.io);
    defer ring.deinit();

    // fill all three slots
    try testing.expect(ring.push(1, "a"));
    try testing.expect(ring.push(2, "b"));
    try testing.expect(ring.push(3, "c"));

    // drop the two oldest frames, releasing each payload right away
    for (0..2) |_| {
        const dropped = ring.pop().?;
        gpa.free(dropped.data);
    }

    // the next two pushes land in the slots that wrapped around
    try testing.expect(ring.push(4, "d"));
    try testing.expect(ring.push(5, "e"));

    try testing.expectEqual(@as(usize, 3), ring.count());
    try testing.expectEqual(@as(u64, 3), ring.oldestSeq().?);
    try testing.expectEqual(@as(u64, 5), ring.newestSeq().?);
}