atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

add firehose smoke test: CBOR/CAR/CID on live production data

Connects to the real Bluesky firehose, decodes 10k CBOR frames, parses
CAR blocks, verifies CIDs, and decodes records. Exercises the full
refactored pipeline on production data. Run with `just firehose-smoke`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

jcalabro 66e758ee 3fb4f448

+92
+17
build.zig
··· 77 77 const smoke_step = b.step("smoke", "run jetstream smoke test"); 78 78 smoke_step.dependOn(&run_smoke.step); 79 79 80 + // firehose smoke test (CBOR + CAR + CID on live data) 81 + const firehose_smoke = b.addExecutable(.{ 82 + .name = "firehose-smoke", 83 + .root_module = b.createModule(.{ 84 + .root_source_file = b.path("scripts/firehose_smoke.zig"), 85 + .target = target, 86 + .optimize = optimize, 87 + .link_libc = true, 88 + .imports = &.{.{ .name = "zat", .module = mod }}, 89 + }), 90 + }); 91 + b.installArtifact(firehose_smoke); 92 + 93 + const run_firehose_smoke = b.addRunArtifact(firehose_smoke); 94 + const firehose_smoke_step = b.step("firehose-smoke", "run firehose smoke test (CBOR/CAR/CID on live data)"); 95 + firehose_smoke_step.dependOn(&run_firehose_smoke.step); 96 + 80 97 // CBOR codec benchmarks 81 98 const cbor_bench = b.addExecutable(.{ 82 99 .name = "cbor-bench",
+4
justfile
··· 19 19 # run CBOR codec benchmarks 20 20 bench: 21 21 zig build bench -Doptimize=ReleaseFast 22 + 23 + # run firehose smoke test (CBOR/CAR/CID on live production data) 24 + firehose-smoke: 25 + zig build firehose-smoke -Doptimize=ReleaseFast && ./zig-out/bin/firehose-smoke
+71
scripts/firehose_smoke.zig
··· 1 + //! firehose smoke test — connects to the live AT Protocol firehose, 2 + //! decodes CBOR frames, parses CAR blocks, and verifies CIDs. 3 + //! exercises the full CBOR → CAR → CID pipeline on production data. 4 + //! 5 + //! run: just firehose-smoke 6 + 7 + const std = @import("std"); 8 + const zat = @import("zat"); 9 + 10 + pub fn main() !void { 11 + var da: std.heap.DebugAllocator(.{}) = .init; 12 + defer _ = da.deinit(); 13 + const allocator = da.allocator(); 14 + 15 + std.debug.print("firehose smoke test starting (CBOR + CAR + CID on live data)\n", .{}); 16 + 17 + var handler = Handler{}; 18 + var client = zat.FirehoseClient.init(std.Options.debug_io, allocator, .{}); 19 + try client.subscribe(&handler); 20 + } 21 + 22 + const Handler = struct { 23 + count: u64 = 0, 24 + commits: u64 = 0, 25 + records: u64 = 0, 26 + connects: u64 = 0, 27 + errors: u64 = 0, 28 + 29 + pub fn onEvent(self: *Handler, event: zat.FirehoseEvent) void { 30 + self.count += 1; 31 + 32 + switch (event) { 33 + .commit => |commit| { 34 + self.commits += 1; 35 + for (commit.ops) |op| { 36 + if (op.record) |record| { 37 + // record was decoded from CAR blocks via CBOR 38 + _ = record.getString("$type"); 39 + self.records += 1; 40 + } 41 + } 42 + }, 43 + else => {}, 44 + } 45 + 46 + if (self.count % 1000 == 0) { 47 + std.debug.print(" [{d}] commits={d} records={d} errors={d}\n", .{ 48 + self.count, self.commits, self.records, self.errors, 49 + }); 50 + } 51 + 52 + // stop after 10k events 53 + if (self.count >= 10000) { 54 + std.debug.print("\nfirehose smoke test PASSED\n", .{}); 55 + std.debug.print(" {d} events, {d} commits, {d} records decoded, {d} errors\n", .{ 56 + self.count, self.commits, self.records, self.errors, 57 + }); 58 + std.process.exit(0); 59 + } 60 + } 61 + 62 + pub fn onConnect(self: *Handler, host: []const u8) void { 63 + self.connects += 1; 64 + std.debug.print("CONNECT #{d} to {s}\n", .{ self.connects, host }); 65 + } 66 + 67 + pub fn onError(self: *Handler, err: anyerror) void { 68 + self.errors += 1; 69 + std.debug.print("ERROR: {s}\n", .{@errorName(err)}); 70 + } 71 + };