this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: infallible decode in all SDKs, honest capture coupling disclosure

- rust/python/go: decode_full never propagates errors — catches
frame-level failures and counts them, matching zig's pattern
- go: distinguish io.EOF from real errors in CAR block iteration
- blocks/frame precision bumped from .1 to .2 (9.98 not 10.0)
- README: disclose zat CBOR decoder used in capture filter,
acknowledge 30-40% run-to-run variance in methodology

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz e6d818c2 bbe840f6

+63 -57
+5 -3
README.md
··· 47 47 - **go** indigo — bluesky's own production relay — is the slowest despite using code-generated CBOR unmarshal (no reflection). the cost is GC pressure: every string, byte slice, and block is a heap allocation that the garbage collector has to sweep. at ~10 blocks/frame, that's a lot of short-lived objects per decode 48 48 - **python** is faster than the rust and go benchmarks despite being "Python" — its hot path is `libipld` (Rust via PyO3), which does the entire CAR parse + per-block DAG-CBOR decode in one synchronous C-extension call. it uses a different (and faster) Rust library than the rust benchmark does 49 49 - **rust** pays async overhead (tokio runtime + iroh-car's async `CarReader`) even though the I/O is an in-memory buffer. every `next_block().await` goes through poll/wake per block (~10 blocks/frame). this is why the rust benchmark is slower than python's libipld, which does the same work synchronously. there's no sync alternative in the iroh-car library 50 - - errors are tracked and reported, not swallowed. SDKs continue on per-block decode failures but count them 50 + - **error handling**: all SDKs use infallible decode functions that never abort on failure — errors are counted and the frame is skipped. this means a corrupted frame doesn't invalidate the entire benchmark run 51 + - **capture coupling**: the corpus capture tool uses zat's CBOR decoder for the commit-with-ops header peek. this is standard CBOR parsing (not zat's typed firehose decoder), but it does mean frames that zat's CBOR decoder rejects won't appear in the corpus. in practice, CBOR header parsing is the least likely step to diverge across implementations 51 52 52 53 ## corpus format 53 54 ··· 60 61 ... 61 62 ``` 62 63 63 - frames are captured from ~10 seconds of live firehose traffic, filtered to commits with ops using a minimal CBOR header peek (no SDK-specific decode). this gives a realistic mix of frame sizes and record types without biasing the corpus toward any particular SDK's decoder. 64 + frames are captured from ~10 seconds of live firehose traffic, filtered to commits with ops using a minimal CBOR header peek (zat's CBOR decoder parses just the header map and checks `t == "#commit"` + non-empty `ops` array). this is standard CBOR, not zat's typed firehose decoder, but see fairness notes for the coupling caveat. 64 65 65 66 ## when this matters 66 67 ··· 90 91 91 92 ## methodology 92 93 93 - - `just capture` connects to the live firehose for ~10 seconds, filters for commits with ops via minimal CBOR header peek, writes a length-prefixed corpus 94 + - `just capture` connects to the live firehose for ~10 seconds, filters for commits with ops via CBOR header peek (uses zat's CBOR decoder — see fairness notes), writes a length-prefixed corpus 94 95 - each benchmark decodes every frame fully: header → payload → CAR → decode every block as DAG-CBOR 95 96 - 2 warmup passes, 5 measured passes over the full corpus 96 97 - zig builds with `-Doptimize=ReleaseFast`, rust with `opt-level=3 lto=true` 97 98 - go and python use their standard release toolchains 98 99 - reported numbers: median frames/sec across passes, plus min/max for variance. block counts and error counts verify work parity across SDKs 100 + - **run-to-run variance is significant** (~30-40% between separate invocations due to system load, thermal state, etc.). ratios between SDKs should be compared within a single `just bench` run, not across runs
+22 -18
go/main.go
··· 8 8 "bytes" 9 9 "encoding/binary" 10 10 "fmt" 11 + "io" 11 12 "os" 12 13 "path/filepath" 13 14 "sort" ··· 61 62 62 63 // verify first frame 63 64 if len(corpus.frames) > 0 { 64 - result, err := decodeFull(corpus.frames[0]) 65 - if err != nil { 66 - fmt.Printf("first frame verify failed: %v\n", err) 67 - return 68 - } 65 + result := decodeFull(corpus.frames[0]) 69 66 fmt.Printf("first frame: blocks=%d errors=%d\n", result.blocks, result.errors) 70 67 } 71 68 fmt.Println() ··· 75 72 fmt.Println() 76 73 } 77 74 78 - func decodeFull(data []byte) (*decodeResult, error) { 75 + // decodeFull decodes a single firehose frame. never returns error — 76 + // counts failures and continues, matching the behavior of all other SDKs. 77 + func decodeFull(data []byte) *decodeResult { 78 + result := &decodeResult{} 79 + 79 80 var evt events.XRPCStreamEvent 80 81 if err := evt.Deserialize(bytes.NewReader(data)); err != nil { 81 - return nil, err 82 + result.errors++ 83 + return result 82 84 } 83 85 84 - result := &decodeResult{} 85 86 commit := evt.RepoCommit 86 87 if commit != nil && len(commit.Blocks) > 0 { 87 88 reader, err := car.NewBlockReader(bytes.NewReader([]byte(commit.Blocks))) 88 89 if err != nil { 89 - return result, err 90 + result.errors++ 91 + return result 90 92 } 91 93 92 94 for { 93 95 block, err := reader.Next() 96 + if err == io.EOF { 97 + break 98 + } 94 99 if err != nil { 95 - break 100 + result.errors++ 101 + continue 96 102 } 97 103 98 104 var v interface{} ··· 104 110 } 105 111 } 106 112 107 - return result, nil 113 + return result 108 114 } 109 115 110 116 func benchDecode(corpus *corpusInfo) { 111 117 for i := 0; i < warmupPasses; i++ { 112 118 for _, frame := range corpus.frames { 113 - decodeFull(frame) 119 + _ = decodeFull(frame) 114 120 } 115 121 } 116 122 ··· 120 126 var passBlocks, passErrors int 121 127 start := time.Now() 122 128 for _, frame := range corpus.frames { 123 - result, _ := decodeFull(frame) 124 - if result != nil { 125 - passBlocks += result.blocks 126 - passErrors += result.errors 127 - } 129 + result := decodeFull(frame) 130 + passBlocks += result.blocks 131 + passErrors += result.errors 128 132 } 129 133 elapsed := time.Since(start) 130 134 passResults[i] = passResult{ ··· 164 168 medianFps := fpsValues[measuredPasses/2] 165 169 maxFps := fpsValues[measuredPasses-1] 166 170 167 - fmt.Printf("%-14s %10.0f frames/sec %8.1f MB/s blocks=%d (%.1f/frame) errors=%d\n", 171 + fmt.Printf("%-14s %10.0f frames/sec %8.1f MB/s blocks=%d (%.2f/frame) errors=%d\n", 168 172 name, medianFps, throughputMb, totalBlocks, blocksPerFrame, totalErrors) 169 173 fmt.Printf("%-14s variance: min=%.0f median=%.0f max=%.0f frames/sec\n", 170 174 "", minFps, medianFps, maxFps)
+14 -14
python/bench.py
··· 84 84 85 85 86 86 def decode_full(frame_data: bytes) -> DecodeResult: 87 - """Full decode: frame parse + typed commit + CAR block extraction + per-block decode.""" 88 - frame = firehose_models.Frame.from_bytes(frame_data) 89 - msg = parse_subscribe_repos_message(frame) 87 + """Full decode: frame parse + typed commit + CAR block extraction + per-block decode. 88 + 89 + Never raises — counts failures and continues. 90 + """ 91 + try: 92 + frame = firehose_models.Frame.from_bytes(frame_data) 93 + msg = parse_subscribe_repos_message(frame) 94 + except Exception: 95 + return DecodeResult(blocks=0, errors=1) 90 96 91 97 blocks = 0 92 98 errors = 0 ··· 153 159 154 160 print( 155 161 f"{name:<14} {median_fps:>10.0f} frames/sec {throughput_mb:>8.1f} MB/s" 156 - f" blocks={total_blocks} ({blocks_per_frame:.1f}/frame) errors={total_errors}" 162 + f" blocks={total_blocks} ({blocks_per_frame:.2f}/frame) errors={total_errors}" 157 163 ) 158 164 print( 159 165 f"{'':14} variance: min={min_fps:.0f} median={median_fps:.0f} max={max_fps:.0f} frames/sec" ··· 175 181 print() 176 182 177 183 # verify first frame 178 - try: 179 - result = decode_full(corpus.frames[0]) 180 - print(f"first frame: blocks={result.blocks} errors={result.errors}") 181 - print() 182 - except Exception as e: 183 - print(f"first frame verify failed: {e}") 184 + result = decode_full(corpus.frames[0]) 185 + print(f"first frame: blocks={result.blocks} errors={result.errors}") 186 + print() 184 187 185 - try: 186 - bench_decode(corpus) 187 - except Exception as e: 188 - print(f"decode: SKIP ({e})") 188 + bench_decode(corpus) 189 189 190 190 print() 191 191
+21 -21
rust/src/main.rs
··· 69 69 70 70 // verify first frame 71 71 if let Some(first) = corpus.frames().next() { 72 - match decode_full(first).await { 73 - Ok(result) => { 74 - println!( 75 - "first frame: blocks={} errors={}", 76 - result.blocks, result.errors, 77 - ); 78 - } 79 - Err(e) => println!("first frame verify failed: {e}"), 80 - } 72 + let result = decode_full(first).await; 73 + println!( 74 + "first frame: blocks={} errors={}", 75 + result.blocks, result.errors, 76 + ); 81 77 } 82 78 println!(); 83 79 84 - if let Err(e) = bench_decode(&corpus).await { 85 - println!("decode: SKIP ({e})"); 86 - } 80 + bench_decode(&corpus).await; 87 81 88 82 println!(); 89 83 } 90 84 91 85 /// full decode: jacquard decode_framed → parse_car_bytes → decode every block. 92 - async fn decode_full(data: &[u8]) -> Result<DecodeResult, Box<dyn std::error::Error>> { 93 - let msg = SubscribeReposMessage::decode_framed(data)?; 86 + /// never panics or propagates errors — counts failures and continues. 87 + async fn decode_full(data: &[u8]) -> DecodeResult { 88 + let msg = match SubscribeReposMessage::decode_framed(data) { 89 + Ok(m) => m, 90 + Err(_) => return DecodeResult { blocks: 0, errors: 1 }, 91 + }; 94 92 95 93 let mut blocks = 0usize; 96 94 let mut errors = 0usize; 97 95 98 96 if let SubscribeReposMessage::Commit(commit) = msg { 99 97 if !commit.blocks.is_empty() { 100 - let parsed = jacquard_repo::car::parse_car_bytes(&commit.blocks).await?; 98 + let parsed = match jacquard_repo::car::parse_car_bytes(&commit.blocks).await { 99 + Ok(p) => p, 100 + Err(_) => return DecodeResult { blocks: 0, errors: 1 }, 101 + }; 101 102 for (_cid, block_data) in &parsed.blocks { 102 103 match serde_ipld_dagcbor::from_slice::<Ipld>(block_data) { 103 104 Ok(_) => blocks += 1, ··· 107 108 } 108 109 } 109 110 110 - Ok(DecodeResult { blocks, errors }) 111 + DecodeResult { blocks, errors } 111 112 } 112 113 113 114 // --- benchmark --- 114 115 115 - async fn bench_decode(corpus: &CorpusInfo) -> Result<(), Box<dyn std::error::Error>> { 116 + async fn bench_decode(corpus: &CorpusInfo) { 116 117 for _ in 0..WARMUP_PASSES { 117 118 for frame in corpus.frames() { 118 - decode_full(frame).await?; 119 + decode_full(frame).await; 119 120 } 120 121 } 121 122 ··· 126 127 let mut pass_errors = 0usize; 127 128 let start = Instant::now(); 128 129 for frame in corpus.frames() { 129 - let result = decode_full(frame).await?; 130 + let result = decode_full(frame).await; 130 131 pass_blocks += result.blocks; 131 132 pass_errors += result.errors; 132 133 } ··· 140 141 } 141 142 142 143 report_result("decode", corpus, &pass_results); 143 - Ok(()) 144 144 } 145 145 146 146 // --- util --- ··· 166 166 let max_fps = fps_values[MEASURED_PASSES - 1]; 167 167 168 168 println!( 169 - "{:<14} {:>10.0} frames/sec {:>8.1} MB/s blocks={} ({:.1}/frame) errors={}", 169 + "{:<14} {:>10.0} frames/sec {:>8.1} MB/s blocks={} ({:.2}/frame) errors={}", 170 170 name, median_fps, throughput_mb, total_blocks, blocks_per_frame, total_errors, 171 171 ); 172 172 println!(
+1 -1
zig/src/bench.zig
··· 211 211 const throughput_mb = total_bytes / (1024.0 * 1024.0) / elapsed_s; 212 212 const blocks_per_frame = @as(f64, @floatFromInt(total_blocks)) / @as(f64, @floatFromInt(total_frames)); 213 213 214 - std.debug.print("{s: <14} {d:>10.0} frames/sec {d:>8.1} MB/s blocks={d} ({d:.1}/frame) errors={d}\n", .{ 214 + std.debug.print("{s: <14} {d:>10.0} frames/sec {d:>8.1} MB/s blocks={d} ({d:.2}/frame) errors={d}\n", .{ 215 215 name, 216 216 median_fps, 217 217 throughput_mb,