this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: upgrade zig benches to 0.16

+182 -158
+8 -8
README.md
··· 195 195 196 196 | variant | frames/sec (median) | MB/s | live firehose headroom | verified | errors | 197 197 |---------|--------------------:|-----:|-----------------------:|---------:|------:| 198 - | zig relay hot path (arena reuse) | 9,793 | 49.5 | 16x | 15,865 | 0 | 199 - | zig relay hot path (alloc per frame) | 10,194 | 51.6 | 17x | 15,865 | 0 | 198 + | zig relay hot path (arena reuse) | 10,275 | 52.0 | 17x | 15,865 | 0 | 199 + | zig relay hot path (alloc per frame) | 10,510 | 53.2 | 18x | 15,865 | 0 | 200 200 201 201 stage breakdown from the median arena-reuse pass: 202 202 203 203 | stage | time/frame | share | 204 204 |-------|-----------:|------:| 205 - | CBOR decode | <1µs | 0.3% | 206 - | CAR parse + CID verification | 3µs | 3.4% | 207 - | signature verification | 98µs | 96.2% | 205 + | CBOR decode | <1µs | 0.5% | 206 + | CAR parse + CID verification | 3µs | 3.7% | 207 + | signature verification | 93µs | 95.7% | 208 208 209 209 signature verification dominates the combined path. this is why the standalone decode benchmarks show very large SDK differences, while the full relay hot path is bounded by ECDSA throughput once commit signatures are included. 210 210 ··· 241 241 242 242 | lang | SDK | version | CBOR engine | CAR engine | 243 243 |------|-----|---------|-------------|------------| 244 - | zig | [zat](https://tangled.sh/@zzstoatzz.io/zat) v0.2.18 + [k256](https://tangled.sh/@zzstoatzz.io/k256) v0.0.4 | — | hand-rolled | hand-rolled (+ SHA-256 CID verify, size limits) | 244 + | zig | [zat](https://tangled.sh/@zzstoatzz.io/zat) v0.3.0-alpha.25 + [k256](https://tangled.sh/@zzstoatzz.io/k256) v0.0.4 | Zig 0.16.0-dev.3070+b22eb176b | hand-rolled | hand-rolled (+ SHA-256 CID verify, size limits) | 245 245 | rust | [rsky](https://github.com/blacksky-algorithms/rsky) stack | — | [ciborium](https://crates.io/crates/ciborium) (header) + [serde_ipld_dagcbor](https://crates.io/crates/serde_ipld_dagcbor) (body) | [rs-car-sync](https://crates.io/crates/rs-car-sync) (+ SHA-256 CID verify) | 246 246 | rust | raw (minicbor + bumpalo) | — | [minicbor](https://crates.io/crates/minicbor) (zero-copy) | hand-rolled (sync) | 247 247 | rust | [jacquard](https://tangled.sh/@nonbinary.computer/jacquard) | 0.9 | [ciborium](https://crates.io/crates/ciborium) (header) + [serde_ipld_dagcbor](https://crates.io/crates/serde_ipld_dagcbor) (body) | [iroh-car](https://crates.io/crates/iroh-car) (async) | ··· 314 314 just chart pfrazee.com # run + generate SVG charts to docs/ 315 315 ``` 316 316 317 - the Zig package currently targets Zig 0.15.x (`zig/build.zig.zon` sets `minimum_zig_version = "0.15.2"`). if your default `zig` is a 0.16 development build, point the just recipes at a 0.15 binary: 317 + the Zig package targets Zig 0.16.0-dev.3070+b22eb176b (`zig/build.zig.zon` sets the minimum version). the just recipes use `zig` by default and respect `ZIG` when you need to pin a local compiler: 318 318 319 319 ```sh 320 - ZIG=/path/to/zig-0.15 just bench-relay 320 + ZIG=/path/to/zig-0.16 just bench-relay 321 321 ``` 322 322 323 323 ## methodology
+3 -3
zig/build.zig.zon
··· 2 2 .name = .atproto_bench, 3 3 .version = "0.0.1", 4 4 .fingerprint = 0x7cdfbb8d616ff9e0, 5 - .minimum_zig_version = "0.15.2", 5 + .minimum_zig_version = "0.16.0-dev.3070+b22eb176b", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.18", 9 - .hash = "zat-0.2.18-5PuC7iVfBQAV6IUlytvAWrpwrVUbwbLlAcSTvkXwncb_", 8 + .url = "https://tangled.sh/zat.dev/zat/archive/v0.3.0-alpha.25", 9 + .hash = "zat-0.3.0-alpha.24-5PuC7vB8CACd9OTAS8jDuwVN4hQBYiAdfArjFjo_sWyD", 10 10 }, 11 11 .k256 = .{ 12 12 .url = "https://tangled.sh/zzstoatzz.io/k256/archive/v0.0.4",
+19 -15
zig/src/bench.zig
··· 139 139 for (0..measured_passes) |pass| { 140 140 var pass_blocks: usize = 0; 141 141 var pass_errors: usize = 0; 142 - var timer = try std.time.Timer.start(); 142 + const start_ns = nowNs(); 143 143 for (corpus.frames) |frame| { 144 144 _ = arena.reset(.retain_capacity); 145 145 const result = decodeFullFrame(arena.allocator(), frame, options); 146 146 pass_blocks += result.blocks; 147 147 pass_errors += result.errors; 148 148 } 149 - const elapsed = timer.read(); 149 + const elapsed = nowNs() - start_ns; 150 150 pass_results[pass] = .{ 151 151 .frames = corpus.frames.len, 152 152 .blocks = pass_blocks, ··· 179 179 for (0..measured_passes) |pass| { 180 180 var pass_blocks: usize = 0; 181 181 var pass_errors: usize = 0; 182 - var timer = try std.time.Timer.start(); 182 + const start_ns = nowNs(); 183 183 for (corpus.frames) |frame| { 184 184 var arena = std.heap.ArenaAllocator.init(allocator); 185 185 defer arena.deinit(); ··· 187 187 pass_blocks += result.blocks; 188 188 pass_errors += result.errors; 189 189 } 190 - const elapsed = timer.read(); 190 + const elapsed = nowNs() - start_ns; 191 191 pass_results[pass] = .{ 192 192 .frames = corpus.frames.len, 193 193 .blocks = pass_blocks, ··· 202 202 return pass_results; 203 203 } 204 204 205 + fn nowNs() u64 { 206 + return @intCast(std.Io.Clock.awake.now(std.Options.debug_io).toNanoseconds()); 207 + } 208 + 205 209 fn reportResult( 206 210 name: []const u8, 207 211 corpus: CorpusInfo, ··· 265 269 const results = v.results orelse continue; 266 270 const json = buildDecodeJson(allocator, v.name, corpus, &results) catch continue; 267 271 defer allocator.free(json); 268 - const file = std.fs.cwd().createFile(v.file, .{}) catch continue; 269 - defer file.close(); 270 - file.writeAll(json) catch continue; 272 + const io = std.Options.debug_io; 273 + const file = std.Io.Dir.cwd().createFile(io, v.file, .{}) catch continue; 274 + defer file.close(io); 275 + file.writeStreamingAll(io, json) catch continue; 271 276 } 272 277 } 273 278 ··· 284 289 } 285 290 std.mem.sort(f64, &fps_values, {}, std.sort.asc(f64)); 286 291 287 - var buf: std.ArrayListUnmanaged(u8) = .{}; 288 - errdefer buf.deinit(allocator); 289 - const w = buf.writer(allocator); 292 + var aw: std.Io.Writer.Allocating = .init(allocator); 293 + errdefer aw.deinit(); 294 + const w = &aw.writer; 290 295 291 296 try w.print( 292 297 \\{{ ··· 321 326 fps_values[measured_passes - 1], 322 327 }); 323 328 324 - return try buf.toOwnedSlice(allocator); 329 + return try aw.toOwnedSlice(); 325 330 } 326 331 327 332 fn loadCorpus(allocator: Allocator, path: []const u8) !CorpusInfo { 328 - const file = std.fs.cwd().openFile(path, .{}) catch |err| { 333 + const io = std.Options.debug_io; 334 + const data = std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(50 * 1024 * 1024)) catch |err| { 329 335 std.debug.print("cannot open {s}: {s}\n", .{ path, @errorName(err) }); 330 336 std.debug.print("run `just capture` first to generate fixtures\n", .{}); 331 337 return err; 332 338 }; 333 - defer file.close(); 334 - const data = try file.readToEndAlloc(allocator, 50 * 1024 * 1024); 335 339 336 340 if (data.len < 4) return error.InvalidFormat; 337 341 338 342 const frame_count = std.mem.readInt(u32, data[0..4], .big); 339 - var frames: std.ArrayListUnmanaged([]const u8) = .{}; 343 + var frames: std.ArrayListUnmanaged([]const u8) = .empty; 340 344 var pos: usize = 4; 341 345 var total_bytes: usize = 0; 342 346 var min_frame: usize = std.math.maxInt(usize);
+24 -20
zig/src/bench_relay.zig
··· 124 124 var errors: usize = 0; 125 125 var timings: StageTimings = .{}; 126 126 127 - var timer = try std.time.Timer.start(); 127 + const start_ns = nowNs(); 128 128 129 129 if (strategy == .reuse) { 130 130 var arena = std.heap.ArenaAllocator.init(allocator); ··· 153 153 .frames = corpus.entries.len, 154 154 .verified = verified, 155 155 .errors = errors, 156 - .elapsed_ns = timer.read(), 156 + .elapsed_ns = nowNs() - start_ns, 157 157 }; 158 158 pass_timings[pass] = timings; 159 159 } ··· 177 177 178 178 fn relayValidateFrameTimed(allocator: Allocator, entry: CorpusEntry) TimedResult { 179 179 // stage 1: CBOR decode header + payload 180 - var t0 = std.time.Timer.start() catch return .{ .ok = false }; 180 + const t0 = nowNs(); 181 181 const header_result = cbor.decode(allocator, entry.frame) catch return .{ .ok = false }; 182 182 const payload = cbor.decodeAll(allocator, entry.frame[header_result.consumed..]) catch return .{ .ok = false }; 183 - const decode_ns = t0.read(); 183 + const decode_ns = nowNs() - t0; 184 184 185 185 // stage 2: CAR parse with CID hash verification 186 - var t1 = std.time.Timer.start() catch return .{ .ok = false, .decode_ns = decode_ns }; 186 + const t1 = nowNs(); 187 187 const blocks_bytes = payload.getBytes("blocks") orelse return .{ .ok = false, .decode_ns = decode_ns }; 188 188 const parsed_car = car.readWithOptions(allocator, blocks_bytes, .{ 189 189 .verify_block_hashes = true, 190 190 }) catch return .{ .ok = false, .decode_ns = decode_ns }; 191 - const car_ns = t1.read(); 191 + const car_ns = nowNs() - t1; 192 192 193 193 // stage 3: find commit, verify signature 194 - var t2 = std.time.Timer.start() catch return .{ .ok = false, .decode_ns = decode_ns, .car_ns = car_ns }; 194 + const t2 = nowNs(); 195 195 const ok = verifyCommitInCar(allocator, parsed_car, entry.curve_type, entry.pubkey); 196 - const verify_ns = t2.read(); 196 + const verify_ns = nowNs() - t2; 197 197 198 198 return .{ .ok = ok, .decode_ns = decode_ns, .car_ns = car_ns, .verify_ns = verify_ns }; 199 + } 200 + 201 + fn nowNs() u64 { 202 + return @intCast(std.Io.Clock.awake.now(std.Options.debug_io).toNanoseconds()); 199 203 } 200 204 201 205 fn verifyCommitInCar(allocator: Allocator, parsed_car: car.Car, curve_type: u8, pubkey: []const u8) bool { ··· 210 214 .map => |m| m, 211 215 else => continue, 212 216 }; 213 - var unsigned: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 217 + var unsigned: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .empty; 214 218 for (map_entries) |e| { 215 219 if (!std.mem.eql(u8, e.key, "sig")) unsigned.append(allocator, e) catch continue; 216 220 } ··· 361 365 for (variants) |v| { 362 366 const json = buildJson(allocator, v.name, corpus, &v.results) catch continue; 363 367 defer allocator.free(json); 364 - const file = std.fs.cwd().createFile(v.file, .{}) catch continue; 365 - defer file.close(); 366 - file.writeAll(json) catch continue; 368 + const io = std.Options.debug_io; 369 + const file = std.Io.Dir.cwd().createFile(io, v.file, .{}) catch continue; 370 + defer file.close(io); 371 + file.writeStreamingAll(io, json) catch continue; 367 372 } 368 373 } 369 374 ··· 380 385 } 381 386 std.mem.sort(f64, &fps_values, {}, std.sort.asc(f64)); 382 387 383 - var buf: std.ArrayListUnmanaged(u8) = .{}; 384 - errdefer buf.deinit(allocator); 385 - const w = buf.writer(allocator); 388 + var aw: std.Io.Writer.Allocating = .init(allocator); 389 + errdefer aw.deinit(); 390 + const w = &aw.writer; 386 391 387 392 try w.print( 388 393 \\{{ ··· 417 422 fps_values[measured_passes - 1], 418 423 }); 419 424 420 - return try buf.toOwnedSlice(allocator); 425 + return try aw.toOwnedSlice(); 421 426 } 422 427 423 428 // --- corpus loading --- 424 429 425 430 fn loadCorpus(allocator: Allocator, path: []const u8) !CorpusInfo { 426 - const file = std.fs.cwd().openFile(path, .{}) catch |err| { 431 + const io = std.Options.debug_io; 432 + const data = std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(100 * 1024 * 1024)) catch |err| { 427 433 std.debug.print("cannot open {s}: {s}\n", .{ path, @errorName(err) }); 428 434 std.debug.print("run `zig build run-capture-relay` first to generate corpus\n", .{}); 429 435 return err; 430 436 }; 431 - defer file.close(); 432 - const data = try file.readToEndAlloc(allocator, 100 * 1024 * 1024); 433 437 434 438 if (data.len < 4) return error.InvalidFormat; 435 439 436 440 const entry_count = std.mem.readInt(u32, data[0..4], .big); 437 - var entries: std.ArrayListUnmanaged(CorpusEntry) = .{}; 441 + var entries: std.ArrayListUnmanaged(CorpusEntry) = .empty; 438 442 var pos: usize = 4; 439 443 var total_frame_bytes: usize = 0; 440 444
+26 -22
zig/src/bench_sigs.zig
··· 85 85 for (0..measured_passes) |pass| { 86 86 var pass_verifies: usize = 0; 87 87 var pass_errors: usize = 0; 88 - var timer = try std.time.Timer.start(); 88 + const start_ns = nowNs(); 89 89 for (corpus.entries) |entry| { 90 90 _ = arena.reset(.retain_capacity); 91 91 if (verifyFullPipeline(arena.allocator(), entry)) { ··· 97 97 pass_results[pass] = .{ 98 98 .verifies = pass_verifies, 99 99 .errors = pass_errors, 100 - .elapsed_ns = timer.read(), 100 + .elapsed_ns = nowNs() - start_ns, 101 101 }; 102 102 } 103 103 ··· 128 128 129 129 fn benchCryptoOnly(allocator: Allocator, corpus: CorpusInfo) ![measured_passes]PassResult { 130 130 // pre-compute unsigned bytes for all entries 131 - var precomputed: std.ArrayListUnmanaged(PrecomputedEntry) = .{}; 131 + var precomputed: std.ArrayListUnmanaged(PrecomputedEntry) = .empty; 132 132 defer { 133 133 for (precomputed.items) |p| { 134 134 allocator.free(p.unsigned_bytes); ··· 148 148 else => continue, 149 149 }; 150 150 151 - var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 151 + var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .empty; 152 152 for (map_entries) |map_entry| { 153 153 if (!std.mem.eql(u8, map_entry.key, "sig")) { 154 154 unsigned_entries.append(arena.allocator(), map_entry) catch continue; ··· 177 177 for (0..measured_passes) |pass| { 178 178 var pass_verifies: usize = 0; 179 179 var pass_errors: usize = 0; 180 - var timer = try std.time.Timer.start(); 180 + const start_ns = nowNs(); 181 181 for (precomputed.items) |entry| { 182 182 if (verifyCryptoOnly(entry)) { 183 183 pass_verifies += 1; ··· 188 188 pass_results[pass] = .{ 189 189 .verifies = pass_verifies, 190 190 .errors = pass_errors, 191 - .elapsed_ns = timer.read(), 191 + .elapsed_ns = nowNs() - start_ns, 192 192 }; 193 193 } 194 194 ··· 198 198 199 199 fn benchPreparsedKey(allocator: Allocator, corpus: CorpusInfo) ![measured_passes]PassResult { 200 200 // pre-compute unsigned bytes AND parse keys 201 - var preparsed: std.ArrayListUnmanaged(PreparsedEntry) = .{}; 201 + var preparsed: std.ArrayListUnmanaged(PreparsedEntry) = .empty; 202 202 defer { 203 203 for (preparsed.items) |p| { 204 204 allocator.free(p.unsigned_bytes); ··· 218 218 else => continue, 219 219 }; 220 220 221 - var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 221 + var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .empty; 222 222 for (map_entries) |map_entry| { 223 223 if (!std.mem.eql(u8, map_entry.key, "sig")) { 224 224 unsigned_entries.append(arena.allocator(), map_entry) catch continue; ··· 251 251 for (0..measured_passes) |pass| { 252 252 var pass_verifies: usize = 0; 253 253 var pass_errors: usize = 0; 254 - var timer = try std.time.Timer.start(); 254 + const start_ns = nowNs(); 255 255 for (preparsed.items) |entry| { 256 256 if (verifyPreparsed(entry)) { 257 257 pass_verifies += 1; ··· 262 262 pass_results[pass] = .{ 263 263 .verifies = pass_verifies, 264 264 .errors = pass_errors, 265 - .elapsed_ns = timer.read(), 265 + .elapsed_ns = nowNs() - start_ns, 266 266 }; 267 267 } 268 268 269 269 reportResult("preparsed-key", corpus, &pass_results); 270 270 return pass_results; 271 + } 272 + 273 + fn nowNs() u64 { 274 + return @intCast(std.Io.Clock.awake.now(std.Options.debug_io).toNanoseconds()); 271 275 } 272 276 273 277 // --- verification --- ··· 286 290 else => return false, 287 291 }; 288 292 289 - var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 293 + var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .empty; 290 294 for (map_entries) |map_entry| { 291 295 if (!std.mem.eql(u8, map_entry.key, "sig")) { 292 296 unsigned_entries.append(allocator, map_entry) catch return false; ··· 422 426 for (variants) |v| { 423 427 const json = buildJsonVariant(allocator, v.name, corpus, &v.results) catch continue; 424 428 defer allocator.free(json); 425 - const file = std.fs.cwd().createFile(v.file, .{}) catch continue; 426 - defer file.close(); 427 - file.writeAll(json) catch continue; 429 + const io = std.Options.debug_io; 430 + const file = std.Io.Dir.cwd().createFile(io, v.file, .{}) catch continue; 431 + defer file.close(io); 432 + file.writeStreamingAll(io, json) catch continue; 428 433 } 429 434 } 430 435 ··· 441 446 } 442 447 std.mem.sort(f64, &vps_values, {}, std.sort.asc(f64)); 443 448 444 - var buf: std.ArrayListUnmanaged(u8) = .{}; 445 - errdefer buf.deinit(allocator); 446 - const w = buf.writer(allocator); 449 + var aw: std.Io.Writer.Allocating = .init(allocator); 450 + errdefer aw.deinit(); 451 + const w = &aw.writer; 447 452 448 453 try w.print( 449 454 \\{{ ··· 478 483 vps_values[measured_passes - 1], 479 484 }); 480 485 481 - return try buf.toOwnedSlice(allocator); 486 + return try aw.toOwnedSlice(); 482 487 } 483 488 484 489 // --- corpus loading --- 485 490 486 491 fn loadCorpus(allocator: Allocator, path: []const u8) !CorpusInfo { 487 - const file = std.fs.cwd().openFile(path, .{}) catch |err| { 492 + const io = std.Options.debug_io; 493 + const data = std.Io.Dir.cwd().readFileAlloc(io, path, allocator, .limited(50 * 1024 * 1024)) catch |err| { 488 494 std.debug.print("cannot open {s}: {s}\n", .{ path, @errorName(err) }); 489 495 std.debug.print("run `just capture-sigs` first to generate corpus\n", .{}); 490 496 return err; 491 497 }; 492 - defer file.close(); 493 - const data = try file.readToEndAlloc(allocator, 50 * 1024 * 1024); 494 498 495 499 if (data.len < 4) return error.InvalidFormat; 496 500 497 501 const entry_count = std.mem.readInt(u32, data[0..4], .big); 498 - var entries: std.ArrayListUnmanaged(CorpusEntry) = .{}; 502 + var entries: std.ArrayListUnmanaged(CorpusEntry) = .empty; 499 503 var pos: usize = 4; 500 504 var total_bytes: usize = 0; 501 505 var p256_count: usize = 0;
+21 -15
zig/src/capture.zig
··· 14 14 const ns_per_s = std.time.ns_per_s; 15 15 16 16 pub fn main() !void { 17 - var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; 17 + var gpa: std.heap.DebugAllocator(.{}) = .init; 18 18 defer _ = gpa.deinit(); 19 19 const allocator = gpa.allocator(); 20 20 ··· 30 30 std.debug.print("connecting to jetstream...\n", .{}); 31 31 32 32 const host = "jetstream1.us-east.bsky.network"; 33 - var client = try websocket.Client.init(allocator, .{ 33 + var client = try websocket.Client.init(std.Options.debug_io, allocator, .{ 34 34 .host = host, 35 35 .port = 443, 36 36 .tls = true, ··· 79 79 std.debug.print("connecting to firehose...\n", .{}); 80 80 81 81 const host = "bsky.network"; 82 - var client = try websocket.Client.init(allocator, .{ 82 + var client = try websocket.Client.init(std.Options.debug_io, allocator, .{ 83 83 .host = host, 84 84 .port = 443, 85 85 .tls = true, ··· 98 98 99 99 var handler = FirehoseCaptureHandler{ 100 100 .allocator = allocator, 101 - .deadline = @as(i128, std.time.nanoTimestamp()) + @as(i128, capture_duration_s) * ns_per_s, 101 + .deadline = nowNs() + capture_duration_s * ns_per_s, 102 102 }; 103 103 client.readLoop(&handler) catch |err| switch (err) { 104 104 error.ConnectionClosed => {}, ··· 118 118 std.debug.print("\ncaptured {d} frames ({d} bytes total)\n", .{ frames.len, total_bytes }); 119 119 std.debug.print("writing {s}/firehose-frames.bin...\n", .{fixtures_dir}); 120 120 121 - const file = try std.fs.cwd().createFile(fixtures_dir ++ "/firehose-frames.bin", .{}); 122 - defer file.close(); 121 + const io = std.Options.debug_io; 122 + const file = try std.Io.Dir.cwd().createFile(io, fixtures_dir ++ "/firehose-frames.bin", .{}); 123 + defer file.close(io); 123 124 124 125 // header: frame count as u32 BE 125 126 var count_buf: [4]u8 = undefined; 126 127 std.mem.writeInt(u32, &count_buf, @intCast(frames.len), .big); 127 - try file.writeAll(&count_buf); 128 + try file.writeStreamingAll(io, &count_buf); 128 129 129 130 // each frame: [u32 BE length][frame bytes] 130 131 for (frames) |frame| { 131 132 var len_buf: [4]u8 = undefined; 132 133 std.mem.writeInt(u32, &len_buf, @intCast(frame.len), .big); 133 - try file.writeAll(&len_buf); 134 - try file.writeAll(frame); 134 + try file.writeStreamingAll(io, &len_buf); 135 + try file.writeStreamingAll(io, frame); 135 136 } 136 137 137 138 std.debug.print("done — wrote {d} frames to firehose-frames.bin\n", .{frames.len}); ··· 143 144 144 145 const FirehoseCaptureHandler = struct { 145 146 allocator: Allocator, 146 - deadline: i128, 147 - frames: std.ArrayListUnmanaged([]const u8) = .{}, 147 + deadline: u64, 148 + frames: std.ArrayListUnmanaged([]const u8) = .empty, 148 149 total_bytes: usize = 0, 149 150 150 151 pub fn serverMessage(self: *FirehoseCaptureHandler, data: []const u8) !void { 151 152 // check deadline 152 - if (std.time.nanoTimestamp() >= self.deadline) { 153 + if (nowNs() >= self.deadline) { 153 154 return error.ConnectionClosed; 154 155 } 155 156 ··· 196 197 } 197 198 198 199 fn saveFile(path: []const u8, data: []const u8) !void { 199 - const file = try std.fs.cwd().createFile(path, .{}); 200 - defer file.close(); 201 - try file.writeAll(data); 200 + const io = std.Options.debug_io; 201 + const file = try std.Io.Dir.cwd().createFile(io, path, .{}); 202 + defer file.close(io); 203 + try file.writeStreamingAll(io, data); 204 + } 205 + 206 + fn nowNs() u64 { 207 + return @intCast(std.Io.Clock.awake.now(std.Options.debug_io).toNanoseconds()); 202 208 }
+24 -19
zig/src/capture_relay.zig
··· 35 35 }; 36 36 37 37 pub fn main() !void { 38 - var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; 38 + var gpa: std.heap.DebugAllocator(.{}) = .init; 39 39 defer _ = gpa.deinit(); 40 40 const allocator = gpa.allocator(); 41 41 ··· 49 49 std.debug.print("\ncaptured {d} frames\n", .{raw_frames.len}); 50 50 51 51 // phase 2: extract DIDs from frames (need signing keys) 52 - var frame_dids: std.ArrayListUnmanaged(struct { idx: usize, did: []const u8 }) = .{}; 52 + var frame_dids: std.ArrayListUnmanaged(struct { idx: usize, did: []const u8 }) = .empty; 53 53 defer frame_dids.deinit(allocator); 54 54 55 55 for (raw_frames, 0..) |frame, i| { ··· 66 66 // phase 3: collect unique DIDs and resolve concurrently 67 67 var unique_set = std.StringHashMapUnmanaged(usize){}; 68 68 defer unique_set.deinit(allocator); 69 - var unique_dids: std.ArrayListUnmanaged([]const u8) = .{}; 69 + var unique_dids: std.ArrayListUnmanaged([]const u8) = .empty; 70 70 defer unique_dids.deinit(allocator); 71 71 72 72 for (frame_dids.items) |fd| { ··· 107 107 std.debug.print("resolved {d}/{d} DIDs\n", .{ resolved, n_dids }); 108 108 109 109 // phase 4: validate each frame's signature, build corpus 110 - var entries: std.ArrayListUnmanaged(CorpusEntry) = .{}; 110 + var entries: std.ArrayListUnmanaged(CorpusEntry) = .empty; 111 111 defer entries.deinit(allocator); 112 112 var verify_ok: usize = 0; 113 113 var verify_fail: usize = 0; ··· 144 144 145 145 fn writeCorpus(entries: []const CorpusEntry) !void { 146 146 const path = fixtures_dir ++ "/relay-corpus.bin"; 147 - const file = try std.fs.cwd().createFile(path, .{}); 148 - defer file.close(); 147 + const io = std.Options.debug_io; 148 + const file = try std.Io.Dir.cwd().createFile(io, path, .{}); 149 + defer file.close(io); 149 150 150 151 var count_buf: [4]u8 = undefined; 151 152 std.mem.writeInt(u32, &count_buf, @intCast(entries.len), .big); 152 - try file.writeAll(&count_buf); 153 + try file.writeStreamingAll(io, &count_buf); 153 154 154 155 var total_bytes: usize = 4; 155 156 for (entries) |entry| { 156 157 // frame 157 158 var frame_len: [4]u8 = undefined; 158 159 std.mem.writeInt(u32, &frame_len, @intCast(entry.frame.len), .big); 159 - try file.writeAll(&frame_len); 160 - try file.writeAll(entry.frame); 160 + try file.writeStreamingAll(io, &frame_len); 161 + try file.writeStreamingAll(io, entry.frame); 161 162 total_bytes += 4 + entry.frame.len; 162 163 163 164 // key 164 - try file.writeAll(&.{entry.curve_type}); 165 - try file.writeAll(&.{@as(u8, @intCast(entry.pubkey.len))}); 166 - try file.writeAll(entry.pubkey); 165 + try file.writeStreamingAll(io, &.{entry.curve_type}); 166 + try file.writeStreamingAll(io, &.{@as(u8, @intCast(entry.pubkey.len))}); 167 + try file.writeStreamingAll(io, entry.pubkey); 167 168 total_bytes += 2 + entry.pubkey.len; 168 169 } 169 170 ··· 174 175 175 176 fn captureRawFrames(allocator: Allocator) ![]const []const u8 { 176 177 const host = "bsky.network"; 177 - var client = try websocket.Client.init(allocator, .{ 178 + var client = try websocket.Client.init(std.Options.debug_io, allocator, .{ 178 179 .host = host, 179 180 .port = 443, 180 181 .tls = true, ··· 193 194 194 195 var handler = CaptureHandler{ 195 196 .allocator = allocator, 196 - .deadline = @as(i128, std.time.nanoTimestamp()) + @as(i128, capture_duration_s) * ns_per_s, 197 + .deadline = nowNs() + capture_duration_s * ns_per_s, 197 198 }; 198 199 client.readLoop(&handler) catch |err| switch (err) { 199 200 error.ConnectionClosed => {}, ··· 205 206 206 207 const CaptureHandler = struct { 207 208 allocator: Allocator, 208 - deadline: i128, 209 - frames: std.ArrayListUnmanaged([]const u8) = .{}, 209 + deadline: u64, 210 + frames: std.ArrayListUnmanaged([]const u8) = .empty, 210 211 211 212 pub fn serverMessage(self: *CaptureHandler, data: []const u8) !void { 212 - if (std.time.nanoTimestamp() >= self.deadline) return error.ConnectionClosed; 213 + if (nowNs() >= self.deadline) return error.ConnectionClosed; 213 214 214 215 // only capture commits with ops 215 216 var arena = std.heap.ArenaAllocator.init(self.allocator); ··· 242 243 // --- DID resolution --- 243 244 244 245 fn resolveWorker(allocator: Allocator, dids: []const []const u8, results: []?CachedKey, start: usize, end: usize) void { 245 - var resolver = zat.DidResolver.init(allocator); 246 + var resolver = zat.DidResolver.init(std.Options.debug_io, allocator); 246 247 defer resolver.deinit(); 247 248 for (start..end) |i| { 248 249 results[i] = resolveDidKey(allocator, &resolver, dids[i]); ··· 290 291 .map => |m| m, 291 292 else => continue, 292 293 }; 293 - var unsigned: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 294 + var unsigned: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .empty; 294 295 for (map_entries) |e| { 295 296 if (!std.mem.eql(u8, e.key, "sig")) unsigned.append(allocator, e) catch continue; 296 297 } ··· 343 344 0xba, 0xae, 0xdc, 0xe6, 0xaf, 0x48, 0xa0, 0x3b, 344 345 0xbf, 0xd2, 0x5e, 0x8c, 0xd0, 0x36, 0x41, 0x41, 345 346 }; 347 + 348 + fn nowNs() u64 { 349 + return @intCast(std.Io.Clock.awake.now(std.Options.debug_io).toNanoseconds()); 350 + }
+25 -20
zig/src/capture_sigs.zig
··· 41 41 }; 42 42 43 43 pub fn main() !void { 44 - var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; 44 + var gpa: std.heap.DebugAllocator(.{}) = .init; 45 45 defer _ = gpa.deinit(); 46 46 const allocator = gpa.allocator(); 47 47 ··· 56 56 std.debug.print("\ncaptured {d} raw frames\n", .{raw_frames.len}); 57 57 58 58 // phase 2: extract signed commits from frames 59 - var commits: std.ArrayListUnmanaged(RawCommit) = .{}; 59 + var commits: std.ArrayListUnmanaged(RawCommit) = .empty; 60 60 defer { 61 61 for (commits.items) |c| { 62 62 allocator.free(c.did); ··· 84 84 // collect unique DIDs 85 85 var unique_set = std.StringHashMapUnmanaged(usize){}; 86 86 defer unique_set.deinit(allocator); 87 - var unique_dids: std.ArrayListUnmanaged([]const u8) = .{}; 87 + var unique_dids: std.ArrayListUnmanaged([]const u8) = .empty; 88 88 defer unique_dids.deinit(allocator); 89 89 90 90 for (commits.items) |commit| { ··· 132 132 std.debug.print("resolved {d}/{d} DIDs ({d} errors)\n", .{ resolved, n_dids, resolve_errors }); 133 133 134 134 // build corpus entries from commits + resolved keys 135 - var entries: std.ArrayListUnmanaged(CorpusEntry) = .{}; 135 + var entries: std.ArrayListUnmanaged(CorpusEntry) = .empty; 136 136 defer { 137 137 for (entries.items) |e| { 138 138 allocator.free(e.signed_bytes); ··· 153 153 std.debug.print("built {d} corpus entries\n", .{entries.items.len}); 154 154 155 155 // phase 4: validate — verify each signature, drop failures 156 - var valid: std.ArrayListUnmanaged(CorpusEntry) = .{}; 156 + var valid: std.ArrayListUnmanaged(CorpusEntry) = .empty; 157 157 defer valid.deinit(allocator); 158 158 var verify_failures: usize = 0; 159 159 ··· 189 189 190 190 fn writeCorpus(entries: []const CorpusEntry) !void { 191 191 const path = fixtures_dir ++ "/sig-verify-corpus.bin"; 192 - const file = try std.fs.cwd().createFile(path, .{}); 193 - defer file.close(); 192 + const io = std.Options.debug_io; 193 + const file = try std.Io.Dir.cwd().createFile(io, path, .{}); 194 + defer file.close(io); 194 195 195 196 var count_buf: [4]u8 = undefined; 196 197 std.mem.writeInt(u32, &count_buf, @intCast(entries.len), .big); 197 - try file.writeAll(&count_buf); 198 + try file.writeStreamingAll(io, &count_buf); 198 199 199 200 var total_bytes: usize = 4; 200 201 for (entries) |entry| { 201 - try file.writeAll(&.{entry.curve_type}); 202 + try file.writeStreamingAll(io, &.{entry.curve_type}); 202 203 total_bytes += 1; 203 204 204 205 var signed_len: [2]u8 = undefined; 205 206 std.mem.writeInt(u16, &signed_len, @intCast(entry.signed_bytes.len), .big); 206 - try file.writeAll(&signed_len); 207 - try file.writeAll(entry.signed_bytes); 207 + try file.writeStreamingAll(io, &signed_len); 208 + try file.writeStreamingAll(io, entry.signed_bytes); 208 209 total_bytes += 2 + entry.signed_bytes.len; 209 210 210 211 var pubkey_len: [2]u8 = undefined; 211 212 std.mem.writeInt(u16, &pubkey_len, @intCast(entry.pubkey_bytes.len), .big); 212 - try file.writeAll(&pubkey_len); 213 - try file.writeAll(entry.pubkey_bytes); 213 + try file.writeStreamingAll(io, &pubkey_len); 214 + try file.writeStreamingAll(io, entry.pubkey_bytes); 214 215 total_bytes += 2 + entry.pubkey_bytes.len; 215 216 } 216 217 ··· 225 226 std.debug.print("connecting to firehose...\n", .{}); 226 227 227 228 const host = "bsky.network"; 228 - var client = try websocket.Client.init(allocator, .{ 229 + var client = try websocket.Client.init(std.Options.debug_io, allocator, .{ 229 230 .host = host, 230 231 .port = 443, 231 232 .tls = true, ··· 244 245 245 246 var handler = FirehoseCaptureHandler{ 246 247 .allocator = allocator, 247 - .deadline = @as(i128, std.time.nanoTimestamp()) + @as(i128, capture_duration_s) * ns_per_s, 248 + .deadline = nowNs() + capture_duration_s * ns_per_s, 248 249 }; 249 250 client.readLoop(&handler) catch |err| switch (err) { 250 251 error.ConnectionClosed => {}, ··· 258 259 259 260 const FirehoseCaptureHandler = struct { 260 261 allocator: Allocator, 261 - deadline: i128, 262 - frames: std.ArrayListUnmanaged([]const u8) = .{}, 262 + deadline: u64, 263 + frames: std.ArrayListUnmanaged([]const u8) = .empty, 263 264 total_bytes: usize = 0, 264 265 265 266 pub fn serverMessage(self: *FirehoseCaptureHandler, data: []const u8) !void { 266 - if (std.time.nanoTimestamp() >= self.deadline) { 267 + if (nowNs() >= self.deadline) { 267 268 return error.ConnectionClosed; 268 269 } 269 270 ··· 331 332 // --- concurrent DID resolution --- 332 333 333 334 fn resolveWorker(allocator: Allocator, dids: []const []const u8, results: []?CachedKey, start: usize, end: usize) void { 334 - var resolver = zat.DidResolver.init(allocator); 335 + var resolver = zat.DidResolver.init(std.Options.debug_io, allocator); 335 336 defer resolver.deinit(); 336 337 337 338 for (start..end) |i| { ··· 378 379 else => return false, 379 380 }; 380 381 381 - var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 382 + var unsigned_entries: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .empty; 382 383 defer unsigned_entries.deinit(allocator); 383 384 for (map_entries) |map_entry| { 384 385 if (!std.mem.eql(u8, map_entry.key, "sig")) { ··· 437 438 0xba, 0xae, 0xdc, 0xe6, 0xaf, 0x48, 0xa0, 0x3b, 438 439 0xbf, 0xd2, 0x5e, 0x8c, 0xd0, 0x36, 0x41, 0x41, 439 440 }; 441 + 442 + fn nowNs() u64 { 443 + return @intCast(std.Io.Clock.awake.now(std.Options.debug_io).toNanoseconds()); 444 + }
+32 -36
zig/src/verify.zig
··· 4 4 const Allocator = std.mem.Allocator; 5 5 const crypto = std.crypto; 6 6 7 - pub fn main() !void { 8 - var gpa: std.heap.GeneralPurposeAllocator(.{}) = .init; 9 - defer _ = gpa.deinit(); 10 - const allocator = gpa.allocator(); 11 - 12 - const args = try std.process.argsAlloc(allocator); 13 - defer std.process.argsFree(allocator, args); 7 + pub fn main(init: std.process.Init) !void { 8 + const allocator = init.gpa; 9 + const args = try init.minimal.args.toSlice(init.arena.allocator()); 14 10 15 11 if (args.len < 2) { 16 12 std.debug.print("usage: verify <handle-or-did>\n", .{}); ··· 21 17 const p = std.debug.print; 22 18 p("\n=== zat: full AT Protocol trust chain ===\n\n", .{}); 23 19 24 - var total_timer = try std.time.Timer.start(); 20 + const total_start_ns = nowNs(); 25 21 26 22 var arena = std.heap.ArenaAllocator.init(allocator); 27 23 defer arena.deinit(); 28 24 const alloc = arena.allocator(); 29 25 30 26 // --- step 1: resolve identifier to DID --- 31 - var step_timer = try std.time.Timer.start(); 27 + var step_start_ns = nowNs(); 32 28 33 29 const is_did = zat.Did.parse(identifier) != null; 34 30 const did_str = if (is_did) ··· 38 34 std.debug.print("error: invalid handle or DID: {s}\n", .{identifier}); 39 35 std.process.exit(1); 40 36 }; 41 - var resolver = zat.HandleResolver.init(alloc); 37 + var resolver = zat.HandleResolver.init(std.Options.debug_io, alloc); 42 38 defer resolver.deinit(); 43 39 break :blk try resolver.resolve(handle); 44 40 }; 45 41 46 - var handle_ms = step_timer.read(); 42 + var handle_ms = nowNs() - step_start_ns; 47 43 48 44 if (is_did) { 49 45 p("DID {s}\n", .{identifier}); ··· 55 51 } 56 52 57 53 // --- step 2: resolve DID document --- 58 - step_timer = try std.time.Timer.start(); 54 + step_start_ns = nowNs(); 59 55 60 56 const did = zat.Did.parse(did_str) orelse { 61 57 std.debug.print("error: invalid DID: {s}\n", .{did_str}); 62 58 std.process.exit(1); 63 59 }; 64 60 65 - var did_resolver = zat.DidResolver.init(alloc); 61 + var did_resolver = zat.DidResolver.init(std.Options.debug_io, alloc); 66 62 defer did_resolver.deinit(); 67 63 var did_doc = try did_resolver.resolve(did); 68 64 defer did_doc.deinit(); ··· 80 76 std.process.exit(1); 81 77 }; 82 78 83 - const did_ns = step_timer.read(); 79 + const did_ns = nowNs() - step_start_ns; 84 80 const key_type_str: []const u8 = switch (public_key.key_type) { 85 81 .secp256k1 => "secp256k1", 86 82 .p256 => "p256", ··· 96 92 p(" → PDS: {s}\n", .{pds_endpoint}); 97 93 98 94 // --- step 3: fetch repo CAR --- 99 - step_timer = try std.time.Timer.start(); 95 + step_start_ns = nowNs(); 100 96 101 97 const url = try std.fmt.allocPrint(alloc, "{s}/xrpc/com.atproto.sync.getRepo?did={s}", .{ pds_endpoint, did_str }); 102 98 103 - var http_client: std.http.Client = .{ .allocator = alloc }; 104 - defer http_client.deinit(); 105 - 106 - var aw: std.Io.Writer.Allocating = .init(alloc); 107 - 108 - const result = http_client.fetch(.{ 109 - .location = .{ .url = url }, 110 - .response_writer = &aw.writer, 111 - .headers = .{ .accept_encoding = .{ .override = "identity" } }, 99 + var transport = zat.HttpTransport.init(std.Options.debug_io, alloc); 100 + defer transport.deinit(); 101 + const result = transport.fetch(.{ 102 + .url = url, 103 + .accept = "application/vnd.ipld.car", 112 104 }) catch { 113 105 std.debug.print("error: failed to fetch repo from {s}\n", .{pds_endpoint}); 114 106 std.process.exit(1); ··· 119 111 std.process.exit(1); 120 112 } 121 113 122 - const car_bytes = aw.toArrayList().items; 123 - const fetch_ns = step_timer.read(); 114 + const car_bytes = result.body; 115 + const fetch_ns = nowNs() - step_start_ns; 124 116 125 117 p("\nrepo com.atproto.sync.getRepo\n", .{}); 126 118 { ··· 131 123 } 132 124 133 125 // --- step 4: parse CAR (inline parser — no size limits for full repos) --- 134 - step_timer = try std.time.Timer.start(); 126 + step_start_ns = nowNs(); 135 127 136 128 const repo_car = parseCar(alloc, car_bytes) catch { 137 129 std.debug.print("error: failed to parse CAR\n", .{}); ··· 143 135 std.process.exit(1); 144 136 } 145 137 146 - const car_ns = step_timer.read(); 138 + const car_ns = nowNs() - step_start_ns; 147 139 148 140 // --- step 5: find + decode commit --- 149 141 const commit_data = findBlock(repo_car.block_map, repo_car.roots[0].raw) orelse { ··· 174 166 }; 175 167 176 168 // --- step 6: verify signature --- 177 - step_timer = try std.time.Timer.start(); 169 + step_start_ns = nowNs(); 178 170 179 171 const unsigned_commit_bytes = try encodeUnsignedCommit(alloc, commit); 180 172 try verifySig(public_key.key_type, unsigned_commit_bytes, sig_bytes, public_key.raw); 181 173 182 - const sig_ns = step_timer.read(); 174 + const sig_ns = nowNs() - step_start_ns; 183 175 184 176 // --- step 7: walk + verify MST (specialized decoder + key height checks) --- 185 - step_timer = try std.time.Timer.start(); 177 + step_start_ns = nowNs(); 186 178 187 179 const record_count = try walkAndVerifyMst(alloc, repo_car.block_map, data_cid.raw); 188 180 189 - const walk_verify_ns = step_timer.read(); 181 + const walk_verify_ns = nowNs() - step_start_ns; 190 182 191 183 // --- output --- 192 184 const rev_short = if (commit_rev.len > 6) commit_rev[0..6] else commit_rev; ··· 211 203 printPadded(desc.len, walk_verify_ns); 212 204 } 213 205 214 - const total_ns = total_timer.read(); 206 + const total_ns = nowNs() - total_start_ns; 215 207 const network_ns = handle_ms + did_ns + fetch_ns; 216 208 const compute_ns = car_ns + sig_ns + walk_verify_ns; 217 209 ··· 230 222 else => return error.InvalidCommit, 231 223 }; 232 224 233 - var unsigned_entries: std.ArrayList(zat.cbor.Value.MapEntry) = .{}; 225 + var unsigned_entries: std.ArrayList(zat.cbor.Value.MapEntry) = .empty; 234 226 for (entries) |entry| { 235 227 if (!std.mem.eql(u8, entry.key, "sig")) { 236 228 try unsigned_entries.append(allocator, entry); ··· 241 233 return zat.cbor.encodeAlloc(allocator, unsigned_value); 242 234 } 243 235 236 + fn nowNs() u64 { 237 + return @intCast(std.Io.Clock.awake.now(std.Options.debug_io).toNanoseconds()); 238 + } 239 + 244 240 /// walk + verify MST using specialized decoder + key height checks. 245 241 /// combined with CAR block CID verification, this proves canonical structure 246 242 /// without a full rebuild. ··· 311 307 312 308 const header = zat.cbor.decodeAll(allocator, data[pos..header_end]) catch return error.InvalidCar; 313 309 314 - var roots: std.ArrayList(zat.cbor.Cid) = .{}; 310 + var roots: std.ArrayList(zat.cbor.Cid) = .empty; 315 311 if (header.getArray("roots")) |root_values| { 316 312 for (root_values) |root_val| { 317 313 switch (root_val) { ··· 324 320 pos = header_end; 325 321 326 322 // blocks — hash map keyed by CID bytes for O(1) lookup (like Go's TinyBlockstore) 327 - var block_map: BlockMap = .{}; 323 + var block_map: BlockMap = .empty; 328 324 var block_count: usize = 0; 329 325 while (pos < data.len) { 330 326 const block_len = zat.cbor.readUvarint(data, &pos) orelse return error.InvalidCar;