atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

release: v0.3.0-alpha.24

add car.streamBlocks + BlockIterator for zero-alloc CAR iteration over
large repos. strictly additive — read/readWithOptions/Car/findBlock are
unchanged. shares parser helpers (readUvarint, cidLength, verifyBlockHash)
and the CarError set with the existing path, so future CAR fixes benefit
both.

motivated by ken (semantic search over a PDS) which trips the 200k block
guard on repos like pfrazee.com (~72 MB, 248k blocks). the existing
read() path eagerly builds a StringHashMapUnmanaged indexing every CID —
~12 MB of auxiliary metadata on top of whatever the caller is doing.
streamBlocks skips the hashmap and Block[] entirely; the iterator is O(1)
space beyond the input buffer, and offset() lets callers build a CID →
byte-offset index that's strictly smaller than read()'s eager hashmap.

tests:
- round-trip equivalence against read (same block ordering + bytes)
- corruption test confirms BadBlockHash still fires through the iterator
- ZAT_STRESS=1 gated test against cached /tmp/pfrazee.car (72 MB, 248k
blocks) — asserts stream + read see identical block count, total bytes,
and cid xor. not in CI.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+254 -1
+1
CHANGELOG.md
··· 8 8 - **fix**: `DidResolver.resolve` and `HttpTransport.fetch` propagate the underlying `std.http.Client.fetch` error instead of collapsing every transport-layer failure (DNS, connect, TLS) to `error.DidResolutionFailed` / `error.RequestFailed` — callers can distinguish failure modes via `@errorName(err)`. soft-breaking: the inferred error set widens 9 9 - **feat**: `Io.Timestamp` replaces libc `gettimeofday` in JWT/OAuth 10 10 - **feat**: `io.sleep()` replaces libc `nanosleep` in reconnect backoff (cancellation-aware) 11 + - **feat**: `car.streamBlocks` + `BlockIterator` — pull-style, zero-allocation CAR iteration for large repos (sync.getRepo). shares parser helpers with `read`/`readWithOptions`; yields slices into the input buffer and exposes `offset()` so callers can build their own CID → byte-offset index. strictly additive, no existing consumers affected. pass `max_size = data.len` for sync.getRepo responses — the 2 MB default targets firehose frames. 11 12 - **docs**: [devlog 008](devlog/008-the-io-migration.md) — the 0.16 migration 12 13 13 14 ## 0.2.18
+1 -1
build.zig.zon
··· 1 1 .{ 2 2 .name = .zat, 3 - .version = "0.3.0-alpha.23", 3 + .version = "0.3.0-alpha.24", 4 4 .fingerprint = 0x8da9db57ee82fbe4, 5 5 .minimum_zig_version = "0.16.0-dev.3070+b22eb176b", 6 6 .dependencies = .{
+252
src/internal/repo/car.zig
// === streaming iterator ===

/// a single block yielded by BlockIterator. slices borrow from the input
/// buffer and stay valid for the iterator's entire lifetime (the iterator
/// never overwrites input data).
pub const BlockEntry = struct {
    cid_raw: []const u8,
    data: []const u8,
};

/// pull-style iterator over CAR blocks. yields one block per `next()` call
/// and does zero allocation in the hot path — no hashmap, no block list.
/// caller picks what to remember (CID → offset index, MST nodes only, etc.).
///
/// blocks are slices into the original input buffer, so this is strictly
/// cheaper than `read()` for streaming walks where random access isn't
/// required upfront.
pub const BlockIterator = struct {
    data: []const u8,
    pos: usize,
    verify: bool,
    max_per_block: usize,

    /// yield the next block, or null at end of input. errors match the
    /// eager `read` path: InvalidVarint / InvalidCid / BlockTooLarge /
    /// UnexpectedEof, and BadBlockHash when `verify` is on.
    pub fn next(self: *BlockIterator) CarError!?BlockEntry {
        if (self.pos >= self.data.len) return null;

        const block_len = cbor.readUvarint(self.data, &self.pos) orelse return error.InvalidVarint;
        const block_len_usize = std.math.cast(usize, block_len) orelse return error.InvalidHeader;
        if (block_len_usize == 0) return error.InvalidCid;
        if (block_len_usize > self.max_per_block) return error.BlockTooLarge;
        // checked add: block_len comes straight off the wire. a hostile value
        // near maxInt(usize) (reachable when the caller raises max_block_size)
        // must surface as a parse error, not an overflow panic in safe builds.
        const block_end = std.math.add(usize, self.pos, block_len_usize) catch return error.UnexpectedEof;
        if (block_end > self.data.len) return error.UnexpectedEof;

        const block_data = self.data[self.pos..block_end];
        const cid_len = cidLength(block_data) orelse return error.InvalidCid;
        if (cid_len > block_data.len) return error.InvalidCid;

        const cid_bytes = block_data[0..cid_len];
        const content = block_data[cid_len..];

        if (self.verify) try verifyBlockHash(cid_bytes, content);

        self.pos = block_end;
        return .{ .cid_raw = cid_bytes, .data = content };
    }

    /// byte offset within the original input of the next block to be read.
    /// useful for building a CID → offset index while iterating.
    pub fn offset(self: BlockIterator) usize {
        return self.pos;
    }
};

pub const StreamOptions = struct {
    verify_block_hashes: bool = true,
    /// max total CAR size in bytes. null = use default (2 MB). for a
    /// trusted response you fetched yourself, pass `data.len` to disable.
    max_size: ?usize = null,
    /// max size of any single block. null = use default (1 MB).
    max_block_size: ?usize = null,
};

pub const StreamResult = struct {
    roots: []const cbor.Cid,
    iter: BlockIterator,
};

/// parse only the CAR header and return an iterator over the remaining
/// blocks. unlike `read`, this does not build a hashmap or collect a
/// `Block[]` — it's O(1) space beyond the input buffer. zero-copy: the
/// iterator yields slices into `data`.
///
/// the caller owns `result.roots` (backed by `allocator`) and should free
/// it via `allocator.free(result.roots)` when done. the iterator itself
/// holds no allocations.
pub fn streamBlocks(allocator: Allocator, data: []const u8, options: StreamOptions) CarError!StreamResult {
    if (data.len > (options.max_size orelse max_blocks_size)) return error.BlocksTooLarge;

    var pos: usize = 0;

    const header_len = cbor.readUvarint(data, &pos) orelse return error.InvalidVarint;
    const header_len_usize = std.math.cast(usize, header_len) orelse return error.InvalidHeader;
    // checked add: header_len is untrusted; an absurd value must not
    // overflow-panic before the bounds check below can reject it.
    const header_end = std.math.add(usize, pos, header_len_usize) catch return error.UnexpectedEof;
    if (header_end > data.len) return error.UnexpectedEof;

    // decode the header into a throwaway arena — we only need the roots
    // array, copied out by value below. NOTE(review): this assumes
    // cbor.decodeAll yields Cid.raw slices that borrow the input (`data`),
    // not arena memory — confirm, or the roots dangle after arena deinit.
    var header_arena = std.heap.ArenaAllocator.init(allocator);
    defer header_arena.deinit();
    const header = cbor.decodeAll(header_arena.allocator(), data[pos..header_end]) catch return error.InvalidHeader;

    const version = header.getUint("version") orelse return error.InvalidHeader;
    if (version != 1) return error.InvalidHeader;

    var roots: std.ArrayList(cbor.Cid) = .empty;
    errdefer roots.deinit(allocator);
    const root_values = header.getArray("roots") orelse return error.InvalidHeader;
    for (root_values) |root_val| {
        switch (root_val) {
            .cid => |c| try roots.append(allocator, c),
            else => return error.InvalidHeader,
        }
    }
    if (roots.items.len == 0) return error.InvalidHeader;

    return .{
        .roots = try roots.toOwnedSlice(allocator),
        .iter = .{
            .data = data,
            .pos = header_end,
            .verify = options.verify_block_hashes,
            .max_per_block = options.max_block_size orelse max_block_size,
        },
    };
}

test "streamBlocks matches read output" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // build a CAR with a handful of real DAG-CBOR blocks so the iterator has
    // something non-trivial to chew on
    const b1 = try cbor.encodeAlloc(alloc, .{ .map = &.{.{ .key = "k", .value = .{ .text = "v1" } }} });
    const b2 = try cbor.encodeAlloc(alloc, .{ .map = &.{.{ .key = "k", .value = .{ .text = "v2" } }} });
    const b3 = try cbor.encodeAlloc(alloc, .{ .map = &.{.{ .key = "k", .value = .{ .text = "v3" } }} });
    const c1 = try cbor.Cid.forDagCbor(alloc, b1);
    const c2 = try cbor.Cid.forDagCbor(alloc, b2);
    const c3 = try cbor.Cid.forDagCbor(alloc, b3);

    const src = Car{
        .roots = &.{c1},
        .blocks = &.{
            .{ .cid_raw = c1.raw, .data = b1 },
            .{ .cid_raw = c2.raw, .data = b2 },
            .{ .cid_raw = c3.raw, .data = b3 },
        },
    };
    const car_bytes = try writeAlloc(alloc, src);

    // reference: full read
    const parsed = try read(alloc, car_bytes);

    // streaming: should see every block in order, with identical bytes
    var stream = try streamBlocks(alloc, car_bytes, .{});
    defer alloc.free(stream.roots);

    try std.testing.expectEqual(@as(usize, 1), stream.roots.len);
    try std.testing.expectEqualSlices(u8, parsed.roots[0].raw, stream.roots[0].raw);

    var i: usize = 0;
    while (try stream.iter.next()) |block| : (i += 1) {
        try std.testing.expect(i < parsed.blocks.len);
        try std.testing.expectEqualSlices(u8, parsed.blocks[i].cid_raw, block.cid_raw);
        try std.testing.expectEqualSlices(u8, parsed.blocks[i].data, block.data);
    }
    try std.testing.expectEqual(parsed.blocks.len, i);
}

// stress test: iterate pfrazee.com's CAR via streamBlocks and compare
// against the full `read` path. gated on ZAT_STRESS=1 and expects the
// CAR cached at /tmp/pfrazee.car (fetch once via curl — see ken's
// streaming CAR notes). not in CI: too slow, requires network fetch.
test "streamBlocks: pfrazee.com stress" {
    const stress_c = std.c.getenv("ZAT_STRESS") orelse return;
    const stress = std.mem.span(stress_c);
    if (!std.mem.eql(u8, stress, "1")) return;

    const io = std.Options.debug_io;
    const file = std.Io.Dir.openFileAbsolute(io, "/tmp/pfrazee.car", .{}) catch |err| {
        std.debug.print("pfrazee.car not found ({t}) — skip. fetch via:\n", .{err});
        std.debug.print("  curl -L -o /tmp/pfrazee.car 'https://bsky.network/xrpc/com.atproto.sync.getRepo?did=did:plc:ragtjsm2j2vknwkz3zp4oxrd'\n", .{});
        return;
    };
    defer file.close(io);

    const file_len = try file.length(io);
    var mmap = try std.Io.File.MemoryMap.create(io, file, .{
        .len = @intCast(file_len),
        .protection = .{ .read = true, .write = false },
        .offset = 0,
    });
    defer mmap.destroy(io);
    const bytes: []const u8 = mmap.memory[0..@intCast(file_len)];

    std.debug.print("\npfrazee CAR: {d:.2} MB (mmap'd)\n", .{@as(f64, @floatFromInt(bytes.len)) / (1024.0 * 1024.0)});

    // --- streaming path: iterate blocks, accumulate a few invariants ---
    var stream = try streamBlocks(std.testing.allocator, bytes, .{
        .verify_block_hashes = true,
        .max_size = bytes.len,
    });
    defer std.testing.allocator.free(stream.roots);

    var stream_block_count: usize = 0;
    var stream_total_bytes: usize = 0;
    var stream_cid_xor: u8 = 0;
    while (try stream.iter.next()) |block| {
        stream_block_count += 1;
        stream_total_bytes += block.data.len;
        for (block.cid_raw) |b| stream_cid_xor ^= b;
    }

    // --- reference path: full read (builds hashmap) ---
    var ref_arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer ref_arena.deinit();
    const ref = try readWithOptions(ref_arena.allocator(), bytes, .{
        .verify_block_hashes = true,
        .max_size = bytes.len,
        .max_blocks = bytes.len,
    });

    var ref_total_bytes: usize = 0;
    var ref_cid_xor: u8 = 0;
    for (ref.blocks) |block| {
        ref_total_bytes += block.data.len;
        for (block.cid_raw) |b| ref_cid_xor ^= b;
    }

    std.debug.print(
        "stream: {d:>8} blocks, {d:>10} bytes, xor=0x{x:0>2}\n",
        .{ stream_block_count, stream_total_bytes, stream_cid_xor },
    );
    std.debug.print(
        "read:   {d:>8} blocks, {d:>10} bytes, xor=0x{x:0>2}\n",
        .{ ref.blocks.len, ref_total_bytes, ref_cid_xor },
    );

    try std.testing.expectEqual(ref.blocks.len, stream_block_count);
    try std.testing.expectEqual(ref_total_bytes, stream_total_bytes);
    try std.testing.expectEqual(ref_cid_xor, stream_cid_xor);
}

test "streamBlocks rejects corrupted block with verify on" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const real = try cbor.encodeAlloc(alloc, .{ .map = &.{.{ .key = "k", .value = .{ .text = "original" } }} });
    const cid = try cbor.Cid.forDagCbor(alloc, real);

    const tampered = Car{
        .roots = &.{cid},
        .blocks = &.{.{ .cid_raw = cid.raw, .data = "tampered" }},
    };
    const car_bytes = try writeAlloc(alloc, tampered);

    var stream = try streamBlocks(alloc, car_bytes, .{});
    defer alloc.free(stream.roots);
    try std.testing.expectError(error.BadBlockHash, stream.iter.next());
}