this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: replace k256 path dep with URL, bump zat to v0.2.18

path dependency on ../../k256 broke builds for anyone who doesn't
have the exact local directory layout. use tagged URL deps for both
k256 (v0.0.4) and zat (v0.2.18). also pins minimum_zig_version to
0.15.2 and includes relay benchmark scaffolding.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 42e1fed1 b73280f5

+859 -4
+38
zig/build.zig
··· 87 87 const verify_step = b.step("run-verify", "run full AT Protocol trust chain verification"); 88 88 verify_step.dependOn(&run_verify.step); 89 89 90 + // relay corpus capture tool 91 + const capture_relay = b.addExecutable(.{ 92 + .name = "capture-relay", 93 + .root_module = b.createModule(.{ 94 + .root_source_file = b.path("src/capture_relay.zig"), 95 + .target = target, 96 + .optimize = optimize, 97 + .imports = &.{ 98 + .{ .name = "zat", .module = zat_mod }, 99 + .{ .name = "websocket", .module = ws_mod }, 100 + }, 101 + }), 102 + }); 103 + b.installArtifact(capture_relay); 104 + 105 + const run_capture_relay = b.addRunArtifact(capture_relay); 106 + const capture_relay_step = b.step("run-capture-relay", "capture relay benchmark corpus from live network"); 107 + capture_relay_step.dependOn(&run_capture_relay.step); 108 + 90 109 // k256 — optimized secp256k1 verification 91 110 const k256_mod = b.dependency("k256", .{ 92 111 .target = target, ··· 111 130 const run_bench_sigs = b.addRunArtifact(bench_sigs); 112 131 const bench_sigs_step = b.step("run-bench-sigs", "run sig verify benchmarks"); 113 132 bench_sigs_step.dependOn(&run_bench_sigs.step); 133 + 134 + // relay hot-path benchmark 135 + const bench_relay = b.addExecutable(.{ 136 + .name = "bench-relay", 137 + .root_module = b.createModule(.{ 138 + .root_source_file = b.path("src/bench_relay.zig"), 139 + .target = target, 140 + .optimize = optimize, 141 + .imports = &.{ 142 + .{ .name = "zat", .module = zat_mod }, 143 + .{ .name = "k256", .module = k256_mod }, 144 + }, 145 + }), 146 + }); 147 + b.installArtifact(bench_relay); 148 + 149 + const run_bench_relay = b.addRunArtifact(bench_relay); 150 + const bench_relay_step = b.step("run-bench-relay", "run relay hot-path benchmarks"); 151 + bench_relay_step.dependOn(&run_bench_relay.step); 114 152 }
+5 -4
zig/build.zig.zon
··· 2 2 .name = .atproto_bench, 3 3 .version = "0.0.1", 4 4 .fingerprint = 0x7cdfbb8d616ff9e0, 5 - .minimum_zig_version = "0.15.0", 5 + .minimum_zig_version = "0.15.2", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.6", 9 - .hash = "zat-0.2.6-5PuC7m5IBADrKBpmzfx23NYdlzAmKk_iKvJi8VNd06F8", 8 + .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.18", 9 + .hash = "zat-0.2.18-5PuC7iVfBQAV6IUlytvAWrpwrVUbwbLlAcSTvkXwncb_", 10 10 }, 11 11 .k256 = .{ 12 - .path = "../../k256", 12 + .url = "https://tangled.sh/zzstoatzz.io/k256/archive/v0.0.4", 13 + .hash = "k256-0.0.4-w2pjn6wLAQDn0o3KDRsJeNblLQOmBb6Eklw7VniewZgD", 13 14 }, 14 15 }, 15 16 .paths = .{
+471
zig/src/bench_relay.zig
··· 1 + //! relay hot-path benchmark 2 + //! 3 + //! measures the full pipeline a relay executes per incoming firehose frame: 4 + //! 1. CBOR decode header (peek at event type) 5 + //! 2. CBOR decode payload 6 + //! 3. CAR parse (extract blocks, verify CID hashes) 7 + //! 4. find commit block, CBOR decode it 8 + //! 5. strip sig, re-encode unsigned commit 9 + //! 6. ECDSA verify against cached public key 10 + //! 11 + //! uses a pre-captured corpus of raw frames + pre-resolved signing keys 12 + //! so the benchmark is pure CPU — no network calls. 13 + //! 14 + //! reports frames/sec with variance across passes, plus stage breakdown. 15 + 16 + const std = @import("std"); 17 + const zat = @import("zat"); 18 + const k256 = @import("k256"); 19 + 20 + const Allocator = std.mem.Allocator; 21 + const cbor = zat.cbor; 22 + const car = zat.car; 23 + 24 + const warmup_passes: usize = 2; 25 + const measured_passes: usize = 5; 26 + const fixtures_dir = "../fixtures"; 27 + 28 + const CorpusEntry = struct { 29 + frame: []const u8, 30 + curve_type: u8, 31 + pubkey: []const u8, 32 + }; 33 + 34 + const CorpusInfo = struct { 35 + entries: []const CorpusEntry, 36 + total_frame_bytes: usize, 37 + }; 38 + 39 + const PassResult = struct { 40 + frames: usize, 41 + verified: usize, 42 + errors: usize, 43 + elapsed_ns: u64, 44 + }; 45 + 46 + /// per-stage timing accumulated across all frames in a pass 47 + const StageTimings = struct { 48 + decode_ns: u64 = 0, // CBOR header + payload 49 + car_ns: u64 = 0, // CAR parse 50 + verify_ns: u64 = 0, // commit decode + sig strip + re-encode + ECDSA 51 + }; 52 + 53 + pub fn main() !void { 54 + const allocator = std.heap.c_allocator; 55 + 56 + const corpus = try loadCorpus(allocator, fixtures_dir ++ "/relay-corpus.bin"); 57 + 58 + std.debug.print("\n=== relay hot-path benchmark ===\n\n", .{}); 59 + std.debug.print("corpus: {d} frames, {d:.1} MB\n", .{ 60 + corpus.entries.len, 61 + @as(f64, @floatFromInt(corpus.total_frame_bytes)) / (1024.0 * 1024.0), 62 + }); 63 + std.debug.print("passes: {d} warmup, {d} measured\n\n", .{ warmup_passes, measured_passes }); 64 + 65 + // sanity check first entry 66 + { 67 + var arena = std.heap.ArenaAllocator.init(allocator); 68 + defer arena.deinit(); 69 + const ok = relayValidateFrame(arena.allocator(), corpus.entries[0]); 70 + std.debug.print("first frame: {s}\n\n", .{if (ok) "OK" else "FAIL"}); 71 + } 72 + 73 + // full pipeline (arena reuse — production pattern) 74 + const reuse_results, const reuse_timings = try benchRelay(allocator, corpus, .reuse); 75 + 76 + // full pipeline (fresh arena per frame — fair comparison) 77 + const alloc_results, const alloc_timings = try benchRelay(allocator, corpus, .alloc); 78 + 79 + // write JSON 80 + writeJson(allocator, corpus, reuse_results, alloc_results); 81 + 82 + // stage breakdown from median pass 83 + std.debug.print("\n--- stage breakdown (median pass, arena reuse) ---\n", .{}); 84 + printStageBreakdown(corpus, reuse_timings); 85 + std.debug.print("\n--- stage breakdown (median pass, arena alloc) ---\n", .{}); 86 + printStageBreakdown(corpus, alloc_timings); 87 + 88 + std.debug.print("\n", .{}); 89 + } 90 + 91 + const AllocStrategy = enum { reuse, alloc }; 92 + 93 + fn benchRelay( 94 + allocator: Allocator, 95 + corpus: CorpusInfo, 96 + strategy: AllocStrategy, 97 + ) !struct { [measured_passes]PassResult, [measured_passes]StageTimings } { 98 + const label = if (strategy == .reuse) "relay (reuse)" else "relay (alloc)"; 99 + 100 + // warmup 101 + for (0..warmup_passes) |_| { 102 + if (strategy == .reuse) { 103 + var arena = std.heap.ArenaAllocator.init(allocator); 104 + defer arena.deinit(); 105 + for (corpus.entries) |entry| { 106 + _ = arena.reset(.retain_capacity); 107 + _ = relayValidateFrame(arena.allocator(), entry); 108 + } 109 + } else { 110 + for (corpus.entries) |entry| { 111 + var arena = std.heap.ArenaAllocator.init(allocator); 112 + defer arena.deinit(); 113 + _ = relayValidateFrame(arena.allocator(), entry); 114 + } 115 + } 116 + } 117 + 118 + // measured passes 119 + var pass_results: [measured_passes]PassResult = undefined; 120 + var pass_timings: [measured_passes]StageTimings = undefined; 121 + 122 + for (0..measured_passes) |pass| { 123 + var verified: usize = 0; 124 + var errors: usize = 0; 125 + var timings: StageTimings = .{}; 126 + 127 + var timer = try std.time.Timer.start(); 128 + 129 + if (strategy == .reuse) { 130 + var arena = std.heap.ArenaAllocator.init(allocator); 131 + defer arena.deinit(); 132 + for (corpus.entries) |entry| { 133 + _ = arena.reset(.retain_capacity); 134 + const result = relayValidateFrameTimed(arena.allocator(), entry); 135 + if (result.ok) verified += 1 else errors += 1; 136 + timings.decode_ns += result.decode_ns; 137 + timings.car_ns += result.car_ns; 138 + timings.verify_ns += result.verify_ns; 139 + } 140 + } else { 141 + for (corpus.entries) |entry| { 142 + var arena = std.heap.ArenaAllocator.init(allocator); 143 + const result = relayValidateFrameTimed(arena.allocator(), entry); 144 + arena.deinit(); 145 + if (result.ok) verified += 1 else errors += 1; 146 + timings.decode_ns += result.decode_ns; 147 + timings.car_ns += result.car_ns; 148 + timings.verify_ns += result.verify_ns; 149 + } 150 + } 151 + 152 + pass_results[pass] = .{ 153 + .frames = corpus.entries.len, 154 + .verified = verified, 155 + .errors = errors, 156 + .elapsed_ns = timer.read(), 157 + }; 158 + pass_timings[pass] = timings; 159 + } 160 + 161 + reportResult(label, corpus, &pass_results); 162 + return .{ pass_results, pass_timings }; 163 + } 164 + 165 + /// the relay hot path: decode → CAR → verify commit signature 166 + fn relayValidateFrame(allocator: Allocator, entry: CorpusEntry) bool { 167 + const result = relayValidateFrameTimed(allocator, entry); 168 + return result.ok; 169 + } 170 + 171 + const TimedResult = struct { 172 + ok: bool, 173 + decode_ns: u64 = 0, 174 + car_ns: u64 = 0, 175 + verify_ns: u64 = 0, 176 + }; 177 + 178 + fn relayValidateFrameTimed(allocator: Allocator, entry: CorpusEntry) TimedResult { 179 + // stage 1: CBOR decode header + payload 180 + var t0 = std.time.Timer.start() catch return .{ .ok = false }; 181 + const header_result = cbor.decode(allocator, entry.frame) catch return .{ .ok = false }; 182 + const payload = cbor.decodeAll(allocator, entry.frame[header_result.consumed..]) catch return .{ .ok = false }; 183 + const decode_ns = t0.read(); 184 + 185 + // stage 2: CAR parse with CID hash verification 186 + var t1 = std.time.Timer.start() catch return .{ .ok = false, .decode_ns = decode_ns }; 187 + const blocks_bytes = payload.getBytes("blocks") orelse return .{ .ok = false, .decode_ns = decode_ns }; 188 + const parsed_car = car.readWithOptions(allocator, blocks_bytes, .{ 189 + .verify_block_hashes = true, 190 + }) catch return .{ .ok = false, .decode_ns = decode_ns }; 191 + const car_ns = t1.read(); 192 + 193 + // stage 3: find commit, verify signature 194 + var t2 = std.time.Timer.start() catch return .{ .ok = false, .decode_ns = decode_ns, .car_ns = car_ns }; 195 + const ok = verifyCommitInCar(allocator, parsed_car, entry.curve_type, entry.pubkey); 196 + const verify_ns = t2.read(); 197 + 198 + return .{ .ok = ok, .decode_ns = decode_ns, .car_ns = car_ns, .verify_ns = verify_ns }; 199 + } 200 + 201 + fn verifyCommitInCar(allocator: Allocator, parsed_car: car.Car, curve_type: u8, pubkey: []const u8) bool { 202 + for (parsed_car.blocks) |block| { 203 + const value = cbor.decodeAll(allocator, block.data) catch continue; 204 + const sig_bytes = value.getBytes("sig") orelse continue; 205 + if (value.getString("did") == null) continue; 206 + if (sig_bytes.len != 64) continue; 207 + 208 + // strip sig field, re-encode 209 + const map_entries = switch (value) { 210 + .map => |m| m, 211 + else => continue, 212 + }; 213 + var unsigned: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 214 + for (map_entries) |e| { 215 + if (!std.mem.eql(u8, e.key, "sig")) unsigned.append(allocator, e) catch continue; 216 + } 217 + const unsigned_bytes = cbor.encodeAlloc(allocator, .{ .map = unsigned.items }) catch continue; 218 + 219 + return if (curve_type == 0) 220 + verifyP256(unsigned_bytes, sig_bytes, pubkey) 221 + else 222 + verifySecp256k1(unsigned_bytes, sig_bytes, pubkey); 223 + } 224 + return false; 225 + } 226 + 227 + // --- crypto (same as bench_sigs) --- 228 + 229 + const P256Scheme = std.crypto.sign.ecdsa.EcdsaP256Sha256; 230 + const K256Scheme = k256.EcdsaSecp256k1Sha256; 231 + 232 + fn verifyP256(message: []const u8, sig_bytes: []const u8, pubkey_bytes: []const u8) bool { 233 + const sig = P256Scheme.Signature.fromBytes(sig_bytes[0..64].*); 234 + if (isHighS(p256_half_order, sig.s)) return false; 235 + const pk = P256Scheme.PublicKey.fromSec1(pubkey_bytes) catch return false; 236 + sig.verify(message, pk) catch return false; 237 + return true; 238 + } 239 + 240 + fn verifySecp256k1(message: []const u8, sig_bytes: []const u8, pubkey_bytes: []const u8) bool { 241 + const sig = K256Scheme.Signature.fromBytes(sig_bytes[0..64].*); 242 + if (isHighS(secp256k1_half_order, sig.s)) return false; 243 + const pk = K256Scheme.PublicKey.fromSec1(pubkey_bytes) catch return false; 244 + sig.verifyMsg(message, pk) catch return false; 245 + return true; 246 + } 247 + 248 + fn isHighS(half_order: [32]u8, s: [32]u8) bool { 249 + for (0..32) |i| { 250 + if (s[i] < half_order[i]) return false; 251 + if (s[i] > half_order[i]) return true; 252 + } 253 + return false; 254 + } 255 + 256 + const p256_half_order = [32]u8{ 257 + 0x7f, 0xff, 0xff, 0xff, 0x80, 0x00, 0x00, 0x00, 258 + 0xa0, 0x75, 0x9b, 0xc5, 0xaa, 0x00, 0xe3, 0xb2, 259 + 0xde, 0x73, 0x7d, 0x56, 0xd3, 0x8b, 0xcf, 0x42, 260 + 0x79, 0xdc, 0xe5, 0x61, 0x7e, 0x31, 0x92, 0xa8, 261 + }; 262 + 263 + const secp256k1_half_order = [32]u8{ 264 + 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 265 + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfe, 266 + 0xba, 0xae, 0xdc, 0xe6, 0xaf, 0x48, 0xa0, 0x3b, 267 + 0xbf, 0xd2, 0x5e, 0x8c, 0xd0, 0x36, 0x41, 0x41, 268 + }; 269 + 270 + // --- reporting --- 271 + 272 + fn reportResult(name: []const u8, corpus: CorpusInfo, pass_results: []const PassResult) void { 273 + var fps_values: [measured_passes]f64 = undefined; 274 + var total_verified: usize = 0; 275 + var total_errors: usize = 0; 276 + 277 + for (pass_results, 0..) |r, i| { 278 + const elapsed_s = @as(f64, @floatFromInt(r.elapsed_ns)) / 1_000_000_000.0; 279 + fps_values[i] = @as(f64, @floatFromInt(r.frames)) / elapsed_s; 280 + total_verified += r.verified; 281 + total_errors += r.errors; 282 + } 283 + 284 + std.mem.sort(f64, &fps_values, {}, std.sort.asc(f64)); 285 + 286 + const median = fps_values[measured_passes / 2]; 287 + const throughput_mb = median * @as(f64, @floatFromInt(corpus.total_frame_bytes)) / @as(f64, @floatFromInt(corpus.entries.len)) / (1024.0 * 1024.0); 288 + 289 + // headroom over production firehose (~600 events/sec) 290 + const headroom = median / 600.0; 291 + 292 + std.debug.print("{s: <16} {d:>8.0} frames/sec {d:>6.1} MB/s {d:.0}x headroom verified={d} errors={d}\n", .{ 293 + name, 294 + median, 295 + throughput_mb, 296 + headroom, 297 + total_verified, 298 + total_errors, 299 + }); 300 + std.debug.print("{s: <16} variance: min={d:.0} median={d:.0} max={d:.0} frames/sec\n", .{ 301 + "", 302 + fps_values[0], 303 + median, 304 + fps_values[measured_passes - 1], 305 + }); 306 + } 307 + 308 + fn printStageBreakdown(corpus: CorpusInfo, timings: [measured_passes]StageTimings) void { 309 + // use median pass (sort by total time) 310 + var total_per_pass: [measured_passes]u64 = undefined; 311 + var indices: [measured_passes]usize = undefined; 312 + for (0..measured_passes) |i| { 313 + total_per_pass[i] = timings[i].decode_ns + timings[i].car_ns + timings[i].verify_ns; 314 + indices[i] = i; 315 + } 316 + 317 + // simple selection of median 318 + var sorted_totals: [measured_passes]u64 = total_per_pass; 319 + std.mem.sort(u64, &sorted_totals, {}, std.sort.asc(u64)); 320 + const median_total = sorted_totals[measured_passes / 2]; 321 + 322 + // find the pass with that total 323 + var median_idx: usize = 0; 324 + for (0..measured_passes) |i| { 325 + if (total_per_pass[i] == median_total) { 326 + median_idx = i; 327 + break; 328 + } 329 + } 330 + 331 + const t = timings[median_idx]; 332 + const total = t.decode_ns + t.car_ns + t.verify_ns; 333 + const n = corpus.entries.len; 334 + 335 + std.debug.print(" decode (cbor): {d:>6}µs/frame {d:>5.1}%\n", .{ 336 + t.decode_ns / (n * 1000), 337 + @as(f64, @floatFromInt(t.decode_ns)) / @as(f64, @floatFromInt(total)) * 100.0, 338 + }); 339 + std.debug.print(" car parse: {d:>6}µs/frame {d:>5.1}%\n", .{ 340 + t.car_ns / (n * 1000), 341 + @as(f64, @floatFromInt(t.car_ns)) / @as(f64, @floatFromInt(total)) * 100.0, 342 + }); 343 + std.debug.print(" sig verify: {d:>6}µs/frame {d:>5.1}%\n", .{ 344 + t.verify_ns / (n * 1000), 345 + @as(f64, @floatFromInt(t.verify_ns)) / @as(f64, @floatFromInt(total)) * 100.0, 346 + }); 347 + std.debug.print(" total: {d:>6}µs/frame\n", .{total / (n * 1000)}); 348 + } 349 + 350 + fn writeJson( 351 + allocator: Allocator, 352 + corpus: CorpusInfo, 353 + reuse_results: [measured_passes]PassResult, 354 + alloc_results: [measured_passes]PassResult, 355 + ) void { 356 + const variants = [_]struct { name: []const u8, file: []const u8, results: [measured_passes]PassResult }{ 357 + .{ .name = "relay-reuse", .file = fixtures_dir ++ "/relay-reuse-zig.json", .results = reuse_results }, 358 + .{ .name = "relay-alloc", .file = fixtures_dir ++ "/relay-alloc-zig.json", .results = alloc_results }, 359 + }; 360 + 361 + for (variants) |v| { 362 + const json = buildJson(allocator, v.name, corpus, &v.results) catch continue; 363 + defer allocator.free(json); 364 + const file = std.fs.cwd().createFile(v.file, .{}) catch continue; 365 + defer file.close(); 366 + file.writeAll(json) catch continue; 367 + } 368 + } 369 + 370 + fn buildJson( 371 + allocator: Allocator, 372 + variant: []const u8, 373 + corpus: CorpusInfo, 374 + pass_results: []const PassResult, 375 + ) ![]u8 { 376 + var fps_values: [measured_passes]f64 = undefined; 377 + for (pass_results, 0..) |r, i| { 378 + const elapsed_s = @as(f64, @floatFromInt(r.elapsed_ns)) / 1_000_000_000.0; 379 + fps_values[i] = @as(f64, @floatFromInt(r.frames)) / elapsed_s; 380 + } 381 + std.mem.sort(f64, &fps_values, {}, std.sort.asc(f64)); 382 + 383 + var buf: std.ArrayListUnmanaged(u8) = .{}; 384 + errdefer buf.deinit(allocator); 385 + const w = buf.writer(allocator); 386 + 387 + try w.print( 388 + \\{{ 389 + \\ "benchmark": "relay-pipeline", 390 + \\ "sdk": "zig (zat)", 391 + \\ "variant": "{s}", 392 + \\ "corpus": {{ "frames": {d}, "total_bytes": {d} }}, 393 + \\ "passes": [ 394 + \\ 395 + , .{ variant, corpus.entries.len, corpus.total_frame_bytes }); 396 + 397 + for (pass_results, 0..) |r, i| { 398 + const elapsed_s = @as(f64, @floatFromInt(r.elapsed_ns)) / 1_000_000_000.0; 399 + const fps = @as(f64, @floatFromInt(r.frames)) / elapsed_s; 400 + try w.print( 401 + \\ {{ "frames_per_sec": {d:.0}, "verified": {d}, "errors": {d} }} 402 + , .{ fps, r.verified, r.errors }); 403 + if (i < pass_results.len - 1) try w.writeAll(","); 404 + try w.writeAll("\n"); 405 + } 406 + 407 + try w.print( 408 + \\ ], 409 + \\ "median_frames_per_sec": {d:.0}, 410 + \\ "min_frames_per_sec": {d:.0}, 411 + \\ "max_frames_per_sec": {d:.0} 412 + \\}} 413 + \\ 414 + , .{ 415 + fps_values[measured_passes / 2], 416 + fps_values[0], 417 + fps_values[measured_passes - 1], 418 + }); 419 + 420 + return try buf.toOwnedSlice(allocator); 421 + } 422 + 423 + // --- corpus loading --- 424 + 425 + fn loadCorpus(allocator: Allocator, path: []const u8) !CorpusInfo { 426 + const file = std.fs.cwd().openFile(path, .{}) catch |err| { 427 + std.debug.print("cannot open {s}: {s}\n", .{ path, @errorName(err) }); 428 + std.debug.print("run `zig build run-capture-relay` first to generate corpus\n", .{}); 429 + return err; 430 + }; 431 + defer file.close(); 432 + const data = try file.readToEndAlloc(allocator, 100 * 1024 * 1024); 433 + 434 + if (data.len < 4) return error.InvalidFormat; 435 + 436 + const entry_count = std.mem.readInt(u32, data[0..4], .big); 437 + var entries: std.ArrayListUnmanaged(CorpusEntry) = .{}; 438 + var pos: usize = 4; 439 + var total_frame_bytes: usize = 0; 440 + 441 + for (0..entry_count) |_| { 442 + // frame 443 + if (pos + 4 > data.len) return error.InvalidFormat; 444 + const frame_len = std.mem.readInt(u32, data[pos..][0..4], .big); 445 + pos += 4; 446 + if (pos + frame_len > data.len) return error.InvalidFormat; 447 + const frame = data[pos..][0..frame_len]; 448 + pos += frame_len; 449 + total_frame_bytes += frame_len; 450 + 451 + // key 452 + if (pos + 2 > data.len) return error.InvalidFormat; 453 + const curve_type = data[pos]; 454 + const pubkey_len = data[pos + 1]; 455 + pos += 2; 456 + if (pos + pubkey_len > data.len) return error.InvalidFormat; 457 + const pubkey = data[pos..][0..pubkey_len]; 458 + pos += pubkey_len; 459 + 460 + try entries.append(allocator, .{ 461 + .frame = frame, 462 + .curve_type = curve_type, 463 + .pubkey = pubkey, 464 + }); 465 + } 466 + 467 + return .{ 468 + .entries = try entries.toOwnedSlice(allocator), 469 + .total_frame_bytes = total_frame_bytes, 470 + }; 471 + }
+345
zig/src/capture_relay.zig
··· 1 + //! capture relay benchmark corpus from live AT Protocol firehose. 2 + //! 3 + //! pairs raw firehose frames with pre-resolved signing keys so that 4 + //! bench_relay can run the full relay hot path without network access. 5 + //! 6 + //! corpus format: 7 + //! [u32 BE entry_count] 8 + //! per entry: 9 + //! [u32 BE frame_len][frame_bytes...] // raw firehose frame 10 + //! [u8 curve_type] // 0 = P-256, 1 = secp256k1 11 + //! [u8 pubkey_len][pubkey_bytes...] // compressed public key 12 + 13 + const std = @import("std"); 14 + const zat = @import("zat"); 15 + const websocket = @import("websocket"); 16 + 17 + const Allocator = std.mem.Allocator; 18 + const cbor = zat.cbor; 19 + const car = zat.car; 20 + 21 + const fixtures_dir = "../fixtures"; 22 + const capture_duration_s = 10; 23 + const ns_per_s = std.time.ns_per_s; 24 + const max_resolve_threads = 16; 25 + 26 + const CachedKey = struct { 27 + curve_type: u8, 28 + raw: []const u8, 29 + }; 30 + 31 + const CorpusEntry = struct { 32 + frame: []const u8, 33 + curve_type: u8, 34 + pubkey: []const u8, 35 + }; 36 + 37 + pub fn main() !void { 38 + var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; 39 + defer _ = gpa.deinit(); 40 + const allocator = gpa.allocator(); 41 + 42 + // phase 1: capture raw firehose frames 43 + std.debug.print("capturing frames from firehose...\n", .{}); 44 + const raw_frames = try captureRawFrames(allocator); 45 + defer { 46 + for (raw_frames) |f| allocator.free(f); 47 + allocator.free(raw_frames); 48 + } 49 + std.debug.print("\ncaptured {d} frames\n", .{raw_frames.len}); 50 + 51 + // phase 2: extract DIDs from frames (need signing keys) 52 + var frame_dids: std.ArrayListUnmanaged(struct { idx: usize, did: []const u8 }) = .{}; 53 + defer frame_dids.deinit(allocator); 54 + 55 + for (raw_frames, 0..) |frame, i| { 56 + var arena = std.heap.ArenaAllocator.init(allocator); 57 + defer arena.deinit(); 58 + if (extractCommitDid(arena.allocator(), frame)) |did| { 59 + try frame_dids.append(allocator, .{ .idx = i, .did = try allocator.dupe(u8, did) }); 60 + } 61 + } 62 + defer for (frame_dids.items) |fd| allocator.free(fd.did); 63 + 64 + std.debug.print("{d} frames have commits\n", .{frame_dids.items.len}); 65 + 66 + // phase 3: collect unique DIDs and resolve concurrently 67 + var unique_set = std.StringHashMapUnmanaged(usize){}; 68 + defer unique_set.deinit(allocator); 69 + var unique_dids: std.ArrayListUnmanaged([]const u8) = .{}; 70 + defer unique_dids.deinit(allocator); 71 + 72 + for (frame_dids.items) |fd| { 73 + const gop = try unique_set.getOrPut(allocator, fd.did); 74 + if (!gop.found_existing) { 75 + gop.value_ptr.* = unique_dids.items.len; 76 + try unique_dids.append(allocator, fd.did); 77 + } 78 + } 79 + 80 + const n_dids = unique_dids.items.len; 81 + const n_threads = @min(max_resolve_threads, n_dids); 82 + std.debug.print("{d} unique DIDs, resolving with {d} threads...\n", .{ n_dids, n_threads }); 83 + 84 + const resolve_results = try allocator.alloc(?CachedKey, n_dids); 85 + defer { 86 + for (resolve_results) |r| if (r) |key| allocator.free(key.raw); 87 + allocator.free(resolve_results); 88 + } 89 + @memset(resolve_results, null); 90 + 91 + if (n_threads > 0) { 92 + var threads: [max_resolve_threads]std.Thread = undefined; 93 + for (0..n_threads) |t| { 94 + const start = t * n_dids / n_threads; 95 + const end = (t + 1) * n_dids / n_threads; 96 + threads[t] = try std.Thread.spawn(.{}, resolveWorker, .{ 97 + allocator, unique_dids.items, resolve_results, start, end, 98 + }); 99 + } 100 + for (0..n_threads) |t| threads[t].join(); 101 + } 102 + 103 + var resolved: usize = 0; 104 + for (resolve_results) |r| if (r != null) { 105 + resolved += 1; 106 + }; 107 + std.debug.print("resolved {d}/{d} DIDs\n", .{ resolved, n_dids }); 108 + 109 + // phase 4: validate each frame's signature, build corpus 110 + var entries: std.ArrayListUnmanaged(CorpusEntry) = .{}; 111 + defer entries.deinit(allocator); 112 + var verify_ok: usize = 0; 113 + var verify_fail: usize = 0; 114 + 115 + for (frame_dids.items) |fd| { 116 + const did_idx = unique_set.get(fd.did) orelse continue; 117 + const key = resolve_results[did_idx] orelse continue; 118 + 119 + // verify the frame's commit signature before including in corpus 120 + var arena = std.heap.ArenaAllocator.init(allocator); 121 + defer arena.deinit(); 122 + if (verifyFrameCommit(arena.allocator(), raw_frames[fd.idx], key)) { 123 + try entries.append(allocator, .{ 124 + .frame = raw_frames[fd.idx], 125 + .curve_type = key.curve_type, 126 + .pubkey = key.raw, 127 + }); 128 + verify_ok += 1; 129 + } else { 130 + verify_fail += 1; 131 + } 132 + } 133 + 134 + std.debug.print("validated: {d} pass, {d} fail\n", .{ verify_ok, verify_fail }); 135 + 136 + if (entries.items.len == 0) { 137 + std.debug.print("no valid entries — cannot write corpus\n", .{}); 138 + return; 139 + } 140 + 141 + // phase 5: write corpus 142 + try writeCorpus(entries.items); 143 + } 144 + 145 + fn writeCorpus(entries: []const CorpusEntry) !void { 146 + const path = fixtures_dir ++ "/relay-corpus.bin"; 147 + const file = try std.fs.cwd().createFile(path, .{}); 148 + defer file.close(); 149 + 150 + var count_buf: [4]u8 = undefined; 151 + std.mem.writeInt(u32, &count_buf, @intCast(entries.len), .big); 152 + try file.writeAll(&count_buf); 153 + 154 + var total_bytes: usize = 4; 155 + for (entries) |entry| { 156 + // frame 157 + var frame_len: [4]u8 = undefined; 158 + std.mem.writeInt(u32, &frame_len, @intCast(entry.frame.len), .big); 159 + try file.writeAll(&frame_len); 160 + try file.writeAll(entry.frame); 161 + total_bytes += 4 + entry.frame.len; 162 + 163 + // key 164 + try file.writeAll(&.{entry.curve_type}); 165 + try file.writeAll(&.{@as(u8, @intCast(entry.pubkey.len))}); 166 + try file.writeAll(entry.pubkey); 167 + total_bytes += 2 + entry.pubkey.len; 168 + } 169 + 170 + std.debug.print("wrote {d} entries ({d} bytes) to {s}\n", .{ entries.len, total_bytes, path }); 171 + } 172 + 173 + // --- firehose capture --- 174 + 175 + fn captureRawFrames(allocator: Allocator) ![]const []const u8 { 176 + const host = "bsky.network"; 177 + var client = try websocket.Client.init(allocator, .{ 178 + .host = host, 179 + .port = 443, 180 + .tls = true, 181 + .max_size = 5 * 1024 * 1024, 182 + }); 183 + defer client.deinit(); 184 + 185 + var host_buf: [256]u8 = undefined; 186 + const host_header = try std.fmt.bufPrint(&host_buf, "Host: {s}\r\n", .{host}); 187 + 188 + try client.handshake("/xrpc/com.atproto.sync.subscribeRepos", .{ 189 + .headers = host_header, 190 + }); 191 + 192 + std.debug.print("connected, capturing for ~{d}s...\n", .{capture_duration_s}); 193 + 194 + var handler = CaptureHandler{ 195 + .allocator = allocator, 196 + .deadline = @as(i128, std.time.nanoTimestamp()) + @as(i128, capture_duration_s) * ns_per_s, 197 + }; 198 + client.readLoop(&handler) catch |err| switch (err) { 199 + error.ConnectionClosed => {}, 200 + else => return err, 201 + }; 202 + 203 + return try handler.frames.toOwnedSlice(allocator); 204 + } 205 + 206 + const CaptureHandler = struct { 207 + allocator: Allocator, 208 + deadline: i128, 209 + frames: std.ArrayListUnmanaged([]const u8) = .{}, 210 + 211 + pub fn serverMessage(self: *CaptureHandler, data: []const u8) !void { 212 + if (std.time.nanoTimestamp() >= self.deadline) return error.ConnectionClosed; 213 + 214 + // only capture commits with ops 215 + var arena = std.heap.ArenaAllocator.init(self.allocator); 216 + defer arena.deinit(); 217 + if (!isCommitWithOps(arena.allocator(), data)) return; 218 + 219 + try self.frames.append(self.allocator, try self.allocator.dupe(u8, data)); 220 + } 221 + 222 + pub fn close(_: *CaptureHandler) void {} 223 + }; 224 + 225 + fn isCommitWithOps(allocator: Allocator, data: []const u8) bool { 226 + const header_result = cbor.decode(allocator, data) catch return false; 227 + const t = header_result.value.getString("t") orelse return false; 228 + if (!std.mem.eql(u8, t, "#commit")) return false; 229 + const payload = cbor.decodeAll(allocator, data[header_result.consumed..]) catch return false; 230 + const ops = payload.getArray("ops") orelse return false; 231 + return ops.len > 0; 232 + } 233 + 234 + fn extractCommitDid(allocator: Allocator, data: []const u8) ?[]const u8 { 235 + const header_result = cbor.decode(allocator, data) catch return null; 236 + const t = header_result.value.getString("t") orelse return null; 237 + if (!std.mem.eql(u8, t, "#commit")) return null; 238 + const payload = cbor.decodeAll(allocator, data[header_result.consumed..]) catch return null; 239 + return payload.getString("repo"); 240 + } 241 + 242 + // --- DID resolution --- 243 + 244 + fn resolveWorker(allocator: Allocator, dids: []const []const u8, results: []?CachedKey, start: usize, end: usize) void { 245 + var resolver = zat.DidResolver.init(allocator); 246 + defer resolver.deinit(); 247 + for (start..end) |i| { 248 + results[i] = resolveDidKey(allocator, &resolver, dids[i]); 249 + } 250 + } 251 + 252 + fn resolveDidKey(allocator: Allocator, resolver: *zat.DidResolver, did_str: []const u8) ?CachedKey { 253 + var arena = std.heap.ArenaAllocator.init(allocator); 254 + defer arena.deinit(); 255 + 256 + const did = zat.Did.parse(did_str) orelse return null; 257 + var doc = resolver.resolve(did) catch return null; 258 + defer doc.deinit(); 259 + 260 + const vm = doc.signingKey() orelse return null; 261 + const decoded = zat.multibase.decode(arena.allocator(), vm.public_key_multibase) catch return null; 262 + const parsed = zat.multicodec.parsePublicKey(decoded) catch return null; 263 + 264 + return .{ 265 + .curve_type = switch (parsed.key_type) { 266 + .p256 => 0, 267 + .secp256k1 => 1, 268 + }, 269 + .raw = allocator.dupe(u8, parsed.raw) catch return null, 270 + }; 271 + } 272 + 273 + // --- frame-level signature verification --- 274 + 275 + fn verifyFrameCommit(allocator: Allocator, frame: []const u8, key: CachedKey) bool { 276 + const header_result = cbor.decode(allocator, frame) catch return false; 277 + const payload = cbor.decodeAll(allocator, frame[header_result.consumed..]) catch return false; 278 + const blocks_bytes = payload.getBytes("blocks") orelse return false; 279 + const parsed_car = car.read(allocator, blocks_bytes) catch return false; 280 + 281 + // find commit block (has sig + did) 282 + for (parsed_car.blocks) |block| { 283 + const value = cbor.decodeAll(allocator, block.data) catch continue; 284 + const sig_bytes = value.getBytes("sig") orelse continue; 285 + if (value.getString("did") == null) continue; 286 + if (sig_bytes.len != 64) continue; 287 + 288 + // strip sig, re-encode 289 + const map_entries = switch (value) { 290 + .map => |m| m, 291 + else => continue, 292 + }; 293 + var unsigned: std.ArrayListUnmanaged(cbor.Value.MapEntry) = .{}; 294 + for (map_entries) |e| { 295 + if (!std.mem.eql(u8, e.key, "sig")) unsigned.append(allocator, e) catch continue; 296 + } 297 + const unsigned_bytes = cbor.encodeAlloc(allocator, .{ .map = unsigned.items }) catch continue; 298 + 299 + return if (key.curve_type == 0) 300 + verifyP256(unsigned_bytes, sig_bytes, key.raw) 301 + else 302 + verifySecp256k1(unsigned_bytes, sig_bytes, key.raw); 303 + } 304 + return false; 305 + } 306 + 307 + fn verifyP256(message: []const u8, sig_bytes: []const u8, pubkey_bytes: []const u8) bool { 308 + const Scheme = std.crypto.sign.ecdsa.EcdsaP256Sha256; 309 + const sig = Scheme.Signature.fromBytes(sig_bytes[0..64].*); 310 + if (isHighS(p256_half_order, sig.s)) return false; 311 + const pk = Scheme.PublicKey.fromSec1(pubkey_bytes) catch return false; 312 + sig.verify(message, pk) catch return false; 313 + return true; 314 + } 315 + 316 + fn verifySecp256k1(message: []const u8, sig_bytes: []const u8, pubkey_bytes: []const u8) bool { 317 + const Scheme = std.crypto.sign.ecdsa.EcdsaSecp256k1Sha256; 318 + const sig = Scheme.Signature.fromBytes(sig_bytes[0..64].*); 319 + if (isHighS(secp256k1_half_order, sig.s)) return false; 320 + const pk = Scheme.PublicKey.fromSec1(pubkey_bytes) catch return false; 321 + sig.verify(message, pk) catch return false; 322 + return true; 323 + } 324 + 325 + fn isHighS(half_order: [32]u8, s: [32]u8) bool { 326 + for (0..32) |i| { 327 + if (s[i] < half_order[i]) return false; 328 + if (s[i] > half_order[i]) return true; 329 + } 330 + return false; 331 + } 332 + 333 + const p256_half_order = [32]u8{ 334 + 0x7f, 0xff, 0xff, 0xff, 0x80, 0x00, 0x00, 0x00, 335 + 0xa0, 0x75, 0x9b, 0xc5, 0xaa, 0x00, 0xe3, 0xb2, 336 + 0xde, 0x73, 0x7d, 0x56, 0xd3, 0x8b, 0xcf, 0x42, 337 + 0x79, 0xdc, 0xe5, 0x61, 0x7e, 0x31, 0x92, 0xa8, 338 + }; 339 + 340 + const secp256k1_half_order = [32]u8{ 341 + 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 342 + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfe, 343 + 0xba, 0xae, 0xdc, 0xe6, 0xaf, 0x48, 0xa0, 0x3b, 344 + 0xbf, 0xd2, 0x5e, 0x8c, 0xd0, 0x36, 0x41, 0x41, 345 + };