atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

at codex/xrpc-errors-retry 751 lines 27 kB view raw
1//! firehose codec - com.atproto.sync.subscribeRepos 2//! 3//! encode and decode AT Protocol firehose events over WebSocket. messages are 4//! DAG-CBOR encoded (unlike jetstream, which is JSON). includes frame encoding/ 5//! decoding, CAR block packing, and CID creation for records. 6//! 7//! wire format per frame: 8//! [DAG-CBOR header: {op, t}] [DAG-CBOR payload: {seq, repo, ops, blocks, ...}] 9//! 10//! see: https://atproto.com/specs/event-stream 11 12const std = @import("std"); 13const websocket = @import("websocket"); 14const cbor = @import("../repo/cbor.zig"); 15const car = @import("../repo/car.zig"); 16const sync = @import("sync.zig"); 17 18const mem = std.mem; 19const Allocator = mem.Allocator; 20const posix = std.posix; 21const Io = std.Io; 22const log = std.log.scoped(.zat); 23 24pub const CommitAction = sync.CommitAction; 25pub const AccountStatus = sync.AccountStatus; 26 27pub const default_hosts = [_][]const u8{ 28 "bsky.network", 29 "northamerica.firehose.network", 30 "europe.firehose.network", 31 "asia.firehose.network", 32}; 33 34pub const Options = struct { 35 hosts: []const []const u8 = &default_hosts, 36 cursor: ?i64 = null, 37 max_message_size: usize = 5 * 1024 * 1024, // 5MB — firehose frames can be large 38}; 39 40/// decoded firehose event 41pub const Event = union(enum) { 42 commit: CommitEvent, 43 identity: IdentityEvent, 44 account: AccountEvent, 45 info: InfoEvent, 46 47 pub fn seq(self: Event) ?i64 { 48 return switch (self) { 49 .commit => |c| c.seq, 50 .identity => |i| i.seq, 51 .account => |a| a.seq, 52 .info => null, 53 }; 54 } 55}; 56 57pub const CommitEvent = struct { 58 seq: i64, 59 repo: []const u8, // DID 60 rev: []const u8, // TID — revision of the commit 61 time: []const u8, // datetime — when event was received 62 since: ?[]const u8 = null, // TID — rev of preceding commit (null = full repo export) 63 commit: ?cbor.Cid = null, // CID of the commit object 64 ops: []const RepoOp, 65 blobs: []const cbor.Cid = &.{}, // new blobs referenced by records in this commit 66 too_big: bool = false, 67}; 68 69pub const RepoOp = struct { 70 action: CommitAction, 71 collection: []const u8, 72 rkey: []const u8, 73 cid: ?cbor.Cid = null, // CID of the record (null for deletes) 74 record: ?cbor.Value = null, // decoded DAG-CBOR record from CAR block 75}; 76 77pub const IdentityEvent = struct { 78 seq: i64, 79 did: []const u8, 80 time: []const u8, // datetime — when event was received 81 handle: ?[]const u8 = null, 82}; 83 84pub const AccountEvent = struct { 85 seq: i64, 86 did: []const u8, 87 time: []const u8, // datetime — when event was received 88 active: bool = true, 89 status: ?AccountStatus = null, 90}; 91 92pub const InfoEvent = struct { 93 name: ?[]const u8 = null, 94 message: ?[]const u8 = null, 95}; 96 97/// frame header from the wire 98const FrameHeader = struct { 99 op: i64, 100 t: ?[]const u8 = null, 101}; 102 103pub const FrameOp = enum(i64) { 104 message = 1, 105 err = -1, 106}; 107 108pub const DecodeError = error{ 109 InvalidFrame, 110 InvalidHeader, 111 UnexpectedEof, 112 MissingField, 113 UnknownOp, 114 UnknownEventType, 115} || cbor.DecodeError || car.CarError; 116 117/// decode a raw WebSocket binary frame into a firehose Event 118pub fn decodeFrame(allocator: Allocator, data: []const u8) DecodeError!Event { 119 // frame = [CBOR header] [CBOR payload] concatenated 120 const header_result = try cbor.decode(allocator, data); 121 const header_val = header_result.value; 122 const payload_data = data[header_result.consumed..]; 123 124 // parse header 125 const op = header_val.getInt("op") orelse return error.InvalidHeader; 126 if (op == -1) return error.UnknownOp; // error frame 127 128 const t = header_val.getString("t") orelse return error.InvalidHeader; 129 130 // decode payload 131 const payload = try cbor.decodeAll(allocator, payload_data); 132 133 if (mem.eql(u8, t, "#commit")) { 134 return try decodeCommit(allocator, payload); 135 } else if (mem.eql(u8, t, "#identity")) { 136 return decodeIdentity(payload); 137 } else if (mem.eql(u8, t, "#account")) { 138 return decodeAccount(payload); 139 } else if (mem.eql(u8, t, "#info")) { 140 return .{ .info = .{ 141 .name = payload.getString("name"), 142 .message = payload.getString("message"), 143 } }; 144 } 145 146 return error.UnknownEventType; 147} 148 149fn decodeCommit(allocator: Allocator, payload: cbor.Value) DecodeError!Event { 150 const seq_val = payload.getInt("seq") orelse return error.MissingField; 151 const repo = payload.getString("repo") orelse return error.MissingField; 152 const rev = payload.getString("rev") orelse return error.MissingField; 153 const time = payload.getString("time") orelse return error.MissingField; 154 155 // parse commit CID 156 var commit_cid: ?cbor.Cid = null; 157 if (payload.get("commit")) |commit_val| { 158 switch (commit_val) { 159 .cid => |c| commit_cid = c, 160 else => {}, 161 } 162 } 163 164 // parse blobs array (array of CID links) 165 var blobs: std.ArrayList(cbor.Cid) = .empty; 166 if (payload.getArray("blobs")) |blob_values| { 167 for (blob_values) |blob_val| { 168 switch (blob_val) { 169 .cid => |c| try blobs.append(allocator, c), 170 else => {}, 171 } 172 } 173 } 174 175 // parse CAR blocks 176 const blocks_bytes = payload.getBytes("blocks"); 177 var parsed_car: ?car.Car = null; 178 if (blocks_bytes) |b| { 179 parsed_car = car.read(allocator, b) catch null; 180 } 181 182 // parse ops 183 const ops_array = payload.getArray("ops"); 184 var ops: std.ArrayList(RepoOp) = .empty; 185 186 if (ops_array) |op_values| { 187 for (op_values) |op_val| { 188 const action_str = op_val.getString("action") orelse continue; 189 const action = CommitAction.parse(action_str) orelse continue; 190 const path = op_val.getString("path") orelse continue; 191 192 // split path into collection/rkey 193 const slash = mem.indexOfScalar(u8, path, '/') orelse continue; 194 const collection = path[0..slash]; 195 const rkey = path[slash + 1 ..]; 196 197 // extract CID from op and look up record from CAR blocks 198 var op_cid: ?cbor.Cid = null; 199 var record: ?cbor.Value = null; 200 if (op_val.get("cid")) |cid_val| { 201 switch (cid_val) { 202 .cid => |cid| { 203 op_cid = cid; 204 if (parsed_car) |c| { 205 if (car.findBlock(c, cid.raw)) |block_data| { 206 record = cbor.decodeAll(allocator, block_data) catch null; 207 } 208 } 209 }, 210 else => {}, 211 } 212 } 213 214 try ops.append(allocator, .{ 215 .action = action, 216 .collection = collection, 217 .rkey = rkey, 218 .cid = op_cid, 219 .record = record, 220 }); 221 } 222 } 223 224 return .{ .commit = .{ 225 .seq = seq_val, 226 .repo = repo, 227 .rev = rev, 228 .time = time, 229 .since = payload.getString("since"), 230 .commit = commit_cid, 231 .ops = try ops.toOwnedSlice(allocator), 232 .blobs = try blobs.toOwnedSlice(allocator), 233 .too_big = payload.getBool("tooBig") orelse false, 234 } }; 235} 236 237fn decodeIdentity(payload: cbor.Value) DecodeError!Event { 238 return .{ .identity = .{ 239 .seq = payload.getInt("seq") orelse return error.MissingField, 240 .did = payload.getString("did") orelse return error.MissingField, 241 .time = payload.getString("time") orelse return error.MissingField, 242 .handle = payload.getString("handle"), 243 } }; 244} 245 246fn decodeAccount(payload: cbor.Value) DecodeError!Event { 247 const status_str = payload.getString("status"); 248 return .{ .account = .{ 249 .seq = payload.getInt("seq") orelse return error.MissingField, 250 .did = payload.getString("did") orelse return error.MissingField, 251 .time = payload.getString("time") orelse return error.MissingField, 252 .active = payload.getBool("active") orelse true, 253 .status = if (status_str) |s| AccountStatus.parse(s) else null, 254 } }; 255} 256 257// === encoder === 258 259/// encode a firehose Event into a wire frame: [DAG-CBOR header] [DAG-CBOR payload] 260pub fn encodeFrame(allocator: Allocator, event: Event) ![]u8 { 261 var aw: std.Io.Writer.Allocating = .init(allocator); 262 errdefer aw.deinit(); 263 264 const tag = switch (event) { 265 .commit => "#commit", 266 .identity => "#identity", 267 .account => "#account", 268 .info => "#info", 269 }; 270 271 // encode header: {op: 1, t: "#..."} 272 const header: cbor.Value = .{ .map = &.{ 273 .{ .key = "op", .value = .{ .unsigned = 1 } }, 274 .{ .key = "t", .value = .{ .text = tag } }, 275 } }; 276 try cbor.encode(allocator, &aw.writer, header); 277 278 // encode payload based on event type 279 switch (event) { 280 .commit => |commit| try encodeCommitPayload(allocator, &aw.writer, commit), 281 .identity => |id| try encodeIdentityPayload(allocator, &aw.writer, id), 282 .account => |acct| try encodeAccountPayload(allocator, &aw.writer, acct), 283 .info => |inf| try encodeInfoPayload(allocator, &aw.writer, inf), 284 } 285 286 return try aw.toOwnedSlice(); 287} 288 289fn encodeCommitPayload(allocator: Allocator, writer: anytype, commit: CommitEvent) !void { 290 // build ops array and CAR blocks simultaneously 291 var op_values: std.ArrayList(cbor.Value) = .empty; 292 defer op_values.deinit(allocator); 293 var car_blocks: std.ArrayList(car.Block) = .empty; 294 defer car_blocks.deinit(allocator); 295 var root_cids: std.ArrayList(cbor.Cid) = .empty; 296 defer root_cids.deinit(allocator); 297 298 for (commit.ops) |op| { 299 const action_str: []const u8 = @tagName(op.action); 300 const path = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ op.collection, op.rkey }); 301 302 if (op.record) |record| { 303 // encode record, create CID, add to CAR blocks 304 const record_bytes = try cbor.encodeAlloc(allocator, record); 305 const cid = try cbor.Cid.forDagCbor(allocator, record_bytes); 306 307 try car_blocks.append(allocator, .{ 308 .cid_raw = cid.raw, 309 .data = record_bytes, 310 }); 311 312 if (root_cids.items.len == 0) { 313 try root_cids.append(allocator, cid); 314 } 315 316 try op_values.append(allocator, .{ .map = @constCast(&[_]cbor.Value.MapEntry{ 317 .{ .key = "action", .value = .{ .text = action_str } }, 318 .{ .key = "cid", .value = .{ .cid = cid } }, 319 .{ .key = "path", .value = .{ .text = path } }, 320 }) }); 321 } else { 322 try op_values.append(allocator, .{ .map = @constCast(&[_]cbor.Value.MapEntry{ 323 .{ .key = "action", .value = .{ .text = action_str } }, 324 .{ .key = "path", .value = .{ .text = path } }, 325 }) }); 326 } 327 } 328 329 // build CAR file from blocks 330 const car_data = car.Car{ 331 .roots = root_cids.items, 332 .blocks = car_blocks.items, 333 }; 334 const blocks_bytes = try car.writeAlloc(allocator, car_data); 335 336 // build blobs array 337 var blob_values: std.ArrayList(cbor.Value) = .empty; 338 defer blob_values.deinit(allocator); 339 for (commit.blobs) |blob| { 340 try blob_values.append(allocator, .{ .cid = blob }); 341 } 342 343 // build payload entries 344 var entries: std.ArrayList(cbor.Value.MapEntry) = .empty; 345 defer entries.deinit(allocator); 346 347 try entries.append(allocator, .{ .key = "blocks", .value = .{ .bytes = blocks_bytes } }); 348 if (commit.commit) |c| { 349 try entries.append(allocator, .{ .key = "commit", .value = .{ .cid = c } }); 350 } 351 try entries.append(allocator, .{ .key = "blobs", .value = .{ .array = blob_values.items } }); 352 try entries.append(allocator, .{ .key = "ops", .value = .{ .array = op_values.items } }); 353 try entries.append(allocator, .{ .key = "repo", .value = .{ .text = commit.repo } }); 354 try entries.append(allocator, .{ .key = "rev", .value = .{ .text = commit.rev } }); 355 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(commit.seq) } }); 356 if (commit.since) |s| { 357 try entries.append(allocator, .{ .key = "since", .value = .{ .text = s } }); 358 } 359 try entries.append(allocator, .{ .key = "time", .value = .{ .text = commit.time } }); 360 if (commit.too_big) { 361 try entries.append(allocator, .{ .key = "tooBig", .value = .{ .boolean = true } }); 362 } 363 364 try cbor.encode(allocator, writer, .{ .map = entries.items }); 365} 366 367fn encodeIdentityPayload(allocator: Allocator, writer: anytype, identity: IdentityEvent) !void { 368 var entries: std.ArrayList(cbor.Value.MapEntry) = .empty; 369 defer entries.deinit(allocator); 370 371 try entries.append(allocator, .{ .key = "did", .value = .{ .text = identity.did } }); 372 if (identity.handle) |h| { 373 try entries.append(allocator, .{ .key = "handle", .value = .{ .text = h } }); 374 } 375 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(identity.seq) } }); 376 try entries.append(allocator, .{ .key = "time", .value = .{ .text = identity.time } }); 377 378 try cbor.encode(allocator, writer, .{ .map = entries.items }); 379} 380 381fn encodeAccountPayload(allocator: Allocator, writer: anytype, account: AccountEvent) !void { 382 var entries: std.ArrayList(cbor.Value.MapEntry) = .empty; 383 defer entries.deinit(allocator); 384 385 if (!account.active) { 386 try entries.append(allocator, .{ .key = "active", .value = .{ .boolean = false } }); 387 } 388 try entries.append(allocator, .{ .key = "did", .value = .{ .text = account.did } }); 389 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(account.seq) } }); 390 if (account.status) |s| { 391 try entries.append(allocator, .{ .key = "status", .value = .{ .text = @tagName(s) } }); 392 } 393 try entries.append(allocator, .{ .key = "time", .value = .{ .text = account.time } }); 394 395 try cbor.encode(allocator, writer, .{ .map = entries.items }); 396} 397 398fn encodeInfoPayload(allocator: Allocator, writer: anytype, info: InfoEvent) !void { 399 var entries: std.ArrayList(cbor.Value.MapEntry) = .empty; 400 defer entries.deinit(allocator); 401 402 if (info.message) |m| { 403 try entries.append(allocator, .{ .key = "message", .value = .{ .text = m } }); 404 } 405 if (info.name) |n| { 406 try entries.append(allocator, .{ .key = "name", .value = .{ .text = n } }); 407 } 408 409 try cbor.encode(allocator, writer, .{ .map = entries.items }); 410} 411 412pub const FirehoseClient = struct { 413 io: Io, 414 allocator: Allocator, 415 options: Options, 416 last_seq: ?i64 = null, 417 418 pub fn init(io: Io, allocator: Allocator, options: Options) FirehoseClient { 419 return .{ 420 .io = io, 421 .allocator = allocator, 422 .options = options, 423 .last_seq = if (options.cursor) |c| c else null, 424 }; 425 } 426 427 pub fn deinit(_: *FirehoseClient) void {} 428 429 /// subscribe with a user-provided handler. 430 /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void 431 /// optional: fn onError(*@TypeOf(handler), anyerror) void 432 /// blocks forever — reconnects with exponential backoff on disconnect. 433 /// rotates through hosts on each reconnect attempt. 434 pub fn subscribe(self: *FirehoseClient, handler: anytype) Io.Cancelable!void { 435 var backoff: u64 = 1; 436 var host_index: usize = 0; 437 const max_backoff: u64 = 60; 438 var prev_host_index: usize = 0; 439 440 while (true) { 441 const host = self.options.hosts[host_index % self.options.hosts.len]; 442 const effective_index = host_index % self.options.hosts.len; 443 444 // reset backoff on host switch (fresh host deserves a fresh chance) 445 if (host_index > 0 and effective_index != prev_host_index) { 446 backoff = 1; 447 } 448 449 log.info("connecting to host {d}/{d}: {s}", .{ effective_index + 1, self.options.hosts.len, host }); 450 451 self.connectAndRead(host, handler) catch |err| { 452 if (comptime @hasDecl(@TypeOf(handler.*), "onError")) { 453 handler.onError(err); 454 } else { 455 log.err("firehose error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff }); 456 } 457 }; 458 459 prev_host_index = effective_index; 460 host_index += 1; 461 try self.io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake); 462 backoff = @min(backoff * 2, max_backoff); 463 } 464 } 465 466 fn connectAndRead(self: *FirehoseClient, host: []const u8, handler: anytype) !void { 467 var path_buf: [256]u8 = undefined; 468 var w: std.Io.Writer = .fixed(&path_buf); 469 470 try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos"); 471 if (self.last_seq) |cursor| { 472 try w.print("?cursor={d}", .{cursor}); 473 } 474 const path = w.buffered(); 475 476 log.info("connecting to wss://{s}{s}", .{ host, path }); 477 478 var client = try websocket.Client.init(self.io, self.allocator, .{ 479 .host = host, 480 .port = 443, 481 .tls = true, 482 .max_size = self.options.max_message_size, 483 }); 484 defer client.deinit(); 485 486 var host_header_buf: [256]u8 = undefined; 487 const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch host; 488 489 try client.handshake(path, .{ .headers = host_header }); 490 configureKeepalive(&client); 491 492 log.info("firehose connected to {s}", .{host}); 493 494 var ws_handler = WsHandler(@TypeOf(handler.*)){ 495 .allocator = self.allocator, 496 .handler = handler, 497 .client_state = self, 498 }; 499 try client.readLoop(&ws_handler); 500 } 501}; 502 503fn WsHandler(comptime H: type) type { 504 return struct { 505 allocator: Allocator, 506 handler: *H, 507 client_state: *FirehoseClient, 508 509 const Self = @This(); 510 511 pub fn serverMessage(self: *Self, data: []const u8) !void { 512 var arena = std.heap.ArenaAllocator.init(self.allocator); 513 defer arena.deinit(); 514 515 const event = decodeFrame(arena.allocator(), data) catch |err| { 516 log.debug("frame decode error: {s}", .{@errorName(err)}); 517 return; 518 }; 519 520 if (event.seq()) |s| { 521 self.client_state.last_seq = s; 522 } 523 524 self.handler.onEvent(event); 525 } 526 527 pub fn close(_: *Self) void { 528 log.info("firehose connection closed", .{}); 529 } 530 }; 531} 532 533/// enable TCP keepalive so reads don't block forever when a peer 534/// disappears without FIN/RST (network partition, crash, power loss). 535/// detection time: 10s idle + 5s × 2 probes = 20s. 536fn configureKeepalive(client: *websocket.Client) void { 537 const fd = client.stream.stream.socket.handle; 538 const builtin = @import("builtin"); 539 posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(i32, 1))) catch return; 540 const tcp: i32 = @intCast(posix.IPPROTO.TCP); 541 if (builtin.os.tag == .linux) { 542 posix.setsockopt(fd, tcp, posix.TCP.KEEPIDLE, &std.mem.toBytes(@as(i32, 10))) catch return; 543 } else if (builtin.os.tag == .macos) { 544 posix.setsockopt(fd, tcp, posix.TCP.KEEPALIVE, &std.mem.toBytes(@as(i32, 10))) catch return; 545 } 546 posix.setsockopt(fd, tcp, posix.TCP.KEEPINTVL, &std.mem.toBytes(@as(i32, 5))) catch return; 547 posix.setsockopt(fd, tcp, posix.TCP.KEEPCNT, &std.mem.toBytes(@as(i32, 2))) catch return; 548} 549 550// === tests === 551 552test "decode frame header" { 553 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 554 defer arena.deinit(); 555 const alloc = arena.allocator(); 556 557 // simulate a frame: header {op: 1, t: "#info"} + payload {name: "OutdatedCursor"} 558 const header_bytes = [_]u8{ 559 0xa2, // map(2) 560 0x62, 'o', 'p', 0x01, // "op": 1 561 0x61, 't', 0x65, '#', 'i', 'n', 'f', 'o', // "t": "#info" 562 }; 563 const payload_bytes = [_]u8{ 564 0xa1, // map(1) 565 0x64, 'n', 'a', 'm', 'e', // "name" 566 0x6e, 'O', 'u', 't', 'd', 'a', 't', 'e', 'd', 'C', 'u', 'r', 's', 'o', 'r', // "OutdatedCursor" 567 }; 568 569 var frame: [header_bytes.len + payload_bytes.len]u8 = undefined; 570 @memcpy(frame[0..header_bytes.len], &header_bytes); 571 @memcpy(frame[header_bytes.len..], &payload_bytes); 572 573 const event = try decodeFrame(alloc, &frame); 574 const info = event.info; 575 try std.testing.expectEqualStrings("OutdatedCursor", info.name.?); 576} 577 578test "decode identity frame" { 579 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 580 defer arena.deinit(); 581 const alloc = arena.allocator(); 582 583 // build frame via encoder for cleaner test 584 const original = Event{ .identity = .{ 585 .seq = 42, 586 .did = "did:plc:test", 587 .time = "2024-01-15T10:30:00Z", 588 } }; 589 const frame = try encodeFrame(alloc, original); 590 591 const event = try decodeFrame(alloc, frame); 592 const identity = event.identity; 593 try std.testing.expectEqual(@as(i64, 42), identity.seq); 594 try std.testing.expectEqualStrings("did:plc:test", identity.did); 595 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", identity.time); 596} 597 598test "Event.seq works" { 599 const info_event = Event{ .info = .{ .name = "test" } }; 600 try std.testing.expect(info_event.seq() == null); 601 602 const identity_event = Event{ .identity = .{ 603 .seq = 42, 604 .did = "did:plc:test", 605 .time = "2024-01-15T10:30:00Z", 606 } }; 607 try std.testing.expectEqual(@as(i64, 42), identity_event.seq().?); 608} 609 610// === encoder tests === 611 612test "encode → decode info frame" { 613 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 614 defer arena.deinit(); 615 const alloc = arena.allocator(); 616 617 const original = Event{ .info = .{ 618 .name = "OutdatedCursor", 619 .message = "cursor is behind", 620 } }; 621 622 const frame = try encodeFrame(alloc, original); 623 const decoded = try decodeFrame(alloc, frame); 624 625 try std.testing.expectEqualStrings("OutdatedCursor", decoded.info.name.?); 626 try std.testing.expectEqualStrings("cursor is behind", decoded.info.message.?); 627} 628 629test "encode → decode identity frame" { 630 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 631 defer arena.deinit(); 632 const alloc = arena.allocator(); 633 634 const original = Event{ .identity = .{ 635 .seq = 42, 636 .did = "did:plc:test123", 637 .time = "2024-01-15T10:30:00Z", 638 .handle = "alice.bsky.social", 639 } }; 640 641 const frame = try encodeFrame(alloc, original); 642 const decoded = try decodeFrame(alloc, frame); 643 644 const id = decoded.identity; 645 try std.testing.expectEqual(@as(i64, 42), id.seq); 646 try std.testing.expectEqualStrings("did:plc:test123", id.did); 647 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", id.time); 648 try std.testing.expectEqualStrings("alice.bsky.social", id.handle.?); 649} 650 651test "encode → decode account frame" { 652 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 653 defer arena.deinit(); 654 const alloc = arena.allocator(); 655 656 const original = Event{ .account = .{ 657 .seq = 100, 658 .did = "did:plc:suspended", 659 .time = "2024-01-15T10:30:00Z", 660 .active = false, 661 .status = .suspended, 662 } }; 663 664 const frame = try encodeFrame(alloc, original); 665 const decoded = try decodeFrame(alloc, frame); 666 667 const acct = decoded.account; 668 try std.testing.expectEqual(@as(i64, 100), acct.seq); 669 try std.testing.expectEqualStrings("did:plc:suspended", acct.did); 670 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", acct.time); 671 try std.testing.expectEqual(false, acct.active); 672 try std.testing.expectEqual(AccountStatus.suspended, acct.status.?); 673} 674 675test "encode → decode commit frame with record" { 676 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 677 defer arena.deinit(); 678 const alloc = arena.allocator(); 679 680 const record: cbor.Value = .{ .map = &.{ 681 .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } }, 682 .{ .key = "text", .value = .{ .text = "hello firehose" } }, 683 } }; 684 685 const original = Event{ .commit = .{ 686 .seq = 999, 687 .repo = "did:plc:poster", 688 .rev = "3k2abc000000", 689 .time = "2024-01-15T10:30:00Z", 690 .since = "3k2abd000000", 691 .ops = &.{.{ 692 .action = .create, 693 .collection = "app.bsky.feed.post", 694 .rkey = "3k2abc", 695 .record = record, 696 }}, 697 } }; 698 699 const frame = try encodeFrame(alloc, original); 700 const decoded = try decodeFrame(alloc, frame); 701 702 const commit = decoded.commit; 703 try std.testing.expectEqual(@as(i64, 999), commit.seq); 704 try std.testing.expectEqualStrings("did:plc:poster", commit.repo); 705 try std.testing.expectEqualStrings("3k2abc000000", commit.rev); 706 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", commit.time); 707 try std.testing.expectEqualStrings("3k2abd000000", commit.since.?); 708 try std.testing.expectEqual(@as(usize, 0), commit.blobs.len); 709 try std.testing.expectEqual(@as(usize, 1), commit.ops.len); 710 711 const op = commit.ops[0]; 712 try std.testing.expectEqual(CommitAction.create, op.action); 713 try std.testing.expectEqualStrings("app.bsky.feed.post", op.collection); 714 try std.testing.expectEqualStrings("3k2abc", op.rkey); 715 try std.testing.expect(op.cid != null); 716 717 // record should be decoded from the CAR blocks 718 const rec = op.record.?; 719 try std.testing.expectEqualStrings("hello firehose", rec.getString("text").?); 720 try std.testing.expectEqualStrings("app.bsky.feed.post", rec.getString("$type").?); 721} 722 723test "encode → decode commit with delete (no record)" { 724 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 725 defer arena.deinit(); 726 const alloc = arena.allocator(); 727 728 const original = Event{ .commit = .{ 729 .seq = 500, 730 .repo = "did:plc:deleter", 731 .rev = "3k2xyz000000", 732 .time = "2024-01-15T10:30:00Z", 733 .ops = &.{.{ 734 .action = .delete, 735 .collection = "app.bsky.feed.post", 736 .rkey = "abc123", 737 .record = null, 738 }}, 739 } }; 740 741 const frame = try encodeFrame(alloc, original); 742 const decoded = try decodeFrame(alloc, frame); 743 744 try std.testing.expectEqual(@as(i64, 500), decoded.commit.seq); 745 try std.testing.expectEqualStrings("3k2xyz000000", decoded.commit.rev); 746 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", decoded.commit.time); 747 try std.testing.expectEqual(@as(usize, 1), decoded.commit.ops.len); 748 try std.testing.expectEqual(CommitAction.delete, decoded.commit.ops[0].action); 749 try std.testing.expect(decoded.commit.ops[0].cid == null); 750 try std.testing.expect(decoded.commit.ops[0].record == null); 751}