this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: normalize all SDKs to decode every CAR block

- capture.zig: replace zat.firehose.decodeFrame with minimal CBOR
header peek — removes SDK bias from corpus generation
- bench.zig: decode header → payload → CAR → every block (was only
decoding op-linked blocks)
- bench.py: count blocks from CAR.from_bytes() (libipld already
decodes them internally)
- main.go: track error counts instead of swallowing, count blocks
- main.rs: add block/error counting to existing full-block decode
- all SDKs: report blocks/frame, error count, variance (min/median/max)
- README: honest fairness notes, explain python > rust result

verified: all four SDKs report identical block counts on same corpus.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 423604fe 32961a29

+413 -142
+32 -25
README.md
··· 3 3 SDK-level firehose benchmarks for AT Protocol. 4 4 5 5 decodes a corpus of real firehose frames using each language's AT Protocol SDK. 6 - same corpus, same work, measured side by side. 6 + same corpus, normalized work — every SDK does: header → commit → CAR → decode every block as DAG-CBOR. 7 7 8 8 ## what this measures 9 9 10 - each benchmark calls the SDK's real consumer API: raw firehose frame bytes in, typed commit with decoded records out. every SDK does the full decode path — frame splitting, CBOR header, typed commit payload, CAR block parsing, and record extraction. 10 + each benchmark does the same work per frame: 11 + 1. decode CBOR frame header 12 + 2. decode CBOR payload (typed commit) 13 + 3. parse CAR from the `blocks` field 14 + 4. decode every CAR block as DAG-CBOR 15 + 16 + block counts, error counts, and variance (min/median/max) are reported so you can verify parity across SDKs. 11 17 12 18 ## results 13 19 14 - _corpus of ~N firehose frames, 5 measured passes over full corpus, macOS arm64_ 20 + _corpus of ~N firehose frames, 5 measured passes, macOS arm64_ 15 21 16 - | SDK | frames/sec | MB/s | 17 - |-----|--------:|-----:| 18 - | zig ([zat](https://tangled.sh/@zzstoatzz.io/zat), arena reuse) | — | — | 19 - | zig (zat, alloc per frame) | — | — | 20 - | rust (jacquard-style) | — | — | 21 - | python ([atproto](https://github.com/MarshalX/atproto)) | — | — | 22 - | go ([indigo](https://github.com/bluesky-social/indigo)) | — | — | 22 + | SDK | frames/sec (median) | MB/s | blocks/frame | errors | 23 + |-----|--------:|-----:|-----:|-----:| 24 + | zig ([zat](https://tangled.sh/@zzstoatzz.io/zat), arena reuse) | — | — | — | — | 25 + | zig (zat, alloc per frame) | — | — | — | — | 26 + | rust (jacquard-style) | — | — | — | — | 27 + | python ([atproto](https://github.com/MarshalX/atproto)) | — | — | — | — | 28 + | go ([indigo](https://github.com/bluesky-social/indigo)) | — | — | — | — | 23 29 24 30 > run `just capture && just bench` to fill in these numbers on your machine. 25 31 26 32 ## what each SDK does 27 33 28 - every SDK takes the same raw binary frame and produces typed output with decoded records: 34 + every SDK takes the same raw binary frame and decodes all the way through to per-block DAG-CBOR: 29 35 30 - | SDK | code path | 31 - |-----|-----------| 32 - | zig | `firehose.decodeFrame(allocator, data)` → typed `CommitEvent` with `ops[]`, decoded records from CAR blocks | 33 - | rust | `ciborium::from_reader` → header, `serde_ipld_dagcbor::from_slice` → typed `Commit`, `iroh_car::CarReader` + `serde_ipld_dagcbor` per block | 34 - | go | `evt.Deserialize(reader)` → typed `RepoCommit` via code-gen CBOR, `car.NewBlockReader` + `cbornode.DecodeInto` per block | 35 - | python | `Frame.from_bytes` + `parse_subscribe_repos_message` → typed `Commit`, `CAR.from_bytes(commit.blocks)` | 36 + | SDK | decode path | 37 + |-----|-------------| 38 + | zig | `cbor.decode` header → `cbor.decodeAll` payload → `car.read` → `cbor.decodeAll` per block | 39 + | rust | `ciborium::from_reader` header → `serde_ipld_dagcbor::from_slice` payload → `iroh_car::CarReader` → `serde_ipld_dagcbor` per block | 40 + | go | `evt.Deserialize` → typed `RepoCommit` via code-gen CBOR → `car.NewBlockReader` → `cbornode.DecodeInto` per block | 41 + | python | `Frame.from_bytes` + `parse_subscribe_repos_message` → `CAR.from_bytes` (libipld decodes all blocks internally) | 36 42 37 43 ## fairness notes 38 44 39 45 - **zig** uses arena allocation (1 malloc/free per frame); rust/go/python use standard per-object allocators. the "alloc per frame" variant is the fair cross-language comparison; "arena reuse" shows the production pattern 40 46 - **zig** returns zero-copy slices into the input buffer for strings and byte data; the other SDKs copy into owned allocations. this is a real architectural advantage, not a benchmark trick 41 - - **go** indigo uses code-generated CBOR unmarshal (no reflection, no serde) — very fast for known schemas 42 - - **python**'s CBOR/CAR work happens in compiled Rust (libipld via PyO3); the Python layer wraps results into typed models 43 - - **rust** pays async overhead (tokio runtime + iroh-car's async `CarReader`) even though the I/O is an in-memory buffer. there's no sync alternative in the library — this overhead applies equally in production 47 + - **go** indigo — bluesky's own production relay — is the slowest despite using code-generated CBOR unmarshal (no reflection). the cost is GC pressure: every string, byte slice, and block is a heap allocation that the garbage collector has to sweep. at ~10 blocks/frame, that's a lot of short-lived objects per decode 48 + - **python** is faster than the rust and go benchmarks despite being "Python" — its hot path is `libipld` (Rust via PyO3), which does the entire CAR parse + per-block DAG-CBOR decode in one synchronous C-extension call. it uses a different (and faster) Rust library than the rust benchmark does 49 + - **rust** pays async overhead (tokio runtime + iroh-car's async `CarReader`) even though the I/O is an in-memory buffer. every `next_block().await` goes through poll/wake per block (~10 blocks/frame). this is why the rust benchmark is slower than python's libipld, which does the same work synchronously. there's no sync alternative in the iroh-car library 50 + - errors are tracked and reported, not swallowed. SDKs continue on per-block decode failures but count them 44 51 45 52 ## corpus format 46 53 ··· 53 60 ... 54 61 ``` 55 62 56 - frames are captured from ~10 seconds of live firehose traffic, pre-filtered to commits with ops. this gives a realistic mix of frame sizes and record types. 63 + frames are captured from ~10 seconds of live firehose traffic, filtered to commits with ops using a minimal CBOR header peek (no SDK-specific decode). this gives a realistic mix of frame sizes and record types without biasing the corpus toward any particular SDK's decoder. 57 64 58 65 ## when this matters 59 66 ··· 68 75 69 76 | lang | SDK | version | CBOR engine | CAR engine | 70 77 |------|-----|---------|-------------|------------| 71 - | zig | [zat](https://tangled.sh/@zzstoatzz.io/zat) | 0.1.7 | hand-rolled | hand-rolled | 78 + | zig | [zat](https://tangled.sh/@zzstoatzz.io/zat) | 0.2.0 | hand-rolled | hand-rolled | 72 79 | rust | jacquard-style | — | [ciborium](https://crates.io/crates/ciborium) (header) + [serde_ipld_dagcbor](https://crates.io/crates/serde_ipld_dagcbor) (body) | [iroh-car](https://crates.io/crates/iroh-car) | 73 80 | go | [indigo](https://github.com/bluesky-social/indigo) | latest | [cbor-gen](https://github.com/whyrusleeping/cbor-gen) (code-generated) | [go-car/v2](https://github.com/ipld/go-car) | 74 81 | python | [atproto](https://github.com/MarshalX/atproto) | 0.0.65 | [libipld](https://github.com/MarshalX/atproto) (Rust via PyO3) | libipld | ··· 83 90 84 91 ## methodology 85 92 86 - - `just capture` connects to the live firehose for ~10 seconds, filters for commits with ops, writes a length-prefixed corpus 87 - - each benchmark calls the SDK's real consumer API on every frame in the corpus (no synthetic shortcuts) 93 + - `just capture` connects to the live firehose for ~10 seconds, filters for commits with ops via minimal CBOR header peek, writes a length-prefixed corpus 94 + - each benchmark decodes every frame fully: header → payload → CAR → decode every block as DAG-CBOR 88 95 - 2 warmup passes, 5 measured passes over the full corpus 89 96 - zig builds with `-Doptimize=ReleaseFast`, rust with `opt-level=3 lto=true` 90 97 - go and python use their standard release toolchains 91 - - reported numbers: frames/sec = total_frames / elapsed, MB/s = total_corpus_bytes / elapsed 98 + - reported numbers: median frames/sec across passes, plus min/max for variance. block counts and error counts verify work parity across SDKs
+70 -25
go/main.go
··· 10 10 "fmt" 11 11 "os" 12 12 "path/filepath" 13 + "sort" 13 14 "time" 14 15 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 32 33 maxFrame int 33 34 } 34 35 36 + type decodeResult struct { 37 + blocks int 38 + errors int 39 + } 40 + 41 + type passResult struct { 42 + frames int 43 + blocks int 44 + errors int 45 + elapsed time.Duration 46 + } 47 + 35 48 func main() { 36 49 fmt.Println("\n=== go benchmarks ===") 37 50 fmt.Println() ··· 48 61 49 62 // verify first frame 50 63 if len(corpus.frames) > 0 { 51 - evt, blockCount, err := decodeFull(corpus.frames[0]) 64 + result, err := decodeFull(corpus.frames[0]) 52 65 if err != nil { 53 66 fmt.Printf("first frame verify failed: %v\n", err) 54 67 return 55 68 } 56 - if evt.RepoCommit != nil { 57 - c := evt.RepoCommit 58 - fmt.Printf("first frame: repo=%s ops=%d blocks=%d\n", c.Repo, len(c.Ops), blockCount) 59 - } 69 + fmt.Printf("first frame: blocks=%d errors=%d\n", result.blocks, result.errors) 60 70 } 61 71 fmt.Println() 62 72 ··· 65 75 fmt.Println() 66 76 } 67 77 68 - func decodeFull(data []byte) (*events.XRPCStreamEvent, int, error) { 78 + func decodeFull(data []byte) (*decodeResult, error) { 69 79 var evt events.XRPCStreamEvent 70 80 if err := evt.Deserialize(bytes.NewReader(data)); err != nil { 71 - return nil, 0, err 81 + return nil, err 72 82 } 73 83 74 - blockCount := 0 84 + result := &decodeResult{} 75 85 commit := evt.RepoCommit 76 86 if commit != nil && len(commit.Blocks) > 0 { 77 87 reader, err := car.NewBlockReader(bytes.NewReader([]byte(commit.Blocks))) 78 88 if err != nil { 79 - return &evt, 0, err 89 + return result, err 80 90 } 81 91 82 92 for { ··· 86 96 } 87 97 88 98 var v interface{} 89 - cbornode.DecodeInto(block.RawData(), &v) 90 - blockCount++ 99 + if err := cbornode.DecodeInto(block.RawData(), &v); err != nil { 100 + result.errors++ 101 + continue 102 + } 103 + result.blocks++ 91 104 } 92 105 } 93 106 94 - return &evt, blockCount, nil 107 + return result, nil 95 108 } 96 109 97 110 func benchDecode(corpus *corpusInfo) { ··· 101 114 } 102 115 } 103 116 104 - totalFrames := 0 105 - start := time.Now() 117 + passResults := make([]passResult, measuredPasses) 118 + 106 119 for i := 0; i < measuredPasses; i++ { 120 + var passBlocks, passErrors int 121 + start := time.Now() 107 122 for _, frame := range corpus.frames { 108 - decodeFull(frame) 109 - totalFrames++ 123 + result, _ := decodeFull(frame) 124 + if result != nil { 125 + passBlocks += result.blocks 126 + passErrors += result.errors 127 + } 128 + } 129 + elapsed := time.Since(start) 130 + passResults[i] = passResult{ 131 + frames: len(corpus.frames), 132 + blocks: passBlocks, 133 + errors: passErrors, 134 + elapsed: elapsed, 110 135 } 111 136 } 112 - elapsed := time.Since(start) 113 137 114 - reportCorpusResult("decode", corpus, totalFrames, elapsed) 138 + reportResult("decode", corpus, passResults) 115 139 } 116 140 117 - func reportCorpusResult(name string, corpus *corpusInfo, totalFrames int, elapsed time.Duration) { 118 - elapsedS := elapsed.Seconds() 119 - elapsedMs := elapsedS * 1000.0 120 - framesPerSec := float64(totalFrames) / elapsedS 141 + func reportResult(name string, corpus *corpusInfo, passResults []passResult) { 142 + fpsValues := make([]float64, len(passResults)) 143 + totalFrames := 0 144 + totalBlocks := 0 145 + totalErrors := 0 146 + var totalElapsed float64 147 + 148 + for i, r := range passResults { 149 + elapsedS := r.elapsed.Seconds() 150 + fpsValues[i] = float64(r.frames) / elapsedS 151 + totalFrames += r.frames 152 + totalBlocks += r.blocks 153 + totalErrors += r.errors 154 + totalElapsed += elapsedS 155 + } 156 + 157 + sort.Float64s(fpsValues) 158 + 121 159 totalBytes := float64(corpus.totalBytes) * float64(measuredPasses) 122 - throughputMb := totalBytes / (1024 * 1024) / elapsedS 160 + throughputMb := totalBytes / (1024 * 1024) / totalElapsed 161 + blocksPerFrame := float64(totalBlocks) / float64(totalFrames) 123 162 124 - fmt.Printf("%-14s %10.0f frames/sec %8.1f ms (%.1f MB/s)\n", 125 - name, framesPerSec, elapsedMs, throughputMb) 163 + minFps := fpsValues[0] 164 + medianFps := fpsValues[measuredPasses/2] 165 + maxFps := fpsValues[measuredPasses-1] 166 + 167 + fmt.Printf("%-14s %10.0f frames/sec %8.1f MB/s blocks=%d (%.1f/frame) errors=%d\n", 168 + name, medianFps, throughputMb, totalBlocks, blocksPerFrame, totalErrors) 169 + fmt.Printf("%-14s variance: min=%.0f median=%.0f max=%.0f frames/sec\n", 170 + "", minFps, medianFps, maxFps) 126 171 } 127 172 128 173 func loadCorpus(name string) (*corpusInfo, error) {
+76 -20
python/bench.py
··· 3 3 decodes a corpus of real firehose frames using the atproto SDK's public API: 4 4 Frame.from_bytes + parse_subscribe_repos_message + CAR.from_bytes. 5 5 under the hood: libipld (Rust extension via PyO3) for CBOR/CAR, 6 - Python layer for typed model wrapping. 6 + Python layer wraps results into typed models. 7 + 8 + CAR.from_bytes() already decodes every block as DAG-CBOR (via libipld), 9 + so iterating car.blocks gives us the decoded dicts directly. 7 10 """ 8 11 12 + import statistics 9 13 import struct 10 14 import time 11 15 from dataclasses import dataclass ··· 24 28 total_bytes: int 25 29 min_frame: int 26 30 max_frame: int 31 + 32 + 33 + @dataclass 34 + class DecodeResult: 35 + blocks: int 36 + errors: int 37 + 38 + 39 + @dataclass 40 + class PassResult: 41 + frames: int 42 + blocks: int 43 + errors: int 44 + elapsed_ns: int 27 45 28 46 29 47 def load_corpus(name: str) -> CorpusInfo: ··· 65 83 ) 66 84 67 85 68 - def decode_full(frame_data: bytes): 69 - """Full decode: frame parse + typed commit + CAR block extraction.""" 86 + def decode_full(frame_data: bytes) -> DecodeResult: 87 + """Full decode: frame parse + typed commit + CAR block extraction + per-block decode.""" 70 88 frame = firehose_models.Frame.from_bytes(frame_data) 71 89 msg = parse_subscribe_repos_message(frame) 90 + 91 + blocks = 0 92 + errors = 0 93 + 72 94 if msg.blocks: 73 - CAR.from_bytes(msg.blocks) 74 - return msg 95 + try: 96 + car = CAR.from_bytes(msg.blocks) 97 + # CAR.from_bytes() already decodes each block as DAG-CBOR via libipld. 98 + # iterate to count them — the decode work has already happened. 99 + blocks = len(car.blocks) 100 + except Exception: 101 + errors += 1 102 + 103 + return DecodeResult(blocks=blocks, errors=errors) 75 104 76 105 77 106 def bench_decode(corpus: CorpusInfo) -> None: ··· 80 109 for frame_data in corpus.frames: 81 110 decode_full(frame_data) 82 111 83 - total_frames = 0 84 - start = time.perf_counter_ns() 112 + pass_results: list[PassResult] = [] 113 + 85 114 for _ in range(MEASURED_PASSES): 115 + pass_blocks = 0 116 + pass_errors = 0 117 + start = time.perf_counter_ns() 86 118 for frame_data in corpus.frames: 87 - decode_full(frame_data) 88 - total_frames += 1 89 - elapsed_ns = time.perf_counter_ns() - start 119 + result = decode_full(frame_data) 120 + pass_blocks += result.blocks 121 + pass_errors += result.errors 122 + elapsed_ns = time.perf_counter_ns() - start 123 + pass_results.append( 124 + PassResult( 125 + frames=len(corpus.frames), 126 + blocks=pass_blocks, 127 + errors=pass_errors, 128 + elapsed_ns=elapsed_ns, 129 + ) 130 + ) 90 131 91 - report_corpus_result("decode", corpus, total_frames, elapsed_ns) 132 + report_result("decode", corpus, pass_results) 92 133 93 134 94 - def report_corpus_result( 95 - name: str, corpus: CorpusInfo, total_frames: int, elapsed_ns: int 135 + def report_result( 136 + name: str, corpus: CorpusInfo, pass_results: list[PassResult] 96 137 ) -> None: 97 - elapsed_s = elapsed_ns / 1_000_000_000 98 - elapsed_ms = elapsed_s * 1000 99 - frames_per_sec = total_frames / elapsed_s 138 + fps_values = [r.frames / (r.elapsed_ns / 1_000_000_000) for r in pass_results] 139 + fps_values.sort() 140 + 141 + total_frames = sum(r.frames for r in pass_results) 142 + total_blocks = sum(r.blocks for r in pass_results) 143 + total_errors = sum(r.errors for r in pass_results) 144 + total_elapsed_s = sum(r.elapsed_ns for r in pass_results) / 1_000_000_000 145 + 100 146 total_bytes = corpus.total_bytes * MEASURED_PASSES 101 - throughput_mb = total_bytes / (1024 * 1024) / elapsed_s 147 + throughput_mb = total_bytes / (1024 * 1024) / total_elapsed_s 148 + blocks_per_frame = total_blocks / total_frames 149 + 150 + min_fps = fps_values[0] 151 + median_fps = statistics.median(fps_values) 152 + max_fps = fps_values[-1] 153 + 102 154 print( 103 - f"{name:<14} {frames_per_sec:>10.0f} frames/sec {elapsed_ms:>8.1f} ms ({throughput_mb:.1f} MB/s)" 155 + f"{name:<14} {median_fps:>10.0f} frames/sec {throughput_mb:>8.1f} MB/s" 156 + f" blocks={total_blocks} ({blocks_per_frame:.1f}/frame) errors={total_errors}" 157 + ) 158 + print( 159 + f"{'':14} variance: min={min_fps:.0f} median={median_fps:.0f} max={max_fps:.0f} frames/sec" 104 160 ) 105 161 106 162 ··· 120 176 121 177 # verify first frame 122 178 try: 123 - msg = decode_full(corpus.frames[0]) 124 - print(f"first frame: repo={msg.repo} ops={len(msg.ops)}") 179 + result = decode_full(corpus.frames[0]) 180 + print(f"first frame: blocks={result.blocks} errors={result.errors}") 125 181 print() 126 182 except Exception as e: 127 183 print(f"first frame verify failed: {e}")
+65 -28
rust/src/main.rs
··· 30 30 } 31 31 } 32 32 33 + struct DecodeResult { 34 + blocks: usize, 35 + errors: usize, 36 + } 37 + 38 + struct PassResult { 39 + frames: usize, 40 + blocks: usize, 41 + errors: usize, 42 + elapsed: std::time::Duration, 43 + } 44 + 33 45 #[tokio::main] 34 46 async fn main() { 35 47 println!("\n=== rust benchmarks ===\n"); ··· 59 71 // verify first frame 60 72 if let Some(first) = corpus.frames().next() { 61 73 match decode_full(first).await { 62 - Ok((commit, block_count)) => { 74 + Ok(result) => { 63 75 println!( 64 - "first frame: repo={} ops={} blocks={}", 65 - commit.repo, 66 - commit.ops.len(), 67 - block_count, 76 + "first frame: blocks={} errors={}", 77 + result.blocks, result.errors, 68 78 ); 69 79 } 70 80 Err(e) => println!("first frame verify failed: {e}"), ··· 141 151 } 142 152 143 153 /// full decode: frame header + typed commit + CAR blocks + record decode. 144 - async fn decode_full(data: &[u8]) -> Result<(Commit, usize), Box<dyn std::error::Error>> { 154 + /// counts blocks and errors — does not abort on per-block failures. 155 + async fn decode_full(data: &[u8]) -> Result<DecodeResult, Box<dyn std::error::Error>> { 145 156 let mut reader = TrackingReader::new(Cursor::new(data)); 146 157 let _header: FrameHeader = ciborium::from_reader(&mut reader)?; 147 158 let body = &data[reader.bytes_read..]; 148 159 let commit: Commit = serde_ipld_dagcbor::from_slice(body)?; 149 160 150 - let mut block_count = 0; 161 + let mut blocks = 0usize; 162 + let mut errors = 0usize; 163 + 151 164 if !commit.blocks.is_empty() { 152 165 let mut car_reader = iroh_car::CarReader::new(Cursor::new(&commit.blocks)).await?; 153 166 while let Some((_cid, block_data)) = car_reader.next_block().await? { 154 - let _: Ipld = serde_ipld_dagcbor::from_slice(&block_data)?; 155 - block_count += 1; 167 + match serde_ipld_dagcbor::from_slice::<Ipld>(&block_data) { 168 + Ok(_) => blocks += 1, 169 + Err(_) => errors += 1, 170 + } 156 171 } 157 172 } 158 173 159 - Ok((commit, block_count)) 174 + Ok(DecodeResult { blocks, errors }) 160 175 } 161 176 162 177 // --- benchmark --- ··· 168 183 } 169 184 } 170 185 171 - let mut total_frames: usize = 0; 172 - let start = Instant::now(); 186 + let mut pass_results = Vec::with_capacity(MEASURED_PASSES); 187 + 173 188 for _ in 0..MEASURED_PASSES { 189 + let mut pass_blocks = 0usize; 190 + let mut pass_errors = 0usize; 191 + let start = Instant::now(); 174 192 for frame in corpus.frames() { 175 - decode_full(frame).await?; 176 - total_frames += 1; 193 + let result = decode_full(frame).await?; 194 + pass_blocks += result.blocks; 195 + pass_errors += result.errors; 177 196 } 197 + let elapsed = start.elapsed(); 198 + pass_results.push(PassResult { 199 + frames: corpus.frame_ranges.len(), 200 + blocks: pass_blocks, 201 + errors: pass_errors, 202 + elapsed, 203 + }); 178 204 } 179 - let elapsed = start.elapsed(); 180 205 181 - report_corpus_result("decode", corpus, total_frames, elapsed); 206 + report_result("decode", corpus, &pass_results); 182 207 Ok(()) 183 208 } 184 209 185 210 // --- util --- 186 211 187 - fn report_corpus_result( 188 - name: &str, 189 - corpus: &CorpusInfo, 190 - total_frames: usize, 191 - elapsed: std::time::Duration, 192 - ) { 193 - let elapsed_s = elapsed.as_secs_f64(); 194 - let elapsed_ms = elapsed_s * 1000.0; 195 - let frames_per_sec = total_frames as f64 / elapsed_s; 212 + fn report_result(name: &str, corpus: &CorpusInfo, pass_results: &[PassResult]) { 213 + let mut fps_values: Vec<f64> = pass_results 214 + .iter() 215 + .map(|r| r.frames as f64 / r.elapsed.as_secs_f64()) 216 + .collect(); 217 + fps_values.sort_by(|a, b| a.partial_cmp(b).unwrap()); 218 + 219 + let total_frames: usize = pass_results.iter().map(|r| r.frames).sum(); 220 + let total_blocks: usize = pass_results.iter().map(|r| r.blocks).sum(); 221 + let total_errors: usize = pass_results.iter().map(|r| r.errors).sum(); 222 + let total_elapsed: f64 = pass_results.iter().map(|r| r.elapsed.as_secs_f64()).sum(); 223 + 196 224 let total_bytes = corpus.total_bytes as f64 * MEASURED_PASSES as f64; 197 - let throughput_mb = total_bytes / (1024.0 * 1024.0) / elapsed_s; 225 + let throughput_mb = total_bytes / (1024.0 * 1024.0) / total_elapsed; 226 + let blocks_per_frame = total_blocks as f64 / total_frames as f64; 227 + 228 + let min_fps = fps_values[0]; 229 + let median_fps = fps_values[MEASURED_PASSES / 2]; 230 + let max_fps = fps_values[MEASURED_PASSES - 1]; 198 231 199 232 println!( 200 - "{:<14} {:>10.0} frames/sec {:>8.1} ms ({:.1} MB/s)", 201 - name, frames_per_sec, elapsed_ms, throughput_mb, 233 + "{:<14} {:>10.0} frames/sec {:>8.1} MB/s blocks={} ({:.1}/frame) errors={}", 234 + name, median_fps, throughput_mb, total_blocks, blocks_per_frame, total_errors, 235 + ); 236 + println!( 237 + "{:<14} variance: min={:.0} median={:.0} max={:.0} frames/sec", 238 + "", min_fps, median_fps, max_fps, 202 239 ); 203 240 } 204 241
+143 -36
zig/src/bench.zig
··· 1 1 //! atproto firehose benchmarks using zat 2 2 //! 3 - //! decodes a corpus of real firehose frames using zat's public API: 4 - //! firehose.decodeFrame — the same call a real consumer makes. 5 - //! produces typed CommitEvent with ops and decoded records from CAR blocks. 3 + //! decodes a corpus of real firehose frames. for each frame: 4 + //! 1. cbor decode header 5 + //! 2. cbor decode payload (typed commit) 6 + //! 3. car.read(blocks) → all blocks 7 + //! 4. cbor decode every block 8 + //! 9 + //! reports frames/sec, block count, error count, and variance (min/median/max). 6 10 7 11 const std = @import("std"); 8 12 const zat = @import("zat"); ··· 20 24 max_frame: usize, 21 25 }; 22 26 27 + const PassResult = struct { 28 + frames: usize, 29 + blocks: usize, 30 + errors: usize, 31 + elapsed_ns: u64, 32 + }; 33 + 23 34 pub fn main() !void { 24 35 const allocator = std.heap.c_allocator; 25 36 ··· 30 41 std.debug.print(" frame sizes: {d}..{d} bytes\n", .{ corpus.min_frame, corpus.max_frame }); 31 42 std.debug.print(" passes: {d} warmup, {d} measured\n\n", .{ warmup_passes, measured_passes }); 32 43 33 - // verify first frame decodes correctly 44 + // verify first frame 34 45 { 35 46 var arena = std.heap.ArenaAllocator.init(allocator); 36 47 defer arena.deinit(); 37 - const event = try zat.firehose.decodeFrame(arena.allocator(), corpus.frames[0]); 38 - switch (event) { 39 - .commit => |c| { 40 - std.debug.print("first frame: repo={s} ops={d}\n\n", .{ 41 - c.repo, c.ops.len, 42 - }); 43 - }, 44 - else => { 45 - std.debug.print("first frame: not a commit event\n\n", .{}); 46 - }, 47 - } 48 + const result = decodeFullFrame(arena.allocator(), corpus.frames[0]); 49 + std.debug.print("first frame: blocks={d} errors={d}\n\n", .{ 50 + result.blocks, result.errors, 51 + }); 48 52 } 49 53 50 54 benchDecodeFrame(allocator, corpus) catch |err| { ··· 58 62 std.debug.print("\n", .{}); 59 63 } 60 64 65 + /// full decode of one frame: header → payload → CAR → decode every block. 66 + /// returns block count and error count — does not abort on per-block failures. 67 + fn decodeFullFrame(allocator: Allocator, data: []const u8) struct { blocks: usize, errors: usize } { 68 + const cbor = zat.cbor; 69 + const car = zat.car; 70 + 71 + var blocks: usize = 0; 72 + var errors: usize = 0; 73 + 74 + // 1. decode frame header (first CBOR value) 75 + const header_result = cbor.decode(allocator, data) catch { 76 + return .{ .blocks = 0, .errors = 1 }; 77 + }; 78 + 79 + // 2. decode payload (second CBOR value) 80 + const payload_data = data[header_result.consumed..]; 81 + const payload = cbor.decodeAll(allocator, payload_data) catch { 82 + return .{ .blocks = 0, .errors = 1 }; 83 + }; 84 + 85 + // 3. parse CAR from blocks field 86 + const blocks_bytes = payload.getBytes("blocks") orelse { 87 + return .{ .blocks = 0, .errors = 0 }; 88 + }; 89 + const parsed_car = car.read(allocator, blocks_bytes) catch { 90 + return .{ .blocks = 0, .errors = 1 }; 91 + }; 92 + 93 + // 4. decode every block as DAG-CBOR 94 + for (parsed_car.blocks) |block| { 95 + _ = cbor.decodeAll(allocator, block.data) catch { 96 + errors += 1; 97 + continue; 98 + }; 99 + blocks += 1; 100 + } 101 + 102 + return .{ .blocks = blocks, .errors = errors }; 103 + } 104 + 61 105 /// arena reuse: production allocation pattern — one arena, reset per frame. 62 106 fn benchDecodeFrame(allocator: Allocator, corpus: CorpusInfo) !void { 63 107 var arena = std.heap.ArenaAllocator.init(allocator); ··· 66 110 for (0..warmup_passes) |_| { 67 111 for (corpus.frames) |frame| { 68 112 _ = arena.reset(.retain_capacity); 69 - _ = try zat.firehose.decodeFrame(arena.allocator(), frame); 113 + _ = decodeFullFrame(arena.allocator(), frame); 70 114 } 71 115 } 72 116 73 - var total_frames: usize = 0; 74 - var timer = try std.time.Timer.start(); 75 - for (0..measured_passes) |_| { 117 + var pass_results: [measured_passes]PassResult = undefined; 118 + var total_blocks: usize = 0; 119 + var total_errors: usize = 0; 120 + 121 + for (0..measured_passes) |pass| { 122 + var pass_blocks: usize = 0; 123 + var pass_errors: usize = 0; 124 + var timer = try std.time.Timer.start(); 76 125 for (corpus.frames) |frame| { 77 126 _ = arena.reset(.retain_capacity); 78 - _ = try zat.firehose.decodeFrame(arena.allocator(), frame); 79 - total_frames += 1; 127 + const result = decodeFullFrame(arena.allocator(), frame); 128 + pass_blocks += result.blocks; 129 + pass_errors += result.errors; 80 130 } 131 + const elapsed = timer.read(); 132 + pass_results[pass] = .{ 133 + .frames = corpus.frames.len, 134 + .blocks = pass_blocks, 135 + .errors = pass_errors, 136 + .elapsed_ns = elapsed, 137 + }; 138 + total_blocks += pass_blocks; 139 + total_errors += pass_errors; 81 140 } 82 - reportCorpusResult("decode (reuse)", corpus, total_frames, timer.read()); 141 + 142 + reportResult("decode (reuse)", corpus, &pass_results, total_blocks, total_errors); 83 143 } 84 144 85 145 /// arena per-frame: fair cross-language comparison — fresh alloc/free per frame. ··· 88 148 for (corpus.frames) |frame| { 89 149 var arena = std.heap.ArenaAllocator.init(allocator); 90 150 defer arena.deinit(); 91 - _ = try zat.firehose.decodeFrame(arena.allocator(), frame); 151 + _ = decodeFullFrame(arena.allocator(), frame); 92 152 } 93 153 } 94 154 95 - var total_frames: usize = 0; 96 - var timer = try std.time.Timer.start(); 97 - for (0..measured_passes) |_| { 155 + var pass_results: [measured_passes]PassResult = undefined; 156 + var total_blocks: usize = 0; 157 + var total_errors: usize = 0; 158 + 159 + for (0..measured_passes) |pass| { 160 + var pass_blocks: usize = 0; 161 + var pass_errors: usize = 0; 162 + var timer = try std.time.Timer.start(); 98 163 for (corpus.frames) |frame| { 99 164 var arena = std.heap.ArenaAllocator.init(allocator); 100 165 defer arena.deinit(); 101 - _ = try zat.firehose.decodeFrame(arena.allocator(), frame); 102 - total_frames += 1; 166 + const result = decodeFullFrame(arena.allocator(), frame); 167 + pass_blocks += result.blocks; 168 + pass_errors += result.errors; 103 169 } 170 + const elapsed = timer.read(); 171 + pass_results[pass] = .{ 172 + .frames = corpus.frames.len, 173 + .blocks = pass_blocks, 174 + .errors = pass_errors, 175 + .elapsed_ns = elapsed, 176 + }; 177 + total_blocks += pass_blocks; 178 + total_errors += pass_errors; 104 179 } 105 - reportCorpusResult("decode (alloc)", corpus, total_frames, timer.read()); 180 + 181 + reportResult("decode (alloc)", corpus, &pass_results, total_blocks, total_errors); 106 182 } 107 183 108 - fn reportCorpusResult(name: []const u8, corpus: CorpusInfo, total_frames: usize, elapsed_ns: u64) void { 109 - const elapsed_s = @as(f64, @floatFromInt(elapsed_ns)) / 1_000_000_000.0; 110 - const elapsed_ms = elapsed_s * 1000.0; 111 - const frames_per_sec = @as(f64, @floatFromInt(total_frames)) / elapsed_s; 184 + fn reportResult( 185 + name: []const u8, 186 + corpus: CorpusInfo, 187 + pass_results: []const PassResult, 188 + total_blocks: usize, 189 + total_errors: usize, 190 + ) void { 191 + // compute per-pass fps for variance 192 + var fps_values: [measured_passes]f64 = undefined; 193 + var total_frames: usize = 0; 194 + var total_ns: u64 = 0; 195 + for (pass_results, 0..) |r, i| { 196 + const elapsed_s = @as(f64, @floatFromInt(r.elapsed_ns)) / 1_000_000_000.0; 197 + fps_values[i] = @as(f64, @floatFromInt(r.frames)) / elapsed_s; 198 + total_frames += r.frames; 199 + total_ns += r.elapsed_ns; 200 + } 201 + 202 + // sort for min/median/max 203 + std.mem.sort(f64, &fps_values, {}, std.sort.asc(f64)); 204 + 205 + const min_fps = fps_values[0]; 206 + const max_fps = fps_values[measured_passes - 1]; 207 + const median_fps = fps_values[measured_passes / 2]; 208 + 209 + const elapsed_s = @as(f64, @floatFromInt(total_ns)) / 1_000_000_000.0; 112 210 const total_bytes = @as(f64, @floatFromInt(corpus.total_bytes)) * @as(f64, @floatFromInt(measured_passes)); 113 211 const throughput_mb = total_bytes / (1024.0 * 1024.0) / elapsed_s; 212 + const blocks_per_frame = @as(f64, @floatFromInt(total_blocks)) / @as(f64, @floatFromInt(total_frames)); 114 213 115 - std.debug.print("{s: <14} {d:>10.0} frames/sec {d:>8.1} ms ({d:.1} MB/s)\n", .{ 214 + std.debug.print("{s: <14} {d:>10.0} frames/sec {d:>8.1} MB/s blocks={d} ({d:.1}/frame) errors={d}\n", .{ 116 215 name, 117 - frames_per_sec, 118 - elapsed_ms, 216 + median_fps, 119 217 throughput_mb, 218 + total_blocks, 219 + blocks_per_frame, 220 + total_errors, 221 + }); 222 + std.debug.print("{s: <14} variance: min={d:.0} median={d:.0} max={d:.0} frames/sec\n", .{ 223 + "", 224 + min_fps, 225 + median_fps, 226 + max_fps, 120 227 }); 121 228 } 122 229
+27 -8
zig/src/capture.zig
··· 153 153 return error.ConnectionClosed; 154 154 } 155 155 156 - // pre-filter: only save frames that decode as commits with ops 156 + // minimal header peek: just enough to identify commits with ops. 157 + // no SDK-specific decode — avoids baking zat's behavior into the corpus. 157 158 var arena = std.heap.ArenaAllocator.init(self.allocator); 158 159 defer arena.deinit(); 160 + const alloc = arena.allocator(); 159 161 160 - const event = zat.firehose.decodeFrame(arena.allocator(), data) catch return; 161 - switch (event) { 162 - .commit => |commit| { 163 - if (commit.ops.len == 0) return; 164 - }, 165 - else => return, 166 - } 162 + if (!isCommitWithOps(alloc, data)) return; 167 163 168 164 // dupe and buffer the raw frame 169 165 const duped = try self.allocator.dupe(u8, data); ··· 175 171 176 172 pub fn close(_: *FirehoseCaptureHandler) void {} 177 173 }; 174 + 175 + /// minimal check: is this a commit frame with at least one op? 176 + /// decodes only the CBOR header and payload map keys — no CAR parsing, 177 + /// no record decode, no SDK-specific types. 178 + fn isCommitWithOps(allocator: Allocator, data: []const u8) bool { 179 + const cbor = zat.cbor; 180 + 181 + // decode frame header (first CBOR value): {op: int, t: string} 182 + const header_result = cbor.decode(allocator, data) catch return false; 183 + const header = header_result.value; 184 + 185 + // check t == "#commit" 186 + const t = header.getString("t") orelse return false; 187 + if (!std.mem.eql(u8, t, "#commit")) return false; 188 + 189 + // decode payload (second CBOR value) — just the top-level map 190 + const payload_data = data[header_result.consumed..]; 191 + const payload = cbor.decodeAll(allocator, payload_data) catch return false; 192 + 193 + // check ops field exists and is a non-empty array 194 + const ops = payload.getArray("ops") orelse return false; 195 + return ops.len > 0; 196 + } 178 197 179 198 fn saveFile(path: []const u8, data: []const u8) !void { 180 199 const file = try std.fs.cwd().createFile(path, .{});