ATProto relay implementation in Zig — zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 258 lines 9.0 kB view raw
//! Fixed-size ring buffer for firehose frames.
//!
//! Used for per-consumer send buffers (non-blocking broadcast)
//! and global frame history (cursor replay).

const std = @import("std");
const Io = std.Io;
const Allocator = std.mem.Allocator;

/// A sequenced firehose frame.
/// While stored in a ring buffer, `data` is owned by the buffer;
/// ownership transfers to the caller on `pop` / `framesSince`.
pub const Frame = struct {
    seq: u64,
    data: []const u8, // owned by the ring buffer while stored

    pub const empty: Frame = .{ .seq = 0, .data = &.{} };
};

/// Thread-safe fixed-size ring buffer of frames.
/// `push` overwrites the oldest entry when full.
/// Frame data is duped into the buffer and freed on overwrite/deinit.
pub fn RingBuffer(comptime capacity: usize) type {
    // A zero-capacity buffer can never hold a frame, and every index
    // computation below does `% capacity` — a comptime division by zero.
    // Reject it up front with a clear message instead of letting callers
    // hit a confusing instantiation error inside pushUnlocked.
    if (capacity == 0) @compileError("RingBuffer capacity must be greater than zero");

    return struct {
        entries: [capacity]Frame = [_]Frame{Frame.empty} ** capacity,
        write_pos: usize = 0, // next write position
        read_pos: usize = 0, // next read position (for pop)
        len: usize = 0, // number of live entries, 0..capacity
        allocator: Allocator,
        mutex: Io.Mutex = Io.Mutex.init,
        io: Io,

        const Self = @This();

        pub fn init(allocator: Allocator, io: Io) Self {
            return .{ .allocator = allocator, .io = io };
        }

        /// Free all buffered frame data and invalidate the buffer.
        /// Not locked: callers must guarantee no concurrent use during deinit.
        pub fn deinit(self: *Self) void {
            // free all live entries (read_pos..read_pos+len, wrapping)
            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const idx = (self.read_pos + i) % capacity;
                const entry = self.entries[idx];
                if (entry.data.len > 0) {
                    self.allocator.free(entry.data);
                }
            }
            self.* = undefined;
        }

        /// Push a frame. If full, overwrites the oldest entry.
        /// Returns false if duplicating `data` failed (OOM); the buffer is
        /// left unchanged in that case.
        pub fn push(self: *Self, seq: u64, data: []const u8) bool {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            return self.pushUnlocked(seq, data);
        }

        fn pushUnlocked(self: *Self, seq: u64, data: []const u8) bool {
            // dupe first so an allocation failure leaves the buffer untouched
            const duped = self.allocator.dupe(u8, data) catch return false;

            // free old entry if overwriting
            if (self.len == capacity) {
                const old = self.entries[self.write_pos];
                if (old.data.len > 0) {
                    self.allocator.free(old.data);
                }
                // advance read_pos since we're overwriting the oldest
                self.read_pos = (self.read_pos + 1) % capacity;
            } else {
                self.len += 1;
            }

            self.entries[self.write_pos] = .{ .seq = seq, .data = duped };
            self.write_pos = (self.write_pos + 1) % capacity;
            return true;
        }

        /// Pop the oldest frame, or null if empty.
        /// Caller owns the returned data and must free it.
        pub fn pop(self: *Self) ?Frame {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            return self.popUnlocked();
        }

        fn popUnlocked(self: *Self) ?Frame {
            if (self.len == 0) return null;
            const frame = self.entries[self.read_pos];
            self.entries[self.read_pos] = Frame.empty;
            self.read_pos = (self.read_pos + 1) % capacity;
            self.len -= 1;
            return frame;
        }

        /// Number of frames currently buffered.
        /// Non-blocking: returns 0 if the lock is contended, so treat the
        /// result as a best-effort snapshot rather than an exact count.
        pub fn count(self: *Self) usize {
            if (!self.mutex.tryLock()) return 0;
            defer self.mutex.unlock(self.io);
            return self.len;
        }

        /// Check if the buffer is full (the next push will overwrite).
        pub fn isFull(self: *Self) bool {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            return self.len == capacity;
        }

        /// Get all frames with seq > cursor, ordered oldest-to-newest
        /// (i.e. by seq, assuming seqs were pushed in increasing order).
        /// Caller owns the returned slice AND each frame's data.
        pub fn framesSince(self: *Self, allocator: Allocator, cursor: u64) ![]const Frame {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);

            var result: std.ArrayList(Frame) = .empty;
            errdefer {
                // free any frames already duped before the failure
                for (result.items) |f| allocator.free(f.data);
                result.deinit(allocator);
            }

            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const idx = (self.read_pos + i) % capacity;
                const entry = self.entries[idx];
                if (entry.seq > cursor) {
                    const duped = try allocator.dupe(u8, entry.data);
                    try result.append(allocator, .{ .seq = entry.seq, .data = duped });
                }
            }
            return try result.toOwnedSlice(allocator);
        }

        /// Oldest seq in the buffer, or null if empty.
        pub fn oldestSeq(self: *Self) ?u64 {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            if (self.len == 0) return null;
            return self.entries[self.read_pos].seq;
        }

        /// Newest seq in the buffer, or null if empty.
        pub fn newestSeq(self: *Self) ?u64 {
            self.mutex.lockUncancelable(self.io);
            defer self.mutex.unlock(self.io);
            if (self.len == 0) return null;
            // newest entry sits one slot behind the write position
            const newest_idx = if (self.write_pos == 0) capacity - 1 else self.write_pos - 1;
            return self.entries[newest_idx].seq;
        }
    };
}

// === tests ===

test "push and pop" {
    var buf = RingBuffer(4).init(std.testing.allocator, std.testing.io);
    defer buf.deinit();

    try std.testing.expect(buf.push(1, "hello"));
    try std.testing.expect(buf.push(2, "world"));
    try std.testing.expectEqual(@as(usize, 2), buf.count());

    const f1 = buf.pop().?;
    defer std.testing.allocator.free(f1.data);
    try std.testing.expectEqual(@as(u64, 1), f1.seq);
    try std.testing.expectEqualStrings("hello", f1.data);

    const f2 = buf.pop().?;
    defer std.testing.allocator.free(f2.data);
    try std.testing.expectEqual(@as(u64, 2), f2.seq);

    try std.testing.expect(buf.pop() == null);
}

test "overwrite when full" {
    var buf = RingBuffer(3).init(std.testing.allocator, std.testing.io);
    defer buf.deinit();

    try std.testing.expect(buf.push(1, "a"));
    try std.testing.expect(buf.push(2, "b"));
    try std.testing.expect(buf.push(3, "c"));
    try std.testing.expectEqual(@as(usize, 3), buf.count());

    // push overwrites oldest (seq=1)
    try std.testing.expect(buf.push(4, "d"));
    try std.testing.expectEqual(@as(usize, 3), buf.count());

    const f1 = buf.pop().?;
    defer std.testing.allocator.free(f1.data);
    try std.testing.expectEqual(@as(u64, 2), f1.seq);
}

test "framesSince" {
    var buf = RingBuffer(8).init(std.testing.allocator, std.testing.io);
    defer buf.deinit();

    for (1..6) |i| {
        try std.testing.expect(buf.push(@intCast(i), "data"));
    }

    const frames = try buf.framesSince(std.testing.allocator, 3);
    defer {
        for (frames) |f| std.testing.allocator.free(f.data);
        std.testing.allocator.free(frames);
    }

    try std.testing.expectEqual(@as(usize, 2), frames.len);
    try std.testing.expectEqual(@as(u64, 4), frames[0].seq);
    try std.testing.expectEqual(@as(u64, 5), frames[1].seq);
}

test "oldestSeq and newestSeq" {
    var buf = RingBuffer(4).init(std.testing.allocator, std.testing.io);
    defer buf.deinit();

    try std.testing.expect(buf.oldestSeq() == null);
    try std.testing.expect(buf.newestSeq() == null);

    try std.testing.expect(buf.push(10, "x"));
    try std.testing.expect(buf.push(20, "y"));
    try std.testing.expect(buf.push(30, "z"));

    try std.testing.expectEqual(@as(u64, 10), buf.oldestSeq().?);
    try std.testing.expectEqual(@as(u64, 30), buf.newestSeq().?);
}

test "empty buffer operations" {
    var buf = RingBuffer(4).init(std.testing.allocator, std.testing.io);
    defer buf.deinit();

    try std.testing.expectEqual(@as(usize, 0), buf.count());
    try std.testing.expect(!buf.isFull());
    try std.testing.expect(buf.pop() == null);

    const frames = try buf.framesSince(std.testing.allocator, 0);
    defer std.testing.allocator.free(frames);
    try std.testing.expectEqual(@as(usize, 0), frames.len);
}

test "wrap-around with pop and push" {
    var buf = RingBuffer(3).init(std.testing.allocator, std.testing.io);
    defer buf.deinit();

    // fill
    try std.testing.expect(buf.push(1, "a"));
    try std.testing.expect(buf.push(2, "b"));
    try std.testing.expect(buf.push(3, "c"));

    // pop two
    const f1 = buf.pop().?;
    std.testing.allocator.free(f1.data);
    const f2 = buf.pop().?;
    std.testing.allocator.free(f2.data);

    // push two more (wraps around)
    try std.testing.expect(buf.push(4, "d"));
    try std.testing.expect(buf.push(5, "e"));

    try std.testing.expectEqual(@as(usize, 3), buf.count());
    try std.testing.expectEqual(@as(u64, 3), buf.oldestSeq().?);
    try std.testing.expectEqual(@as(u64, 5), buf.newestSeq().?);
}