atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

firehose: send OutdatedCursor and FutureCursor to consumers

implements spec-compliant cursor validation for subscribeRepos
(addresses bluesky-social/indigo#1328):

- OutdatedCursor: #info message frame (op: 1, t: "#info") when cursor
is older than the replay window. stream continues from oldest available.
- FutureCursor: error frame (op: -1) when cursor exceeds current seq.
connection closes.
- add firstSeq() to DiskPersist for oldest available sequence lookup
- rename encodeInfoFrame -> encodeErrorFrame (was always error format)
- add encodeInfoMessage for proper #info message frames

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 93512d16 1c01190e

+119 -5
+105 -5
src/broadcaster.zig
··· 121 121 } 122 122 123 123 /// encode an error frame into caller-provided buffer. 124 - /// used for InvalidCursor / OutdatedCursor info frames. 124 + /// used for InvalidCursor / FutureCursor error frames. 125 125 /// header: {op: -1}, payload: {error: <name>, message: <msg>} 126 - fn encodeInfoFrame(error_name: []const u8, message: []const u8, out: []u8) ?[]const u8 { 126 + fn encodeErrorFrame(error_name: []const u8, message: []const u8, out: []u8) ?[]const u8 { 127 127 const cbor = zat.cbor; 128 128 const alloc = std.heap.c_allocator; 129 129 ··· 132 132 } }; 133 133 const payload: cbor.Value = .{ .map = &.{ 134 134 .{ .key = "error", .value = .{ .text = error_name } }, 135 + .{ .key = "message", .value = .{ .text = message } }, 136 + } }; 137 + 138 + const header_bytes = cbor.encodeAlloc(alloc, header) catch return null; 139 + defer alloc.free(header_bytes); 140 + const payload_bytes = cbor.encodeAlloc(alloc, payload) catch return null; 141 + defer alloc.free(payload_bytes); 142 + 143 + const total = header_bytes.len + payload_bytes.len; 144 + if (total > out.len) return null; 145 + @memcpy(out[0..header_bytes.len], header_bytes); 146 + @memcpy(out[header_bytes.len..][0..payload_bytes.len], payload_bytes); 147 + 148 + return out[0..total]; 149 + } 150 + 151 + /// encode an #info message frame into caller-provided buffer. 152 + /// used for OutdatedCursor. this is a message frame (op: 1), not an error frame. 153 + /// header: {op: 1, t: "#info"}, payload: {name: <name>, message: <msg>} 154 + fn encodeInfoMessage(name: []const u8, message: []const u8, out: []u8) ?[]const u8 { 155 + const cbor = zat.cbor; 156 + const alloc = std.heap.c_allocator; 157 + 158 + const header: cbor.Value = .{ .map = &.{ 159 + .{ .key = "op", .value = .{ .unsigned = 1 } }, 160 + .{ .key = "t", .value = .{ .text = "#info" } }, 161 + } }; 162 + const payload: cbor.Value = .{ .map = &.{ 163 + .{ .key = "name", .value = .{ .text = name } }, 135 164 .{ .key = "message", .value = .{ .text = message } }, 136 165 } }; 137 166 ··· 563 592 self.consumer = consumer; 564 593 565 594 if (self.invalid_cursor) { 566 - // spec: send #info frame with InvalidCursor error, then close 567 - var info_buf: [256]u8 = undefined; 568 - if (encodeInfoFrame("InvalidCursor", "cursor parameter is not a valid integer", &info_buf)) |frame| { 595 + // spec: send error frame with InvalidCursor, then close 596 + var buf: [256]u8 = undefined; 597 + if (encodeErrorFrame("InvalidCursor", "cursor parameter is not a valid integer", &buf)) |frame| { 569 598 consumer.conn.writeBin(frame) catch {}; 570 599 } 571 600 consumer.conn.close(.{}) catch {}; ··· 573 602 } 574 603 575 604 if (self.cursor) |cursor| { 605 + const latest = ctx.stats.relay_seq.load(.acquire); 606 + 607 + // FutureCursor: cursor ahead of current seq — error, close 608 + if (latest > 0 and cursor > latest) { 609 + var buf: [256]u8 = undefined; 610 + if (encodeErrorFrame("FutureCursor", "Cursor in the future.", &buf)) |frame| { 611 + consumer.conn.writeBin(frame) catch {}; 612 + } 613 + consumer.conn.close(.{}) catch {}; 614 + return; 615 + } 616 + 617 + // OutdatedCursor: cursor older than oldest available — info, continue 618 + const oldest = blk: { 619 + if (ctx.persist) |dp| { 620 + if (dp.firstSeq()) |s| break :blk s; 621 + } 622 + break :blk ctx.history.oldestSeq() orelse 0; 623 + }; 624 + if (oldest > 0 and cursor < oldest) { 625 + var buf: [256]u8 = undefined; 626 + if (encodeInfoMessage("OutdatedCursor", "Requested cursor exceeded limit. Possibly missing events", &buf)) |frame| { 627 + consumer.conn.writeBin(frame) catch {}; 628 + } 629 + // don't close — fall through to replayTo which replays from oldest available 630 + } 631 + 576 632 ctx.replayTo(consumer, cursor); 577 633 } 578 634 } ··· 1132 1188 try std.testing.expectEqual(@as(u64, 42), p.getUint("seq").?); 1133 1189 try std.testing.expectEqualStrings("did:plc:alice", p.getString("did").?); 1134 1190 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", p.getString("time").?); 1191 + } 1192 + 1193 + test "encodeErrorFrame produces valid error frame CBOR" { 1194 + const cbor = zat.cbor; 1195 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 1196 + defer arena.deinit(); 1197 + const alloc = arena.allocator(); 1198 + 1199 + var buf: [256]u8 = undefined; 1200 + const frame = encodeErrorFrame("FutureCursor", "Cursor in the future.", &buf) orelse return error.EncodeFailed; 1201 + 1202 + // decode header — must be {op: -1} 1203 + const h_result = try cbor.decode(alloc, frame); 1204 + const h = h_result.value; 1205 + try std.testing.expectEqual(@as(i64, -1), h.getInt("op").?); 1206 + try std.testing.expect(h.getString("t") == null); // no t field on error frames 1207 + 1208 + // decode payload — must have error and message fields 1209 + const p = try cbor.decodeAll(alloc, frame[h_result.consumed..]); 1210 + try std.testing.expectEqualStrings("FutureCursor", p.getString("error").?); 1211 + try std.testing.expectEqualStrings("Cursor in the future.", p.getString("message").?); 1212 + } 1213 + 1214 + test "encodeInfoMessage produces valid #info message frame CBOR" { 1215 + const cbor = zat.cbor; 1216 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 1217 + defer arena.deinit(); 1218 + const alloc = arena.allocator(); 1219 + 1220 + var buf: [256]u8 = undefined; 1221 + const frame = encodeInfoMessage("OutdatedCursor", "Requested cursor exceeded limit. Possibly missing events", &buf) orelse return error.EncodeFailed; 1222 + 1223 + // decode header — must be {op: 1, t: "#info"} 1224 + const h_result = try cbor.decode(alloc, frame); 1225 + const h = h_result.value; 1226 + try std.testing.expectEqual(@as(i64, 1), h.getInt("op").?); 1227 + try std.testing.expectEqualStrings("#info", h.getString("t").?); 1228 + 1229 + // decode payload — must have name and message fields (not error) 1230 + const p = try cbor.decodeAll(alloc, frame[h_result.consumed..]); 1231 + try std.testing.expectEqualStrings("OutdatedCursor", p.getString("name").?); 1232 + try std.testing.expectEqualStrings("Requested cursor exceeded limit. Possibly missing events", p.getString("message").?); 1233 + // info messages must NOT have an error field 1234 + try std.testing.expect(p.getString("error") == null); 1135 1235 } 1136 1236 1137 1237 test "concurrent broadcast through ordering mutex produces monotonic sequences" {
+14
src/event_log.zig
··· 745 745 return self.cur_seq - 1; 746 746 } 747 747 748 + /// oldest available sequence number on disk, or null if no log files exist 749 + pub fn firstSeq(self: *DiskPersist) ?u64 { 750 + if (self.db.rowUnsafe( 751 + "SELECT seq_start FROM log_file_refs ORDER BY seq_start ASC LIMIT 1", 752 + .{}, 753 + ) catch null) |row| { 754 + var r = row; 755 + defer r.deinit() catch {}; 756 + const v = r.get(i64, 0); 757 + return if (v > 0) @intCast(v) else null; 758 + } 759 + return null; 760 + } 761 + 748 762 /// garbage-collect log files older than the retention period 749 763 pub fn gc(self: *DiskPersist) !void { 750 764 self.mutex.lock();