forked from
zat.dev/zat
atproto utils for zig
1//! firehose smoke test — connects to the live AT Protocol firehose,
2//! decodes CBOR frames, parses CAR blocks, and verifies CIDs.
3//! exercises the full CBOR → CAR → CID pipeline on production data.
4//!
5//! run: just firehose-smoke
6
7const std = @import("std");
8const zat = @import("zat");
9
10pub fn main() !void {
11 var da: std.heap.DebugAllocator(.{}) = .init;
12 defer _ = da.deinit();
13 const allocator = da.allocator();
14
15 std.debug.print("firehose smoke test starting (CBOR + CAR + CID on live data)\n", .{});
16
17 var handler = Handler{};
18 var client = zat.FirehoseClient.init(std.Options.debug_io, allocator, .{});
19 try client.subscribe(&handler);
20}
21
22const Handler = struct {
23 count: u64 = 0,
24 commits: u64 = 0,
25 records: u64 = 0,
26 connects: u64 = 0,
27 errors: u64 = 0,
28
29 pub fn onEvent(self: *Handler, event: zat.FirehoseEvent) void {
30 self.count += 1;
31
32 switch (event) {
33 .commit => |commit| {
34 self.commits += 1;
35 for (commit.ops) |op| {
36 if (op.record) |record| {
37 // record was decoded from CAR blocks via CBOR
38 _ = record.getString("$type");
39 self.records += 1;
40 }
41 }
42 },
43 else => {},
44 }
45
46 if (self.count % 1000 == 0) {
47 std.debug.print(" [{d}] commits={d} records={d} errors={d}\n", .{
48 self.count, self.commits, self.records, self.errors,
49 });
50 }
51
52 // stop after 10k events
53 if (self.count >= 10000) {
54 std.debug.print("\nfirehose smoke test PASSED\n", .{});
55 std.debug.print(" {d} events, {d} commits, {d} records decoded, {d} errors\n", .{
56 self.count, self.commits, self.records, self.errors,
57 });
58 std.process.exit(0);
59 }
60 }
61
62 pub fn onConnect(self: *Handler, host: []const u8) void {
63 self.connects += 1;
64 std.debug.print("CONNECT #{d} to {s}\n", .{ self.connects, host });
65 }
66
67 pub fn onError(self: *Handler, err: anyerror) void {
68 self.errors += 1;
69 std.debug.print("ERROR: {s}\n", .{@errorName(err)});
70 }
71};