atproto utils for zig
zat.dev
atproto
sdk
zig
1//! firehose codec - com.atproto.sync.subscribeRepos
2//!
3//! encode and decode AT Protocol firehose events over WebSocket. messages are
4//! DAG-CBOR encoded (unlike jetstream, which is JSON). includes frame encoding/
5//! decoding, CAR block packing, and CID creation for records.
6//!
7//! wire format per frame:
8//! [DAG-CBOR header: {op, t}] [DAG-CBOR payload: {seq, repo, ops, blocks, ...}]
9//!
10//! see: https://atproto.com/specs/event-stream
11
12const std = @import("std");
13const websocket = @import("websocket");
14const cbor = @import("../repo/cbor.zig");
15const car = @import("../repo/car.zig");
16const sync = @import("sync.zig");
17
18const mem = std.mem;
19const Allocator = mem.Allocator;
20const posix = std.posix;
21const Io = std.Io;
22const log = std.log.scoped(.zat);
23
// re-exported from sync.zig so callers of this module can name these types
// without importing sync.zig themselves.
pub const AccountStatus = sync.AccountStatus;
pub const CommitAction = sync.CommitAction;
26
/// hosts used when `Options.hosts` is left at its default.
pub const default_hosts = [4][]const u8{
    "bsky.network",
    "northamerica.firehose.network",
    "europe.firehose.network",
    "asia.firehose.network",
};
33
/// connection settings for `FirehoseClient`.
pub const Options = struct {
    /// hosts to connect to; the client rotates through them on reconnect.
    hosts: []const []const u8 = &default_hosts,
    /// sequence number to resume from; null sends no cursor.
    cursor: ?i64 = null,
    /// upper bound handed to the websocket client as its max message size.
    /// 5MB, because firehose frames can be large.
    max_message_size: usize = 5 * 1024 * 1024,
};
39
/// a decoded firehose event, tagged by the frame's `t` value.
pub const Event = union(enum) {
    commit: CommitEvent,
    identity: IdentityEvent,
    account: AccountEvent,
    info: InfoEvent,

    /// sequence number of the event, or null for `info` events, which carry none.
    pub fn seq(self: Event) ?i64 {
        return switch (self) {
            .info => null,
            inline else => |ev| ev.seq,
        };
    }
};
56
/// payload of a `#commit` frame.
pub const CommitEvent = struct {
    seq: i64,
    /// DID of the repo
    repo: []const u8,
    /// TID: revision of the commit
    rev: []const u8,
    /// datetime: when event was received
    time: []const u8,
    /// TID: rev of preceding commit (null = full repo export)
    since: ?[]const u8 = null,
    /// CID of the commit object
    commit: ?cbor.Cid = null,
    ops: []const RepoOp,
    /// new blobs referenced by records in this commit
    blobs: []const cbor.Cid = &.{},
    too_big: bool = false,
};
68
/// one record operation inside a commit.
pub const RepoOp = struct {
    action: CommitAction,
    collection: []const u8,
    rkey: []const u8,
    /// CID of the record (null for deletes)
    cid: ?cbor.Cid = null,
    /// decoded DAG-CBOR record from CAR block
    record: ?cbor.Value = null,
};
76
/// payload of an `#identity` frame.
pub const IdentityEvent = struct {
    seq: i64,
    did: []const u8,
    /// datetime: when event was received
    time: []const u8,
    handle: ?[]const u8 = null,
};
83
/// payload of an `#account` frame.
pub const AccountEvent = struct {
    seq: i64,
    did: []const u8,
    /// datetime: when event was received
    time: []const u8,
    /// defaults to true when the field is not supplied
    active: bool = true,
    status: ?AccountStatus = null,
};
91
/// payload of an `#info` frame; both fields are optional.
pub const InfoEvent = struct {
    name: ?[]const u8 = null,
    message: ?[]const u8 = null,
};
96
/// header map that precedes every payload on the wire.
const FrameHeader = struct {
    /// frame operation; see `FrameOp` for the known values
    op: i64,
    /// event type tag; optional
    t: ?[]const u8 = null,
};
102
/// known values of the frame header's `op` field.
pub const FrameOp = enum(i64) {
    /// regular message frame
    message = 1,
    /// error frame
    err = -1,
};
107
/// everything `decodeFrame` can fail with: the frame-level errors declared
/// here, merged with the error sets of the cbor and car modules.
pub const DecodeError = cbor.DecodeError || car.CarError || error{
    InvalidFrame,
    InvalidHeader,
    UnexpectedEof,
    MissingField,
    UnknownOp,
    UnknownEventType,
};
116
/// decode a raw WebSocket binary frame into a firehose Event.
///
/// a frame is two concatenated DAG-CBOR values: a header map `{op, t}`
/// followed by a payload map.
///
/// NOTE(review): strings in the returned event come straight from the cbor
/// decoder; presumably they reference `data` and/or memory from `allocator`,
/// so keep both alive while the event is in use (confirm against cbor.zig).
///
/// errors:
/// - `InvalidHeader`: header has no integer `op` or no string `t`
/// - `UnknownOp`: `op` is anything other than 1, including -1 (error frame)
/// - `UnknownEventType`: `t` is not #commit, #identity, #account or #info
/// - `MissingField`: a required payload field is absent
/// - any error reported by the cbor decoder
pub fn decodeFrame(allocator: Allocator, data: []const u8) DecodeError!Event {
    // frame = [CBOR header] [CBOR payload] concatenated
    const header = try cbor.decode(allocator, data);

    const op = header.value.getInt("op") orelse return error.InvalidHeader;
    // only op = 1 carries an event. -1 is an error frame, and any other value
    // must not be interpreted as a message, so reject everything that is not 1.
    // this check runs before `t` is read, so a header without `t` and with
    // op != 1 still yields UnknownOp.
    if (op != 1) return error.UnknownOp;

    const t = header.value.getString("t") orelse return error.InvalidHeader;

    // the payload is whatever follows the header and must be consumed entirely
    const payload = try cbor.decodeAll(allocator, data[header.consumed..]);

    if (mem.eql(u8, t, "#commit")) return try decodeCommit(allocator, payload);
    if (mem.eql(u8, t, "#identity")) return decodeIdentity(payload);
    if (mem.eql(u8, t, "#account")) return decodeAccount(payload);
    if (mem.eql(u8, t, "#info")) {
        return .{ .info = .{
            .name = payload.getString("name"),
            .message = payload.getString("message"),
        } };
    }

    return error.UnknownEventType;
}
148
/// build a commit Event from an already-decoded `#commit` payload map.
///
/// `seq`, `repo`, `rev` and `time` are required (`MissingField` otherwise).
/// `commit`, `blobs`, `blocks`, `ops`, `since` and `tooBig` are optional.
///
/// lenient by design:
/// - entries of `blobs` that are not CID links are skipped
/// - ops without a parsable `action`, without a `path`, or whose path has no
///   '/' are skipped
/// - a CAR or record that fails to decode leaves the op's `record` null
///
/// running out of memory is never treated as "record missing": it is
/// returned as `error.OutOfMemory`.
///
/// the parsed CAR and decoded records are not freed here, so this is meant to
/// be called with an arena-style allocator (as `WsHandler.serverMessage` does).
fn decodeCommit(allocator: Allocator, payload: cbor.Value) DecodeError!Event {
    const seq_val = payload.getInt("seq") orelse return error.MissingField;
    const repo = payload.getString("repo") orelse return error.MissingField;
    const rev = payload.getString("rev") orelse return error.MissingField;
    const time = payload.getString("time") orelse return error.MissingField;

    // commit CID: only accepted when the value really is a CID link
    const commit_cid: ?cbor.Cid = if (payload.get("commit")) |commit_val| switch (commit_val) {
        .cid => |c| c,
        else => null,
    } else null;

    // blobs: array of CID links
    var blobs: std.ArrayList(cbor.Cid) = .empty;
    errdefer blobs.deinit(allocator);
    if (payload.getArray("blobs")) |blob_values| {
        for (blob_values) |blob_val| {
            switch (blob_val) {
                .cid => |c| try blobs.append(allocator, c),
                else => {},
            }
        }
    }

    // CAR blocks: a malformed CAR only means records cannot be resolved
    var parsed_car: ?car.Car = null;
    if (payload.getBytes("blocks")) |b| {
        parsed_car = car.read(allocator, b) catch |err| blk: {
            // widen to anyerror so the comparison is valid for any error set
            if (@as(anyerror, err) == error.OutOfMemory) return error.OutOfMemory;
            break :blk null;
        };
    }

    var ops: std.ArrayList(RepoOp) = .empty;
    errdefer ops.deinit(allocator);

    if (payload.getArray("ops")) |op_values| {
        for (op_values) |op_val| {
            const action_str = op_val.getString("action") orelse continue;
            const action = CommitAction.parse(action_str) orelse continue;
            const path = op_val.getString("path") orelse continue;

            // path is "<collection>/<rkey>", split on the first '/'
            const slash = mem.indexOfScalar(u8, path, '/') orelse continue;

            // take the CID from the op and look the record up in the CAR blocks
            var op_cid: ?cbor.Cid = null;
            var record: ?cbor.Value = null;
            if (op_val.get("cid")) |cid_val| {
                switch (cid_val) {
                    .cid => |cid| {
                        op_cid = cid;
                        if (parsed_car) |c| {
                            if (car.findBlock(c, cid.raw)) |block_data| {
                                record = cbor.decodeAll(allocator, block_data) catch |err| blk: {
                                    if (@as(anyerror, err) == error.OutOfMemory) return error.OutOfMemory;
                                    break :blk null;
                                };
                            }
                        }
                    },
                    else => {},
                }
            }

            try ops.append(allocator, .{
                .action = action,
                .collection = path[0..slash],
                .rkey = path[slash + 1 ..],
                .cid = op_cid,
                .record = record,
            });
        }
    }

    // take ownership of both slices before building the result, so that a
    // failure on the second one does not leak the first
    const ops_slice = try ops.toOwnedSlice(allocator);
    errdefer allocator.free(ops_slice);
    const blobs_slice = try blobs.toOwnedSlice(allocator);

    return .{ .commit = .{
        .seq = seq_val,
        .repo = repo,
        .rev = rev,
        .time = time,
        .since = payload.getString("since"),
        .commit = commit_cid,
        .ops = ops_slice,
        .blobs = blobs_slice,
        .too_big = payload.getBool("tooBig") orelse false,
    } };
}
236
/// build an identity Event from a decoded `#identity` payload map.
/// `seq`, `did` and `time` are required; `handle` is optional.
fn decodeIdentity(payload: cbor.Value) DecodeError!Event {
    const seq_val = payload.getInt("seq") orelse return error.MissingField;
    const did = payload.getString("did") orelse return error.MissingField;
    const time = payload.getString("time") orelse return error.MissingField;

    return .{ .identity = .{
        .seq = seq_val,
        .did = did,
        .time = time,
        .handle = payload.getString("handle"),
    } };
}
245
/// build an account Event from a decoded `#account` payload map.
/// `seq`, `did` and `time` are required; `active` defaults to true and
/// `status` is parsed only when a status string is present.
fn decodeAccount(payload: cbor.Value) DecodeError!Event {
    const seq_val = payload.getInt("seq") orelse return error.MissingField;
    const did = payload.getString("did") orelse return error.MissingField;
    const time = payload.getString("time") orelse return error.MissingField;

    const status: ?AccountStatus = if (payload.getString("status")) |raw|
        AccountStatus.parse(raw)
    else
        null;

    return .{ .account = .{
        .seq = seq_val,
        .did = did,
        .time = time,
        .active = payload.getBool("active") orelse true,
        .status = status,
    } };
}
256
257// === encoder ===
258
/// serialize an Event as a wire frame: a DAG-CBOR header `{op: 1, t: "#..."}`
/// immediately followed by the DAG-CBOR payload for that event type.
/// the returned slice is allocated with `allocator`; caller owns it.
pub fn encodeFrame(allocator: Allocator, event: Event) ![]u8 {
    var out: std.Io.Writer.Allocating = .init(allocator);
    errdefer out.deinit();
    const w = &out.writer;

    const type_tag: []const u8 = switch (event) {
        .commit => "#commit",
        .identity => "#identity",
        .account => "#account",
        .info => "#info",
    };

    // header first
    const header_entries = [_]cbor.Value.MapEntry{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = type_tag } },
    };
    try cbor.encode(allocator, w, .{ .map = &header_entries });

    // then the payload matching the event's variant
    switch (event) {
        .commit => |ev| try encodeCommitPayload(allocator, w, ev),
        .identity => |ev| try encodeIdentityPayload(allocator, w, ev),
        .account => |ev| try encodeAccountPayload(allocator, w, ev),
        .info => |ev| try encodeInfoPayload(allocator, w, ev),
    }

    return try out.toOwnedSlice();
}
288
/// write the DAG-CBOR payload map of a `#commit` frame to `writer`.
///
/// for every op that carries a `record`, the record is encoded, a CID is
/// computed from the encoded bytes, and the pair is added to the CAR placed
/// in `blocks`; the first such CID becomes the CAR root. `op.cid` on the
/// input is not consulted: ops with a record get the computed CID, ops
/// without a record are written without a `cid` key.
///
/// `commit` and `since` are written only when set, `tooBig` only when true.
///
/// all intermediates live in a scratch arena backed by `allocator` and are
/// released before returning, on success and on error alike.
fn encodeCommitPayload(allocator: Allocator, writer: anytype, commit: CommitEvent) !void {
    var arena = std.heap.ArenaAllocator.init(allocator);
    defer arena.deinit();
    const scratch = arena.allocator();

    const MapEntry = cbor.Value.MapEntry;

    // ops array and CAR blocks are built side by side
    var op_values: std.ArrayList(cbor.Value) = .empty;
    var car_blocks: std.ArrayList(car.Block) = .empty;
    var root_cids: std.ArrayList(cbor.Cid) = .empty;

    for (commit.ops) |op| {
        const action_str: []const u8 = @tagName(op.action);
        const path = try std.fmt.allocPrint(scratch, "{s}/{s}", .{ op.collection, op.rkey });

        // the entries must outlive this loop iteration because `op_values` is
        // only encoded after the loop. an array literal holding runtime values
        // is a stack temporary, so copy it into the arena before keeping a
        // slice of it.
        const op_entries: []const MapEntry = if (op.record) |record| with_record: {
            const record_bytes = try cbor.encodeAlloc(scratch, record);
            const cid = try cbor.Cid.forDagCbor(scratch, record_bytes);

            try car_blocks.append(scratch, .{
                .cid_raw = cid.raw,
                .data = record_bytes,
            });
            if (root_cids.items.len == 0) {
                try root_cids.append(scratch, cid);
            }

            break :with_record try scratch.dupe(MapEntry, &[_]MapEntry{
                .{ .key = "action", .value = .{ .text = action_str } },
                .{ .key = "cid", .value = .{ .cid = cid } },
                .{ .key = "path", .value = .{ .text = path } },
            });
        } else try scratch.dupe(MapEntry, &[_]MapEntry{
            .{ .key = "action", .value = .{ .text = action_str } },
            .{ .key = "path", .value = .{ .text = path } },
        });

        try op_values.append(scratch, .{ .map = op_entries });
    }

    // CAR file from the collected blocks
    const blocks_bytes = try car.writeAlloc(scratch, car.Car{
        .roots = root_cids.items,
        .blocks = car_blocks.items,
    });

    // blobs array
    var blob_values: std.ArrayList(cbor.Value) = .empty;
    for (commit.blobs) |blob| {
        try blob_values.append(scratch, .{ .cid = blob });
    }

    // payload entries
    var entries: std.ArrayList(MapEntry) = .empty;

    try entries.append(scratch, .{ .key = "blocks", .value = .{ .bytes = blocks_bytes } });
    if (commit.commit) |c| {
        try entries.append(scratch, .{ .key = "commit", .value = .{ .cid = c } });
    }
    try entries.append(scratch, .{ .key = "blobs", .value = .{ .array = blob_values.items } });
    try entries.append(scratch, .{ .key = "ops", .value = .{ .array = op_values.items } });
    try entries.append(scratch, .{ .key = "repo", .value = .{ .text = commit.repo } });
    try entries.append(scratch, .{ .key = "rev", .value = .{ .text = commit.rev } });
    // NOTE(review): @intCast assumes seq is non-negative; a negative seq is a
    // safety-checked panic here, as before.
    try entries.append(scratch, .{ .key = "seq", .value = .{ .unsigned = @intCast(commit.seq) } });
    if (commit.since) |s| {
        try entries.append(scratch, .{ .key = "since", .value = .{ .text = s } });
    }
    try entries.append(scratch, .{ .key = "time", .value = .{ .text = commit.time } });
    if (commit.too_big) {
        try entries.append(scratch, .{ .key = "tooBig", .value = .{ .boolean = true } });
    }

    try cbor.encode(scratch, writer, .{ .map = entries.items });
}
366
/// write the DAG-CBOR payload map of an `#identity` frame to `writer`.
/// `handle` is included only when set.
fn encodeIdentityPayload(allocator: Allocator, writer: anytype, identity: IdentityEvent) !void {
    const MapEntry = cbor.Value.MapEntry;

    var fields: std.ArrayList(MapEntry) = .empty;
    defer fields.deinit(allocator);

    const did_entry: MapEntry = .{ .key = "did", .value = .{ .text = identity.did } };
    try fields.append(allocator, did_entry);

    if (identity.handle) |handle| {
        const handle_entry: MapEntry = .{ .key = "handle", .value = .{ .text = handle } };
        try fields.append(allocator, handle_entry);
    }

    const seq_entry: MapEntry = .{ .key = "seq", .value = .{ .unsigned = @intCast(identity.seq) } };
    try fields.append(allocator, seq_entry);

    const time_entry: MapEntry = .{ .key = "time", .value = .{ .text = identity.time } };
    try fields.append(allocator, time_entry);

    const payload: cbor.Value = .{ .map = fields.items };
    try cbor.encode(allocator, writer, payload);
}
380
/// write the DAG-CBOR payload map of an `#account` frame to `writer`.
/// `active` is written only when false, and `status` only when set.
fn encodeAccountPayload(allocator: Allocator, writer: anytype, account: AccountEvent) !void {
    const MapEntry = cbor.Value.MapEntry;

    var fields: std.ArrayList(MapEntry) = .empty;
    defer fields.deinit(allocator);

    if (account.active == false) {
        const active_entry: MapEntry = .{ .key = "active", .value = .{ .boolean = false } };
        try fields.append(allocator, active_entry);
    }

    const did_entry: MapEntry = .{ .key = "did", .value = .{ .text = account.did } };
    try fields.append(allocator, did_entry);

    const seq_entry: MapEntry = .{ .key = "seq", .value = .{ .unsigned = @intCast(account.seq) } };
    try fields.append(allocator, seq_entry);

    if (account.status) |status| {
        const status_entry: MapEntry = .{ .key = "status", .value = .{ .text = @tagName(status) } };
        try fields.append(allocator, status_entry);
    }

    const time_entry: MapEntry = .{ .key = "time", .value = .{ .text = account.time } };
    try fields.append(allocator, time_entry);

    const payload: cbor.Value = .{ .map = fields.items };
    try cbor.encode(allocator, writer, payload);
}
397
/// write the DAG-CBOR payload map of an `#info` frame to `writer`.
/// each of `message` and `name` is written only when set, so the map may be empty.
fn encodeInfoPayload(allocator: Allocator, writer: anytype, info: InfoEvent) !void {
    const MapEntry = cbor.Value.MapEntry;

    var fields: std.ArrayList(MapEntry) = .empty;
    defer fields.deinit(allocator);

    if (info.message) |message| {
        const message_entry: MapEntry = .{ .key = "message", .value = .{ .text = message } };
        try fields.append(allocator, message_entry);
    }
    if (info.name) |name| {
        const name_entry: MapEntry = .{ .key = "name", .value = .{ .text = name } };
        try fields.append(allocator, name_entry);
    }

    const payload: cbor.Value = .{ .map = fields.items };
    try cbor.encode(allocator, writer, payload);
}
411
/// reconnecting firehose subscriber.
///
/// remembers the sequence number of the last decoded event in `last_seq` and
/// sends it as `?cursor=` on every (re)connect.
pub const FirehoseClient = struct {
    io: Io,
    allocator: Allocator,
    options: Options,
    /// seq of the most recently decoded event; seeded from `options.cursor`
    last_seq: ?i64 = null,

    /// create a client. nothing is connected until `subscribe` is called.
    pub fn init(io: Io, allocator: Allocator, options: Options) FirehoseClient {
        return .{
            .io = io,
            .allocator = allocator,
            .options = options,
            .last_seq = options.cursor,
        };
    }

    /// nothing to release; present for init/deinit symmetry.
    pub fn deinit(_: *FirehoseClient) void {}

    /// subscribe with a user-provided handler.
    /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void
    /// optional: fn onError(*@TypeOf(handler), anyerror) void
    /// blocks until cancelled, reconnecting after every disconnect. the delay
    /// doubles after each attempt up to 60s, and resets to 1s whenever the
    /// attempt moves to a different host.
    /// rotates through hosts on each reconnect attempt.
    /// returns immediately (after logging) when `options.hosts` is empty.
    pub fn subscribe(self: *FirehoseClient, handler: anytype) Io.Cancelable!void {
        const hosts = self.options.hosts;
        if (hosts.len == 0) {
            // `host_index % hosts.len` below would be a division by zero
            log.err("firehose: no hosts configured, not subscribing", .{});
            return;
        }

        const max_backoff: u64 = 60;
        var backoff: u64 = 1;
        var host_index: usize = 0;
        var prev_host_index: usize = 0;

        while (true) {
            const effective_index = host_index % hosts.len;
            const host = hosts[effective_index];

            // reset backoff on host switch (fresh host deserves a fresh chance)
            if (host_index > 0 and effective_index != prev_host_index) {
                backoff = 1;
            }

            log.info("connecting to host {d}/{d}: {s}", .{ effective_index + 1, hosts.len, host });

            self.connectAndRead(host, handler) catch |err| {
                if (comptime @hasDecl(@TypeOf(handler.*), "onError")) {
                    handler.onError(err);
                } else {
                    log.err("firehose error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
                }
            };

            prev_host_index = effective_index;
            host_index += 1;
            try self.io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake);
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// open one websocket connection to `host` and feed frames to `handler`
    /// until the read loop ends. any error is returned to `subscribe`.
    fn connectAndRead(self: *FirehoseClient, host: []const u8, handler: anytype) !void {
        var path_buf: [256]u8 = undefined;
        var w: std.Io.Writer = .fixed(&path_buf);

        try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos");
        if (self.last_seq) |cursor| {
            try w.print("?cursor={d}", .{cursor});
        }
        const path = w.buffered();

        log.info("connecting to wss://{s}{s}", .{ host, path });

        var client = try websocket.Client.init(self.io, self.allocator, .{
            .host = host,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
        });
        defer client.deinit();

        // a host name too long for the buffer is reported as an error rather
        // than sending the bare host name as a malformed header block.
        var host_header_buf: [256]u8 = undefined;
        const host_header = try std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host});

        try client.handshake(path, .{ .headers = host_header });
        configureKeepalive(&client);

        log.info("firehose connected to {s}", .{host});

        var ws_handler = WsHandler(@TypeOf(handler.*)){
            .allocator = self.allocator,
            .handler = handler,
            .client_state = self,
        };
        try client.readLoop(&ws_handler);
    }
};
502
/// adapter between the websocket read loop and a user handler of type `H`.
/// decodes each incoming message, records its seq on the owning client, and
/// forwards the event to `H.onEvent`.
fn WsHandler(comptime H: type) type {
    return struct {
        allocator: Allocator,
        handler: *H,
        client_state: *FirehoseClient,

        const Self = @This();

        /// called for every message from the server. decoding uses a
        /// per-message arena that is freed when this function returns, so the
        /// event handed to `onEvent` must not be retained past the call.
        /// frames that fail to decode are logged at debug level and dropped.
        pub fn serverMessage(self: *Self, data: []const u8) !void {
            var frame_arena = std.heap.ArenaAllocator.init(self.allocator);
            defer frame_arena.deinit();

            if (decodeFrame(frame_arena.allocator(), data)) |event| {
                if (event.seq()) |s| {
                    self.client_state.last_seq = s;
                }
                self.handler.onEvent(event);
            } else |err| {
                log.debug("frame decode error: {s}", .{@errorName(err)});
            }
        }

        /// called when the connection is closed; only logs.
        pub fn close(_: *Self) void {
            log.info("firehose connection closed", .{});
        }
    };
}
532
/// turn on TCP keepalive for the client's socket so that a read does not
/// block forever when the peer vanishes without FIN/RST (network partition,
/// crash, power loss).
/// settings: 10s idle, then probes every 5s, 2 probes, so about 20s to detect.
/// best effort: the first option that fails to apply ends configuration and
/// the remaining options are left untouched.
fn configureKeepalive(client: *websocket.Client) void {
    const builtin = @import("builtin");
    const sock = client.stream.stream.socket.handle;
    const tcp_level: i32 = @intCast(posix.IPPROTO.TCP);

    const enable = std.mem.toBytes(@as(i32, 1));
    const idle_secs = std.mem.toBytes(@as(i32, 10));
    const interval_secs = std.mem.toBytes(@as(i32, 5));
    const probe_count = std.mem.toBytes(@as(i32, 2));

    posix.setsockopt(sock, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &enable) catch return;

    // the idle-time option has a different name per OS
    switch (builtin.os.tag) {
        .linux => posix.setsockopt(sock, tcp_level, posix.TCP.KEEPIDLE, &idle_secs) catch return,
        .macos => posix.setsockopt(sock, tcp_level, posix.TCP.KEEPALIVE, &idle_secs) catch return,
        else => {},
    }

    posix.setsockopt(sock, tcp_level, posix.TCP.KEEPINTVL, &interval_secs) catch return;
    posix.setsockopt(sock, tcp_level, posix.TCP.KEEPCNT, &probe_count) catch return;
}
549
550// === tests ===
551
test "decode frame header" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();

    // hand-built frame: header {op: 1, t: "#info"} then payload {name: "OutdatedCursor"}
    const header_bytes = [_]u8{
        0xa2, // map(2)
        0x62, 'o', 'p', 0x01, // "op": 1
        0x61, 't', 0x65, '#', 'i', 'n', 'f', 'o', // "t": "#info"
    };
    const payload_bytes = [_]u8{
        0xa1, // map(1)
        0x64, 'n', 'a', 'm', 'e', // "name"
        0x6e, 'O', 'u', 't', 'd', 'a', 't', 'e', 'd', 'C', 'u', 'r', 's', 'o', 'r', // "OutdatedCursor"
    };
    const frame = header_bytes ++ payload_bytes;

    const decoded = try decodeFrame(arena.allocator(), &frame);
    try testing.expectEqualStrings("OutdatedCursor", decoded.info.name.?);
}
577
test "decode identity frame" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();
    const gpa = arena.allocator();

    // the encoder produces the input frame, which keeps the fixture readable
    const bytes = try encodeFrame(gpa, .{ .identity = .{
        .seq = 42,
        .did = "did:plc:test",
        .time = "2024-01-15T10:30:00Z",
    } });

    const decoded = try decodeFrame(gpa, bytes);
    try testing.expectEqual(@as(i64, 42), decoded.identity.seq);
    try testing.expectEqualStrings("did:plc:test", decoded.identity.did);
    try testing.expectEqualStrings("2024-01-15T10:30:00Z", decoded.identity.time);
}
597
test "Event.seq works" {
    const testing = std.testing;

    // info events carry no sequence number
    const without_seq: Event = .{ .info = .{ .name = "test" } };
    try testing.expect(without_seq.seq() == null);

    // every other variant reports its own seq
    const with_seq: Event = .{ .identity = .{
        .seq = 42,
        .did = "did:plc:test",
        .time = "2024-01-15T10:30:00Z",
    } };
    try testing.expectEqual(@as(i64, 42), with_seq.seq().?);
}
609
610// === encoder tests ===
611
test "encode → decode info frame" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();
    const gpa = arena.allocator();

    const bytes = try encodeFrame(gpa, .{ .info = .{
        .name = "OutdatedCursor",
        .message = "cursor is behind",
    } });
    const round_tripped = try decodeFrame(gpa, bytes);

    const info = round_tripped.info;
    try testing.expectEqualStrings("OutdatedCursor", info.name.?);
    try testing.expectEqualStrings("cursor is behind", info.message.?);
}
628
test "encode → decode identity frame" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();
    const gpa = arena.allocator();

    const bytes = try encodeFrame(gpa, .{ .identity = .{
        .seq = 42,
        .did = "did:plc:test123",
        .time = "2024-01-15T10:30:00Z",
        .handle = "alice.bsky.social",
    } });
    const round_tripped = try decodeFrame(gpa, bytes);

    const identity = round_tripped.identity;
    try testing.expectEqual(@as(i64, 42), identity.seq);
    try testing.expectEqualStrings("did:plc:test123", identity.did);
    try testing.expectEqualStrings("2024-01-15T10:30:00Z", identity.time);
    try testing.expectEqualStrings("alice.bsky.social", identity.handle.?);
}
650
test "encode → decode account frame" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();
    const gpa = arena.allocator();

    const bytes = try encodeFrame(gpa, .{ .account = .{
        .seq = 100,
        .did = "did:plc:suspended",
        .time = "2024-01-15T10:30:00Z",
        .active = false,
        .status = .suspended,
    } });
    const round_tripped = try decodeFrame(gpa, bytes);

    const account = round_tripped.account;
    try testing.expectEqual(@as(i64, 100), account.seq);
    try testing.expectEqualStrings("did:plc:suspended", account.did);
    try testing.expectEqualStrings("2024-01-15T10:30:00Z", account.time);
    try testing.expectEqual(false, account.active);
    try testing.expectEqual(AccountStatus.suspended, account.status.?);
}
674
test "encode → decode commit frame with record" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();
    const gpa = arena.allocator();

    const post: cbor.Value = .{ .map = &.{
        .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } },
        .{ .key = "text", .value = .{ .text = "hello firehose" } },
    } };

    const source_event: Event = .{ .commit = .{
        .seq = 999,
        .repo = "did:plc:poster",
        .rev = "3k2abc000000",
        .time = "2024-01-15T10:30:00Z",
        .since = "3k2abd000000",
        .ops = &.{.{
            .action = .create,
            .collection = "app.bsky.feed.post",
            .rkey = "3k2abc",
            .record = post,
        }},
    } };

    const bytes = try encodeFrame(gpa, source_event);
    const round_tripped = try decodeFrame(gpa, bytes);

    // commit-level fields
    const got = round_tripped.commit;
    try testing.expectEqual(@as(i64, 999), got.seq);
    try testing.expectEqualStrings("did:plc:poster", got.repo);
    try testing.expectEqualStrings("3k2abc000000", got.rev);
    try testing.expectEqualStrings("2024-01-15T10:30:00Z", got.time);
    try testing.expectEqualStrings("3k2abd000000", got.since.?);
    try testing.expectEqual(@as(usize, 0), got.blobs.len);
    try testing.expectEqual(@as(usize, 1), got.ops.len);

    // the single op
    const first_op = got.ops[0];
    try testing.expectEqual(CommitAction.create, first_op.action);
    try testing.expectEqualStrings("app.bsky.feed.post", first_op.collection);
    try testing.expectEqualStrings("3k2abc", first_op.rkey);
    try testing.expect(first_op.cid != null);

    // the record comes back out of the CAR blocks
    const decoded_post = first_op.record.?;
    try testing.expectEqualStrings("hello firehose", decoded_post.getString("text").?);
    try testing.expectEqualStrings("app.bsky.feed.post", decoded_post.getString("$type").?);
}
722
test "encode → decode commit with delete (no record)" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();
    const gpa = arena.allocator();

    const source_event: Event = .{ .commit = .{
        .seq = 500,
        .repo = "did:plc:deleter",
        .rev = "3k2xyz000000",
        .time = "2024-01-15T10:30:00Z",
        .ops = &.{.{
            .action = .delete,
            .collection = "app.bsky.feed.post",
            .rkey = "abc123",
            .record = null,
        }},
    } };

    const bytes = try encodeFrame(gpa, source_event);
    const round_tripped = try decodeFrame(gpa, bytes);

    const got = round_tripped.commit;
    try testing.expectEqual(@as(i64, 500), got.seq);
    try testing.expectEqualStrings("3k2xyz000000", got.rev);
    try testing.expectEqualStrings("2024-01-15T10:30:00Z", got.time);
    try testing.expectEqual(@as(usize, 1), got.ops.len);

    // a delete carries neither a CID nor a record
    const first_op = got.ops[0];
    try testing.expectEqual(CommitAction.delete, first_op.action);
    try testing.expect(first_op.cid == null);
    try testing.expect(first_op.record == null);
}