Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

try approaches for walking with blocks

phil 0af4e342 87079eaf

+378 -36
+4
Cargo.toml
··· 53 53 name = "collections" 54 54 harness = false 55 55 56 + [[bench]] 57 + name = "node-counts" 58 + harness = false 59 + 56 60 # [[bench]] 57 61 # name = "leading" 58 62 # harness = false
+226
benches/node-counts.rs
··· 1 + //! Benchmark comparing two key-only walk implementations: 2 + //! 3 + //! - **Approach A (filter)**: call `next_chunk_with_nodes` and discard Node items at the 4 + //! call site. The underlying walker emits and clones node bytes that are immediately 5 + //! thrown away. 6 + //! - **Approach B (separate)**: call `next_chunk` which routes through `Walker::step`, 7 + //! a separate code path that never allocates node bytes. 8 + //! 9 + //! The difference quantifies the cost of `data.clone()` inside `step_with_nodes`. 10 + 11 + extern crate repo_stream; 12 + use repo_stream::{DriverBuilder, WalkItem}; 13 + use std::path::Path; 14 + 15 + use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; 16 + 17 + use mimalloc::MiMalloc; 18 + #[global_allocator] 19 + static GLOBAL: MiMalloc = MiMalloc; 20 + 21 + const EMPTY_CAR: &[u8] = include_bytes!("../car-samples/empty.car"); 22 + const TINY_CAR: &[u8] = include_bytes!("../car-samples/tiny.car"); 23 + const LITTLE_CAR: &[u8] = include_bytes!("../car-samples/little.car"); 24 + const MIDSIZE_CAR: &[u8] = include_bytes!("../car-samples/midsize.car"); 25 + 26 + /// Approach A: key-only walk via filter over the node-inclusive path. 27 + /// Calls `next_chunk_with_nodes`, counts records, discards Node items. 28 + async fn count_records_filter(bytes: &[u8]) -> usize { 29 + let mut mem_car = DriverBuilder::new() 30 + .with_mem_limit_mb(100) 31 + .load_car(bytes) 32 + .await 33 + .unwrap(); 34 + 35 + let mut records = 0usize; 36 + while let Some(items) = mem_car.next_chunk_with_nodes(256).unwrap() { 37 + for item in items { 38 + if matches!(item, WalkItem::Record(_)) { 39 + records += 1; 40 + } 41 + } 42 + } 43 + records 44 + } 45 + 46 + /// Approach B: key-only walk via the separate `next_chunk` path. 47 + /// `Walker::step` never allocates node bytes. 48 + async fn count_records_separate(bytes: &[u8]) -> usize { 49 + let mut mem_car = DriverBuilder::new() 50 + .with_mem_limit_mb(100) 51 + .load_car(bytes) 52 + .await 53 + .unwrap(); 54 + 55 + let mut records = 0usize; 56 + while let Some(chunk) = mem_car.next_chunk_strict(256).unwrap() { 57 + records += chunk.len(); 58 + } 59 + records 60 + } 61 + 62 + /// Walk with nodes: use `next_chunk_with_nodes`, count both records and nodes. 63 + async fn count_records_and_nodes(bytes: &[u8]) -> (usize, usize) { 64 + let mut mem_car = DriverBuilder::new() 65 + .with_mem_limit_mb(100) 66 + .load_car(bytes) 67 + .await 68 + .unwrap(); 69 + 70 + let mut records = 0usize; 71 + let mut nodes = 0usize; 72 + while let Some(items) = mem_car.next_chunk_with_nodes(256).unwrap() { 73 + for item in items { 74 + match item { 75 + WalkItem::Record(_) => records += 1, 76 + WalkItem::Node { .. } => nodes += 1, 77 + _ => {} 78 + } 79 + } 80 + } 81 + (records, nodes) 82 + } 83 + 84 + async fn count_records_filter_file(path: &Path) -> usize { 85 + let reader = tokio::io::BufReader::new(tokio::fs::File::open(path).await.unwrap()); 86 + let mut mem_car = DriverBuilder::new() 87 + .with_mem_limit_mb(1024) 88 + .load_car(reader) 89 + .await 90 + .unwrap(); 91 + 92 + let mut records = 0usize; 93 + while let Some(items) = mem_car.next_chunk_with_nodes(256).unwrap() { 94 + for item in items { 95 + if matches!(item, WalkItem::Record(_)) { 96 + records += 1; 97 + } 98 + } 99 + } 100 + records 101 + } 102 + 103 + async fn count_records_separate_file(path: &Path) -> usize { 104 + let reader = tokio::io::BufReader::new(tokio::fs::File::open(path).await.unwrap()); 105 + let mut mem_car = DriverBuilder::new() 106 + .with_mem_limit_mb(1024) 107 + .load_car(reader) 108 + .await 109 + .unwrap(); 110 + 111 + let mut records = 0usize; 112 + while let Some(chunk) = mem_car.next_chunk_strict(256).unwrap() { 113 + records += chunk.len(); 114 + } 115 + records 116 + } 117 + 118 + async fn count_records_and_nodes_file(path: &Path) -> (usize, usize) { 119 + let reader = tokio::io::BufReader::new(tokio::fs::File::open(path).await.unwrap()); 120 + let mut mem_car = DriverBuilder::new() 121 + .with_mem_limit_mb(1024) 122 + .load_car(reader) 123 + .await 124 + .unwrap(); 125 + 126 + let mut records = 0usize; 127 + let mut nodes = 0usize; 128 + while let Some(items) = mem_car.next_chunk_with_nodes(256).unwrap() { 129 + for item in items { 130 + match item { 131 + WalkItem::Record(_) => records += 1, 132 + WalkItem::Node { .. } => nodes += 1, 133 + _ => {} 134 + } 135 + } 136 + } 137 + (records, nodes) 138 + } 139 + 140 + pub fn criterion_benchmark(c: &mut Criterion) { 141 + let rt = tokio::runtime::Builder::new_multi_thread() 142 + .enable_all() 143 + .build() 144 + .expect("Creating runtime failed"); 145 + 146 + // let cars = [ 147 + // ("empty", EMPTY_CAR), 148 + // ("tiny", TINY_CAR), 149 + // ("little", LITTLE_CAR), 150 + // ("midsize", MIDSIZE_CAR), 151 + // ]; 152 + 153 + // // Sanity-check: both approaches agree on record count. 154 + // for (name, bytes) in cars { 155 + // let a = rt.block_on(count_records_filter(bytes)); 156 + // let b = rt.block_on(count_records_separate(bytes)); 157 + // assert_eq!(a, b, "approaches disagree on record count for {name}"); 158 + // let (records, nodes) = rt.block_on(count_records_and_nodes(bytes)); 159 + // println!("{name}: {records} records, {nodes} nodes"); 160 + // } 161 + 162 + // let mut group = c.benchmark_group("node-counts"); 163 + 164 + // for (name, bytes) in cars { 165 + // group.bench_with_input( 166 + // BenchmarkId::new("records-filter-approach", name), 167 + // bytes, 168 + // |b, bytes| b.to_async(&rt).iter(async || count_records_filter(bytes).await), 169 + // ); 170 + // group.bench_with_input( 171 + // BenchmarkId::new("records-separate-approach", name), 172 + // bytes, 173 + // |b, bytes| b.to_async(&rt).iter(async || count_records_separate(bytes).await), 174 + // ); 175 + // group.bench_with_input( 176 + // BenchmarkId::new("records-and-nodes", name), 177 + // bytes, 178 + // |b, bytes| b.to_async(&rt).iter(async || count_records_and_nodes(bytes).await), 179 + // ); 180 + // } 181 + 182 + // group.finish(); 183 + 184 + if let Ok(huge_car) = std::env::var("HUGE_CAR") { 185 + let path: std::path::PathBuf = huge_car.into(); 186 + 187 + // Sanity-check the huge car too. 188 + let a = rt.block_on(count_records_filter_file(&path)); 189 + let b = rt.block_on(count_records_separate_file(&path)); 190 + assert_eq!(a, b, "approaches disagree on record count for huge-car"); 191 + let (records, nodes) = rt.block_on(count_records_and_nodes_file(&path)); 192 + println!("huge: {records} records, {nodes} nodes"); 193 + 194 + let mut group = c.benchmark_group("node-counts-huge"); 195 + 196 + group.bench_with_input( 197 + BenchmarkId::new("records-filter-approach", "huge"), 198 + &path, 199 + |b, path| { 200 + b.to_async(&rt) 201 + .iter(async || count_records_filter_file(path).await) 202 + }, 203 + ); 204 + group.bench_with_input( 205 + BenchmarkId::new("records-separate-approach", "huge"), 206 + &path, 207 + |b, path| { 208 + b.to_async(&rt) 209 + .iter(async || count_records_separate_file(path).await) 210 + }, 211 + ); 212 + group.bench_with_input( 213 + BenchmarkId::new("records-and-nodes", "huge"), 214 + &path, 215 + |b, path| { 216 + b.to_async(&rt) 217 + .iter(async || count_records_and_nodes_file(path).await) 218 + }, 219 + ); 220 + 221 + group.finish(); 222 + } 223 + } 224 + 225 + criterion_group!(benches, criterion_benchmark); 226 + criterion_main!(benches);
+1 -1
examples/read-slice/main.rs
··· 52 52 trailing = Some(key); 53 53 } 54 54 } 55 - WalkItem::MissingSubtree { .. } => {} 55 + WalkItem::MissingSubtree { .. } | WalkItem::Node { .. } => {} 56 56 } 57 57 } 58 58 }
+6
src/disk.rs
··· 291 291 return (state, Err(DriveError::MissingBlock(Box::new(cid)))); 292 292 } 293 293 Ok(None) => break, 294 + Ok(Some(WalkItem::Node { .. })) => { 295 + unreachable!("disk_step never emits Node items") 296 + } 294 297 } 295 298 } 296 299 ··· 328 331 return tx.blocking_send(Err(DriveError::MissingBlock(Box::new(cid)))); 329 332 } 330 333 Ok(None) => break, 334 + Ok(Some(WalkItem::Node { .. })) => { 335 + unreachable!("disk_step never emits Node items") 336 + } 331 337 } 332 338 } 333 339
+43 -7
src/mem.rs
··· 3 3 use crate::{ 4 4 Bytes, HashMap, RepoPath, 5 5 disk::{DiskDriver, DiskError, DiskStore, DriveError, make_disk_driver}, 6 - mst::{Commit, MstNode, ObjectLink}, 6 + mst::{Commit, ObjectLink}, 7 7 walk::{MaybeProcessedBlock, Output, WalkError, WalkItem, Walker}, 8 8 }; 9 + use cid::Cid; 9 10 use iroh_car::CarReader; 10 11 use std::convert::Infallible; 11 12 use thiserror::Error; ··· 148 149 149 150 let commit = commit.ok_or(LoadError::MissingCommit)?; 150 151 151 - let root_node: MstNode = match mem_blocks 152 + let (root_node, root_bytes) = match mem_blocks 152 153 .get(&commit.data) 153 154 .ok_or(LoadError::MissingCommit)? 154 155 { 155 156 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 156 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 157 + MaybeProcessedBlock::Raw(bytes) => (serde_ipld_dagcbor::from_slice(bytes)?, bytes.clone()), 157 158 }; 159 + let root_cid: Cid = commit.data.clone().into(); 158 160 159 161 Ok(MemCar { 160 162 commit, 161 163 prev_key: None, 162 164 blocks: mem_blocks, 163 - walker: Walker::new(root_node), 165 + walker: Walker::new(root_node, root_cid, root_bytes), 164 166 process, 165 167 }) 166 168 } ··· 233 235 Some(WalkItem::MissingSubtree { cid }) => { 234 236 Err(WalkError::MissingNode { cid: Box::new(cid) }) 235 237 } 238 + Some(WalkItem::Node { .. }) => unreachable!("step() never emits Node items"), 236 239 } 237 240 } 238 241 ··· 256 259 Some(WalkItem::MissingSubtree { cid }) => { 257 260 return Err(WalkError::MissingNode { cid: Box::new(cid) }); 258 261 } 262 + Some(WalkItem::Node { .. }) => unreachable!("step() never emits Node items"), 263 + } 264 + } 265 + if out.is_empty() { 266 + Ok(None) 267 + } else { 268 + Ok(Some(out)) 269 + } 270 + } 271 + 272 + /// Walk the MST emitting records, missing items, **and** MST node blocks. 273 + /// 274 + /// Like [`next`] but also yields `WalkItem::Node` for every node descended 275 + /// into (root first, then children in traversal order). Useful for collecting 276 + /// or counting the raw node blocks alongside records. 277 + /// 278 + /// Note: node bytes are cloned on each descent — see [`Walker::step_with_nodes`]. 279 + pub fn next_with_nodes(&mut self) -> Result<Option<WalkItem>, WalkError> { 280 + self.walker.step_with_nodes(&self.blocks, self.process) 281 + } 282 + 283 + /// Collect up to `n` items (records, missing items, and node blocks). 284 + /// 285 + /// Like [`next_chunk`] but also includes `WalkItem::Node`. The chunk 286 + /// size counts all item types, so a chunk of 256 may contain fewer records 287 + /// than a [`next_chunk`] call of 256. 288 + pub fn next_chunk_with_nodes(&mut self, n: usize) -> Result<Option<Vec<WalkItem>>, WalkError> { 289 + let mut out = Vec::with_capacity(n); 290 + for _ in 0..n { 291 + match self.walker.step_with_nodes(&self.blocks, self.process)? { 292 + Some(item) => out.push(item), 293 + None => break, 259 294 } 260 295 } 261 296 if out.is_empty() { ··· 345 380 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 346 381 .ok_or(DriveError::MissingCommit)?; 347 382 348 - let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 383 + let root_cid: Cid = commit.data.clone().into(); 384 + let (node, root_bytes) = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 349 385 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 350 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 386 + MaybeProcessedBlock::Raw(bytes) => (serde_ipld_dagcbor::from_slice(&bytes)?, bytes), 351 387 }; 352 - let walker = Walker::new(node); 388 + let walker = Walker::new(node, root_cid, root_bytes); 353 389 354 390 Ok((commit, None, make_disk_driver(store, walker, self.process))) 355 391 }
+30 -26
src/slice.rs
··· 80 80 done = true; 81 81 break; 82 82 } 83 + Some(WalkItem::Node { .. }) => unreachable!("step() never emits Node"), 83 84 Some(WalkItem::MissingSubtree { .. }) => { 84 85 // Boundary subtree entirely before the range — safe to skip. 85 86 } ··· 146 147 return Ok(Some(out)); 147 148 } 148 149 149 - match self.mem_car.next()? { 150 - None => { 151 - self.done = true; 152 - validate_upper(self.following_key.as_deref(), &self.upper)?; 153 - Ok(None) 154 - } 155 - Some(WalkItem::MissingSubtree { cid }) => { 156 - // Any missing subtree after the range starts is an error: 157 - // we can't prove the range is complete without it. 158 - Err(SliceError::MissingNode { cid }) 159 - } 160 - Some(WalkItem::MissingRecord { key, cid }) => { 161 - if is_after(&key, &self.upper) { 162 - self.following_key = Some(key); 150 + loop { 151 + match self.mem_car.next()? { 152 + None => { 163 153 self.done = true; 164 154 validate_upper(self.following_key.as_deref(), &self.upper)?; 165 - Ok(None) 166 - } else { 167 - Err(SliceError::IncompleteRange { key, cid }) 155 + return Ok(None); 156 + } 157 + Some(WalkItem::Node { .. }) => unreachable!("step() never emits Node"), 158 + Some(WalkItem::MissingSubtree { cid }) => { 159 + // Any missing subtree after the range starts is an error: 160 + // we can't prove the range is complete without it. 161 + return Err(SliceError::MissingNode { cid }); 162 + } 163 + Some(WalkItem::MissingRecord { key, cid }) => { 164 + if is_after(&key, &self.upper) { 165 + self.following_key = Some(key); 166 + self.done = true; 167 + validate_upper(self.following_key.as_deref(), &self.upper)?; 168 + return Ok(None); 169 + } else { 170 + return Err(SliceError::IncompleteRange { key, cid }); 171 + } 168 172 } 169 - } 170 - Some(WalkItem::Record(out)) => { 171 - if is_after(&out.key, &self.upper) { 172 - self.following_key = Some(out.key); 173 - self.done = true; 174 - validate_upper(self.following_key.as_deref(), &self.upper)?; 175 - Ok(None) 176 - } else { 177 - Ok(Some(out)) 173 + Some(WalkItem::Record(out)) => { 174 + if is_after(&out.key, &self.upper) { 175 + self.following_key = Some(out.key); 176 + self.done = true; 177 + validate_upper(self.following_key.as_deref(), &self.upper)?; 178 + return Ok(None); 179 + } else { 180 + return Ok(Some(out)); 181 + } 178 182 } 179 183 } 180 184 }
+67 -1
src/walk.rs
··· 101 101 /// An item yielded by `Walker::step`. 102 102 #[derive(Debug, PartialEq)] 103 103 pub enum WalkItem { 104 + /// A raw MST node block (root first, then each child as it is descended into). 105 + Node { cid: Cid, data: Bytes }, 104 106 /// A record with its (processed) block data. 105 107 Record(Output), 106 108 /// A record whose block was absent from the loaded blocks. ··· 125 127 pub(crate) prev_key: Option<RepoPath>, 126 128 pub(crate) root_layer: Layer, 127 129 pub(crate) todo: Vec<Vec<NodeThing>>, 130 + /// The root MST node block, emitted as the first `WalkItem::Node` before any records. 131 + pending_root: Option<(Cid, Bytes)>, 128 132 } 129 133 130 134 impl Walker { 131 - pub fn new(root_node: MstNode) -> Self { 135 + pub fn new(root_node: MstNode, root_cid: Cid, root_bytes: Bytes) -> Self { 132 136 Self { 133 137 prev_key: None, 134 138 root_layer: root_node.layer.unwrap_or(0), // empty root node = empty mst 135 139 todo: vec![root_node.things], 140 + pending_root: Some((root_cid, root_bytes)), 136 141 } 137 142 } 138 143 ··· 212 217 /// Returns `Ok(Some(item))` for each block encountered (record, missing 213 218 /// record, or missing subtree), or `Ok(None)` when traversal is complete. 214 219 /// Only errors on structural MST violations (wrong layer, out-of-order keys). 220 + /// Advance one step through the MST. 221 + /// 222 + /// Returns `Ok(Some(item))` for each block encountered (record, missing 223 + /// record, or missing subtree), or `Ok(None)` when traversal is complete. 224 + /// Only errors on structural MST violations (wrong layer, out-of-order keys). 225 + /// 226 + /// MST node blocks are **not** emitted; use [`step_with_nodes`] for that. 215 227 pub fn step( 216 228 &mut self, 217 229 blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, ··· 231 243 }; 232 244 if let Some(out) = self.mpb_step(thing, mpb, &process)? { 233 245 return Ok(Some(WalkItem::Record(out))); 246 + } 247 + } 248 + Ok(None) 249 + } 250 + 251 + /// Like [`step`], but also emits `WalkItem::Node` for every MST node block 252 + /// that is descended into (root first, then children in traversal order). 253 + /// 254 + /// Node bytes are cloned from the in-memory block map on each descent, so 255 + /// this is measurably more expensive than [`step`] for large trees. 256 + pub fn step_with_nodes( 257 + &mut self, 258 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 259 + process: impl Fn(Bytes) -> Bytes, 260 + ) -> Result<Option<WalkItem>, WalkError> { 261 + // Emit the root MST node block before any records. 262 + if let Some((cid, data)) = self.pending_root.take() { 263 + return Ok(Some(WalkItem::Node { cid, data })); 264 + } 265 + 266 + while let Some(thing) = self.next_todo() { 267 + let Some(mpb) = blocks.get(&thing.link) else { 268 + return Ok(Some(match thing.kind { 269 + ThingKind::Record(key) => WalkItem::MissingRecord { 270 + key, 271 + cid: thing.link.into(), 272 + }, 273 + ThingKind::ChildNode => WalkItem::MissingSubtree { 274 + cid: thing.link.into(), 275 + }, 276 + })); 277 + }; 278 + 279 + // Capture what we need to emit a Node item after mpb_step consumes `thing`. 280 + let child_link = if matches!(thing.kind, ThingKind::ChildNode) { 281 + Some(thing.link.clone()) 282 + } else { 283 + None 284 + }; 285 + 286 + if let Some(out) = self.mpb_step(thing, mpb, &process)? { 287 + return Ok(Some(WalkItem::Record(out))); 288 + } 289 + 290 + // mpb_step returns None only for ChildNode descent; emit the node block. 291 + // This clones the raw bytes — the main cost of step_with_nodes vs step. 292 + if let Some(link) = child_link { 293 + let MaybeProcessedBlock::Raw(data) = mpb else { 294 + unreachable!("mpb_step already errored on Processed ChildNode"); 295 + }; 296 + return Ok(Some(WalkItem::Node { 297 + cid: link.into(), 298 + data: data.clone(), 299 + })); 234 300 } 235 301 } 236 302 Ok(None)
+1 -1
tests/car-slices.rs
··· 68 68 trailing = Some(key); 69 69 } 70 70 } 71 - WalkItem::MissingSubtree { .. } => {} 71 + WalkItem::MissingSubtree { .. } | WalkItem::Node { .. } => {} 72 72 } 73 73 } 74 74 }