Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

restructuring for strict/non-strict walking

phil 6ab51051 78a1ccc4

+214 -185
+17 -27
benches/collections.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{DriverBuilder, Step}; 2 + use repo_stream::DriverBuilder; 3 3 use std::collections::HashSet; 4 4 use std::path::Path; 5 5 ··· 24 24 25 25 let mut seen = HashSet::new(); 26 26 let mut collections = vec![]; 27 - loop { 28 - match mem_car.next_chunk(256).unwrap() { 29 - Step::End(_) => break, 30 - Step::Value(outputs) => { 31 - for output in outputs { 32 - let collection = output.key.split_once('/').unwrap().0.to_string(); 33 - if seen.insert(collection.clone()) { 34 - collections.push(collection); 35 - } 36 - } 27 + while let Some(outputs) = mem_car.next_chunk_strict(256).unwrap() { 28 + for output in outputs { 29 + let collection = output.key.split_once('/').unwrap().0.to_string(); 30 + if seen.insert(collection.clone()) { 31 + collections.push(collection); 37 32 } 38 33 } 39 34 } ··· 55 50 56 51 let mut collections = vec![]; 57 52 loop { 58 - match mem_car.next().unwrap() { 59 - Step::End(_) => break, 60 - Step::Value(output) => { 53 + match mem_car.next_strict().unwrap() { 54 + None => break, 55 + Some(output) => { 61 56 let collection = output.key.split_once('/').unwrap().0.to_string(); 62 57 collections.push(collection.clone()); 63 58 mem_car.seek(&format!("{collection}/{tilde_max}")).unwrap(); ··· 77 72 78 73 let mut seen = HashSet::new(); 79 74 let mut collections = vec![]; 80 - loop { 81 - match mem_car.next_chunk(256).unwrap() { 82 - Step::End(_) => break, 83 - Step::Value(outputs) => { 84 - for output in outputs { 85 - let collection = output.key.split_once('/').unwrap().0.to_string(); 86 - if seen.insert(collection.clone()) { 87 - collections.push(collection); 88 - } 89 - } 75 + while let Some(outputs) = mem_car.next_chunk_strict(256).unwrap() { 76 + for output in outputs { 77 + let collection = output.key.split_once('/').unwrap().0.to_string(); 78 + if seen.insert(collection.clone()) { 79 + collections.push(collection); 90 80 } 91 81 } 92 82 } ··· 104 94 105 95 let mut collections = vec![]; 106 96 loop { 107 - match mem_car.next().unwrap() { 108 - Step::End(_) => break, 109 - Step::Value(output) => { 97 + match mem_car.next_strict().unwrap() { 98 + None => break, 99 + Some(output) => { 110 100 let collection = output.key.split_once('/').unwrap().0.to_string(); 111 101 collections.push(collection.clone()); 112 102 mem_car.seek(&format!("{collection}/{tilde_max}")).unwrap();
+8 -6
benches/huge-car.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{Driver, Step}; 2 + use repo_stream::DriverBuilder; 3 3 use std::path::{Path, PathBuf}; 4 4 5 5 use criterion::{Criterion, criterion_group, criterion_main}; ··· 32 32 let reader = tokio::fs::File::open(filename).await.unwrap(); 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 - let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 - Driver::Memory(_, _, mem_driver) => mem_driver, 37 - Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 - }; 35 + let mut driver = DriverBuilder::new() 36 + .with_mem_limit_mb(1024) 37 + .with_block_processor(ser) 38 + .load_car(reader) 39 + .await 40 + .unwrap(); 39 41 40 42 let mut n = 0; 41 - while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 43 + while let Some(pairs) = driver.next_chunk(256).unwrap() { 42 44 n += pairs.len(); 43 45 } 44 46 n
+8 -6
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{Driver, Step}; 2 + use repo_stream::DriverBuilder; 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 ··· 39 39 } 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 - let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - Driver::Memory(_, _, mem_driver) => mem_driver, 44 - Driver::Disk(_) => panic!("not benching big cars here"), 45 - }; 42 + let mut mem_car = DriverBuilder::new() 43 + .with_mem_limit_mb(32) 44 + .with_block_processor(ser) 45 + .load_car(bytes) 46 + .await 47 + .unwrap(); 46 48 47 49 let mut n = 0; 48 - while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 50 + while let Some(pairs) = mem_car.next_chunk_strict(256).unwrap() { 49 51 n += pairs.len(); 50 52 } 51 53 n
+3 -6
examples/disk-read-file/main.rs
··· 9 9 static GLOBAL: MiMalloc = MiMalloc; 10 10 11 11 use clap::Parser; 12 - use repo_stream::{DiskBuilder, DriverBuilder, LoadError, Step}; 12 + use repo_stream::{DiskBuilder, DriverBuilder, LoadError}; 13 13 use std::path::PathBuf; 14 14 use std::time::Instant; 15 15 ··· 66 66 // this example uses the disk driver's channel mode: the tree walking is 67 67 // spawned onto a blocking thread, and we get chunks of rkey+blocks back 68 68 let (mut rx, join) = driver.to_channel(512); 69 - while let Some(step) = rx.recv().await { 70 - let step = step?; 71 - let Step::Value(outputs) = step else { 72 - break; 73 - }; 69 + while let Some(outputs) = rx.recv().await { 70 + let outputs = outputs?; 74 71 75 72 // keep a count of the total number of blocks seen 76 73 n += outputs.len();
+2 -2
examples/read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{DriverBuilder, Output, Step}; 7 + use repo_stream::{DriverBuilder, Output}; 8 8 use std::path::PathBuf; 9 9 10 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 31 31 32 32 log::info!("got commit: {:?}", mem_car.commit); 33 33 34 - while let Step::Value(records) = mem_car.next_chunk(256)? { 34 + while let Some(records) = mem_car.next_chunk_strict(256)? { 35 35 for Output { 36 36 key: _, 37 37 cid: _,
+32 -20
examples/read-slice/main.rs
··· 3 3 */ 4 4 5 5 extern crate repo_stream; 6 - use repo_stream::{DriverBuilder, LoadError, Output, Step}; 6 + use repo_stream::{DriverBuilder, LoadError, Output, WalkItem}; 7 7 8 8 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 9 ··· 26 26 "\nthis slice is from {}, repo rev {}", 27 27 mem_car.commit.did, mem_car.commit.rev 28 28 ); 29 - if let Some(key) = &mem_car.prev_key { 30 - println!(" -> key immediately before CAR slice: {key}"); 31 - } else { 32 - println!( 33 - " -> no key preceeding the CAR slice, so it includes the leading edge of the tree." 34 - ); 35 - } 36 29 37 30 println!("included records:"); 38 - let end = loop { 39 - match mem_car.next_chunk(256)? { 40 - Step::Value(chunk) => { 41 - for Output { cid, key, .. } in chunk { 31 + 32 + let mut preceding: Option<String> = None; 33 + let mut trailing: Option<String> = None; 34 + let mut after_records = false; 35 + 36 + while let Some(items) = mem_car.next_chunk(256)? { 37 + for item in items { 38 + match item { 39 + WalkItem::Record(Output { cid, key, .. }) => { 40 + after_records = true; 41 + trailing = None; 42 42 print!(" SHA256 "); 43 43 for byte in cid.to_bytes().iter().skip(4).take(5) { 44 44 print!("{byte:02x}"); 45 45 } 46 46 println!("...\t{key}"); 47 47 } 48 + WalkItem::MissingRecord { key, .. } => { 49 + if !after_records { 50 + preceding = Some(key); 51 + } else if trailing.is_none() { 52 + trailing = Some(key); 53 + } 54 + } 55 + WalkItem::MissingSubtree { .. } => {} 48 56 } 49 - Step::End(e) => break e, 50 57 } 51 - }; 58 + } 52 59 53 60 println!("done walking records present in the slice."); 54 - if let Some(key) = end { 55 - println!(" -> key immediately after CAR slice: {key}"); 56 - } else { 57 - println!( 58 - " -> no key proceeding the CAR slice, so it includes the trailing edge of the tree." 59 - ); 61 + match preceding { 62 + Some(key) => println!(" -> key immediately before CAR slice: {key}"), 63 + None => println!( 64 + " -> no key preceding the CAR slice, so it includes the leading edge of the tree." 65 + ), 66 + } 67 + match trailing { 68 + Some(key) => println!(" -> key immediately after CAR slice: {key}"), 69 + None => println!( 70 + " -> no key following the CAR slice, so it includes the trailing edge of the tree." 71 + ), 60 72 } 61 73 62 74 Ok(())
+3 -3
readme.md
··· 11 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 12 13 13 ```rust no_run 14 - use repo_stream::{DriverBuilder, LoadError, DiskBuilder, Output, Step}; 14 + use repo_stream::{DriverBuilder, LoadError, DiskBuilder, Output}; 15 15 16 16 #[tokio::main] 17 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 31 31 { 32 32 // if all blocks fit within memory 33 33 Ok(mut mem_car) => { 34 - while let Step::Value(chunk) = mem_car.next_chunk(256)? { 34 + while let Some(chunk) = mem_car.next_chunk_strict(256)? { 35 35 for Output { key: _, cid: _, data } in chunk { 36 36 let size = usize::from_ne_bytes(<[u8; 8]>::try_from(data).unwrap()); 37 37 total_size += size; ··· 46 46 // do the spilling, get back a disk driver 47 47 let (_commit, _prev_key, mut driver) = partial.finish_loading(store).await?; 48 48 49 - while let Step::Value(chunk) = driver.next_chunk(256).await? { 49 + while let Some(chunk) = driver.next_chunk(256).await? { 50 50 for Output { key: _, cid: _, data } in chunk { 51 51 let size = usize::from_ne_bytes(<[u8; 8]>::try_from(data).unwrap()); 52 52 total_size += size;
+25 -22
src/disk.rs
··· 15 15 */ 16 16 17 17 use crate::{ 18 - Bytes, Step, 18 + Bytes, 19 19 mst::ThingKind, 20 20 walk::{MaybeProcessedBlock, MstError, Output, WalkError, WalkItem, Walker}, 21 21 }; ··· 62 62 #[error("Storage error: {0}")] 63 63 StorageError(#[from] DiskError), 64 64 #[error("Unexpected missing block: {0:?}")] 65 - MissingBlock(cid::Cid), 65 + MissingBlock(Box<cid::Cid>), 66 66 #[error("Tried to send on a closed channel")] 67 67 ChannelSendError, 68 68 #[error("Failed to join a task: {0}")] ··· 255 255 } 256 256 257 257 impl DiskDriver { 258 - /// Walk the MST returning up to `n` key + record pairs 258 + /// Walk the MST returning up to `n` key + record pairs. 259 + /// 260 + /// Returns `Ok(Some(outputs))` while records remain, `Ok(None)` when done. 261 + /// Errors if any block is absent (disk path always expects all blocks present). 259 262 /// 260 263 /// ```no_run 261 - /// # use repo_stream::{disk::{DiskDriver, DriveError, _get_fake_disk_driver}, Step}; 264 + /// # use repo_stream::disk::{DiskDriver, DriveError, _get_fake_disk_driver}; 262 265 /// # #[tokio::main] 263 266 /// # async fn main() -> Result<(), DriveError> { 264 267 /// # let mut disk_driver = _get_fake_disk_driver(); 265 - /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? { 268 + /// while let Some(outputs) = disk_driver.next_chunk(256).await? { 266 269 /// for output in outputs { 267 270 /// println!("{}: size={}", output.key, output.data.len()); 268 271 /// } ··· 270 273 /// # Ok(()) 271 274 /// # } 272 275 /// ``` 273 - pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> { 276 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<Output>>, DriveError> { 274 277 let process = self.process; 275 278 276 279 let mut state = self.state.take().expect("DiskDriver must have Some(state)"); ··· 285 288 Ok(Some(WalkItem::Record(output))) => out.push(output), 286 289 Ok(Some(WalkItem::MissingRecord { cid, .. })) 287 290 | Ok(Some(WalkItem::MissingSubtree { cid })) => { 288 - return (state, Err(DriveError::MissingBlock(cid))); 291 + return (state, Err(DriveError::MissingBlock(Box::new(cid)))); 289 292 } 290 293 Ok(None) => break, 291 294 } ··· 300 303 let out = res?; 301 304 302 305 if out.is_empty() { 303 - Ok(Step::End(None)) 306 + Ok(None) 304 307 } else { 305 - Ok(Step::Value(out)) 308 + Ok(Some(out)) 306 309 } 307 310 } 308 311 309 312 fn read_tx_blocking( 310 313 &mut self, 311 314 n: usize, 312 - tx: mpsc::Sender<Result<Step<Vec<Output>>, DriveError>>, 313 - ) -> Result<(), mpsc::error::SendError<Result<Step<Vec<Output>>, DriveError>>> { 315 + tx: mpsc::Sender<Result<Vec<Output>, DriveError>>, 316 + ) -> Result<(), mpsc::error::SendError<Result<Vec<Output>, DriveError>>> { 314 317 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 315 318 316 319 loop { ··· 322 325 Ok(Some(WalkItem::Record(output))) => out.push(output), 323 326 Ok(Some(WalkItem::MissingRecord { cid, .. })) 324 327 | Ok(Some(WalkItem::MissingSubtree { cid })) => { 325 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 328 + return tx.blocking_send(Err(DriveError::MissingBlock(Box::new(cid)))); 326 329 } 327 330 Ok(None) => break, 328 331 } ··· 331 334 if out.is_empty() { 332 335 break; 333 336 } 334 - tx.blocking_send(Ok(Step::Value(out)))?; 337 + tx.blocking_send(Ok(out))?; 335 338 } 336 339 337 340 Ok(()) 338 341 } 339 342 340 - /// Spawn the disk reading task into a tokio blocking thread 343 + /// Spawn the disk reading task into a tokio blocking thread. 344 + /// 345 + /// The channel sends `Ok(chunk)` for each batch of records. When the walk 346 + /// is complete the sender is dropped and `rx.recv()` returns `None`. 341 347 /// 342 348 /// ```no_run 343 - /// # use repo_stream::{disk::{DiskDriver, DriveError, _get_fake_disk_driver}, Step}; 349 + /// # use repo_stream::disk::{DiskDriver, DriveError, _get_fake_disk_driver}; 344 350 /// # #[tokio::main] 345 351 /// # async fn main() -> Result<(), DriveError> { 346 352 /// # let mut disk_driver = _get_fake_disk_driver(); 347 353 /// let (mut rx, join) = disk_driver.to_channel(512); 348 - /// while let Some(recvd) = rx.recv().await { 349 - /// let outputs = recvd?; 350 - /// let Step::Value(outputs) = outputs else { break; }; 351 - /// for output in outputs { 354 + /// while let Some(chunk) = rx.recv().await { 355 + /// for output in chunk? { 352 356 /// println!("{}: size={}", output.key, output.data.len()); 353 357 /// } 354 - /// 355 358 /// } 356 359 /// # Ok(()) 357 360 /// # } ··· 360 363 mut self, 361 364 n: usize, 362 365 ) -> ( 363 - mpsc::Receiver<Result<Step<Vec<Output>>, DriveError>>, 366 + mpsc::Receiver<Result<Vec<Output>, DriveError>>, 364 367 tokio::task::JoinHandle<Self>, 365 368 ) { 366 - let (tx, rx) = mpsc::channel::<Result<Step<Vec<Output>>, DriveError>>(1); 369 + let (tx, rx) = mpsc::channel::<Result<Vec<Output>, DriveError>>(1); 367 370 368 371 let chan_task = tokio::task::spawn_blocking(move || { 369 372 if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
+5 -5
src/lib.rs
··· 18 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 19 20 20 ``` 21 - use repo_stream::{DriverBuilder, Step}; 21 + use repo_stream::DriverBuilder; 22 22 23 23 # #[tokio::main] 24 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 31 31 .load_car(reader) 32 32 .await?; 33 33 34 - while let Step::Value(chunk) = mem_car.next_chunk(256)? { 34 + while let Some(chunk) = mem_car.next_chunk_strict(256)? { 35 35 for output in chunk { 36 36 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 37 37 total_size += size; ··· 45 45 If the CAR is too large for memory, handle the `MemoryLimitReached` error: 46 46 47 47 ```no_run 48 - use repo_stream::{DriverBuilder, LoadError, Step}; 48 + use repo_stream::{DriverBuilder, LoadError}; 49 49 50 50 # #[tokio::main] 51 51 # async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 56 56 .await 57 57 { 58 58 Ok(mut mem_car) => { 59 - while let Step::Value(chunk) = mem_car.next_chunk(256)? { 59 + while let Some(chunk) = mem_car.next_chunk_strict(256)? { 60 60 // process records 61 61 } 62 62 } ··· 82 82 pub use disk::{DiskBuilder, DiskDriver, DiskError, DiskStore, DriveError}; 83 83 pub use mem::{DriverBuilder, LoadError, MemCar, PartialCar}; 84 84 pub use mst::Commit; 85 - pub use walk::{Output, Step, WalkError, WalkItem, noop}; 85 + pub use walk::{Output, WalkError, WalkItem, noop}; 86 86 87 87 pub type Bytes = Vec<u8>; 88 88
+57 -55
src/mem.rs
··· 1 1 //! Load a CAR file into memory and walk its MST 2 2 3 3 use crate::{ 4 - Bytes, HashMap, RepoPath, Step, 4 + Bytes, HashMap, RepoPath, 5 5 disk::{DiskDriver, DiskError, DiskStore, DriveError, make_disk_driver}, 6 6 mst::{Commit, MstNode, ObjectLink}, 7 7 walk::{MaybeProcessedBlock, Output, WalkError, WalkItem, Walker}, ··· 161 161 blocks: mem_blocks, 162 162 walker: Walker::new(root_node), 163 163 process, 164 - trailing_key: None, 165 164 }) 166 165 } 167 166 ··· 171 170 pub commit: Commit, 172 171 /// For CAR slices: the key of the last record before this slice's leading edge. 173 172 /// `None` if this slice (or full CAR) starts from the leftmost record in the tree. 173 + /// Not set automatically — callers may derive it from leading `MissingRecord` items. 174 174 pub prev_key: Option<RepoPath>, 175 175 pub blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 176 176 walker: Walker, 177 177 process: fn(Bytes) -> Bytes, 178 - /// `None` = no gap encountered yet; `Some(k)` = trailing edge determined. 179 - trailing_key: Option<Option<RepoPath>>, 180 178 } 181 179 182 180 impl MemCar { 183 181 /// Seek forward to the first record at or after `target`. 184 182 /// 185 183 /// Uses the MST structure to skip entire subtrees efficiently. 186 - /// After this returns, the next `next` or `next_chunk` call will start at or after `target`. 184 + /// After this returns, the next call to `next*` will start at or after `target`. 187 185 pub fn seek(&mut self, target: &str) -> Result<(), WalkError> { 188 186 self.walker.seek(target, &self.blocks) 189 187 } 190 188 191 - /// Walk forward past any gaps to determine the trailing edge key. 192 - fn find_trailing_edge(&mut self) -> Result<Option<RepoPath>, WalkError> { 193 - let trailing = loop { 189 + /// Get the next item from the walk. 190 + /// 191 + /// Returns all `WalkItem` variants as-is, including `MissingRecord` and 192 + /// `MissingSubtree` for sparse trees and CAR slices. Returns `Ok(None)` 193 + /// when the walk is complete. 194 + /// 195 + /// TODO: make this an implementation of Iterator 196 + pub fn next(&mut self) -> Result<Option<WalkItem>, WalkError> { 197 + self.walker.step(&self.blocks, self.process) 198 + } 199 + 200 + /// Collect up to `n` walk items. 201 + /// 202 + /// Like `next`, passes through `MissingRecord` and `MissingSubtree` items. 203 + /// Returns `Ok(None)` when the walk is complete. 204 + pub fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<WalkItem>>, WalkError> { 205 + let mut out = Vec::with_capacity(n); 206 + for _ in 0..n { 194 207 match self.walker.step(&self.blocks, self.process)? { 195 - Some(WalkItem::Record(r)) => break Some(r.key), 196 - Some(WalkItem::MissingRecord { key, .. }) => break Some(key), 197 - Some(WalkItem::MissingSubtree { .. }) => continue, 198 - None => break None, 208 + Some(item) => out.push(item), 209 + None => break, 199 210 } 200 - }; 201 - self.trailing_key = Some(trailing.clone()); 202 - Ok(trailing) 211 + } 212 + if out.is_empty() { 213 + Ok(None) 214 + } else { 215 + Ok(Some(out)) 216 + } 203 217 } 204 218 205 - /// Get the next record. 219 + /// Get the next present record, erroring if any block is absent. 206 220 /// 207 - /// Returns `Step::Value(output)` for each record in key order, then 208 - /// `Step::End(None)` at the end of a full CAR, or `Step::End(Some(key))` 209 - /// for CAR slices where `key` is the first key immediately after the slice. 210 - /// 211 - /// TODO: make this an implementation of Iterator 212 - pub fn next(&mut self) -> Result<Step, WalkError> { 213 - if let Some(trailing) = &self.trailing_key { 214 - return Ok(Step::End(trailing.clone())); 215 - } 221 + /// Returns `Ok(None)` when the walk is complete. Returns 222 + /// `Err(WalkError::MissingBlock)` if a record block is absent, or 223 + /// `Err(WalkError::MissingNode)` if an MST node block is absent. 224 + pub fn next_strict(&mut self) -> Result<Option<Output>, WalkError> { 216 225 match self.walker.step(&self.blocks, self.process)? { 217 - Some(WalkItem::Record(out)) => Ok(Step::Value(out)), 218 - Some(WalkItem::MissingRecord { key, .. }) => { 219 - self.trailing_key = Some(Some(key.clone())); 220 - Ok(Step::End(Some(key))) 221 - } 222 - Some(WalkItem::MissingSubtree { .. }) => { 223 - let trailing = self.find_trailing_edge()?; 224 - Ok(Step::End(trailing)) 225 - } 226 - None => { 227 - self.trailing_key = Some(None); 228 - Ok(Step::End(None)) 226 + None => Ok(None), 227 + Some(WalkItem::Record(out)) => Ok(Some(out)), 228 + Some(WalkItem::MissingRecord { key, cid }) => Err(WalkError::MissingBlock { 229 + key, 230 + cid: Box::new(cid), 231 + }), 232 + Some(WalkItem::MissingSubtree { cid }) => { 233 + Err(WalkError::MissingNode { cid: Box::new(cid) }) 229 234 } 230 235 } 231 236 } 232 237 233 - /// Iterate up to `n` records in key order. 238 + /// Collect up to `n` present records, erroring if any block is absent. 234 239 /// 235 - /// Returns `Step::Value(records)` while records remain, then `Step::End(next_key)` 236 - /// where `next_key` is the first key after the slice (for CAR slices), or `None`. 237 - pub fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, WalkError> { 238 - if let Some(trailing) = &self.trailing_key { 239 - return Ok(Step::End(trailing.clone())); 240 - } 240 + /// Returns `Ok(None)` when the walk is complete. Returns 241 + /// `Err(WalkError::MissingBlock)` if a record block is absent, or 242 + /// `Err(WalkError::MissingNode)` if an MST node block is absent. 243 + pub fn next_chunk_strict(&mut self, n: usize) -> Result<Option<Vec<Output>>, WalkError> { 241 244 let mut out = Vec::with_capacity(n); 242 245 for _ in 0..n { 243 246 match self.walker.step(&self.blocks, self.process)? { 247 + None => break, 244 248 Some(WalkItem::Record(record)) => out.push(record), 245 - Some(WalkItem::MissingRecord { key, .. }) => { 246 - self.trailing_key = Some(Some(key.clone())); 247 - return Ok(Step::Value(out)); // may be empty 249 + Some(WalkItem::MissingRecord { key, cid }) => { 250 + return Err(WalkError::MissingBlock { 251 + key, 252 + cid: Box::new(cid), 253 + }); 248 254 } 249 - Some(WalkItem::MissingSubtree { .. }) => { 250 - let trailing = self.find_trailing_edge()?; 251 - self.trailing_key = Some(trailing); 252 - return Ok(Step::Value(out)); // may be empty 255 + Some(WalkItem::MissingSubtree { cid }) => { 256 + return Err(WalkError::MissingNode { cid: Box::new(cid) }); 253 257 } 254 - None => break, 255 258 } 256 259 } 257 260 if out.is_empty() { 258 - self.trailing_key = Some(None); 259 - Ok(Step::End(None)) 261 + Ok(None) 260 262 } else { 261 - Ok(Step::Value(out)) 263 + Ok(Some(out)) 262 264 } 263 265 } 264 266 }
+9 -6
src/walk.rs
··· 70 70 MstError(#[from] MstError), 71 71 #[error("storage error: {0}")] 72 72 StorageError(#[from] fjall::Error), 73 + /// Returned by `next_strict`/`next_chunk_strict` when a record block is absent. 74 + #[error("record block absent: key={key:?} cid={cid}")] 75 + MissingBlock { 76 + key: crate::RepoPath, 77 + cid: Box<cid::Cid>, 78 + }, 79 + /// Returned by `next_strict`/`next_chunk_strict` when an MST node block is absent. 80 + #[error("MST node block absent: cid={cid}")] 81 + MissingNode { cid: Box<cid::Cid> }, 73 82 } 74 83 75 84 /// Errors from invalid repo path keys ··· 106 115 pub key: RepoPath, 107 116 pub cid: Cid, 108 117 pub data: T, 109 - } 110 - 111 - #[derive(Debug, PartialEq)] 112 - pub enum Step<T = Output> { 113 - Value(T), 114 - End(Option<RepoPath>), 115 118 } 116 119 117 120 /// Walker: traverser of an atproto MST
+43 -25
tests/car-slices.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{DriverBuilder, LoadError, Output, Step}; 2 + use repo_stream::{DriverBuilder, LoadError, Output, WalkItem}; 3 3 4 - const RECORD_SLICE: &'static [u8] = include_bytes!("../car-samples/slice-one.car"); 5 - const RECORD_NODE_FIRST_KEY: &'static [u8] = 6 - include_bytes!("../car-samples/slice-node-first-key.car"); 7 - const RECORD_NODE_AFTER: &'static [u8] = include_bytes!("../car-samples/slice-node-after.car"); 8 - const RECORD_NODE_ABSENT: &'static [u8] = 9 - include_bytes!("../car-samples/slice-proving-absence.car"); 4 + const RECORD_SLICE: &[u8] = include_bytes!("../car-samples/slice-one.car"); 5 + const RECORD_NODE_FIRST_KEY: &[u8] = include_bytes!("../car-samples/slice-node-first-key.car"); 6 + const RECORD_NODE_AFTER: &[u8] = include_bytes!("../car-samples/slice-node-after.car"); 7 + const RECORD_NODE_ABSENT: &[u8] = include_bytes!("../car-samples/slice-proving-absence.car"); 10 8 9 + /// Walk a CAR slice and assert on: 10 + /// - `expect_preceding`: the last `MissingRecord` key before any present records 11 + /// (i.e. the key just before the slice's window) 12 + /// - `expected_records`: count of present records 13 + /// - `expected_sum`: sum of record sizes (via processor) 14 + /// - `expect_key`: a specific key that must appear among the present records 15 + /// - `expect_trailing`: the first `MissingRecord` key after the last present record 16 + /// (i.e. the key just after the slice's window) 11 17 async fn test_car_slice( 12 18 bytes: &[u8], 13 19 expected_records: usize, 14 20 expected_sum: usize, 15 - expect_preceeding: Option<&str>, 21 + expect_preceding: Option<&str>, 16 22 expect_key: Option<&str>, 17 - expect_proceeding: Option<&str>, 23 + expect_trailing: Option<&str>, 18 24 ) { 19 25 let mut mem_car = match DriverBuilder::new() 20 26 .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) ··· 25 31 Err(LoadError::MemoryLimitReached(_)) => panic!("too big"), 26 32 Err(e) => panic!("{e}"), 27 33 }; 28 - 29 - assert_eq!(mem_car.prev_key.as_deref(), expect_preceeding); 30 34 31 35 let mut found_records = 0; 32 36 let mut sum = 0; 33 37 let mut found_expected_key = false; 34 38 let mut prev_key = "".to_string(); 35 39 36 - loop { 37 - match mem_car.next_chunk(256).unwrap() { 38 - Step::Value(records) => { 39 - for Output { key, cid: _, data } in records { 40 + // The last MissingRecord key seen before the first present record. 41 + let mut preceding: Option<String> = None; 42 + // The first MissingRecord key seen after the last present record. 43 + let mut trailing: Option<String> = None; 44 + let mut after_records = false; 45 + 46 + while let Some(items) = mem_car.next_chunk(256).unwrap() { 47 + for item in items { 48 + match item { 49 + WalkItem::Record(Output { key, cid: _, data }) => { 50 + after_records = true; 51 + trailing = None; // a later MissingRecord replaces this 40 52 found_records += 1; 41 53 42 54 let (int_bytes, _) = data.split_at(size_of::<usize>()); 43 55 let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 44 - 45 56 sum += size; 57 + 46 58 if Some(key.as_str()) == expect_key { 47 59 found_expected_key = true; 48 60 } 49 - eprintln!("!!!! {key}"); 50 61 assert!(key > prev_key, "keys are streamed in order"); 51 62 prev_key = key; 52 63 } 53 - } 54 - Step::End(proceeding) => { 55 - assert_eq!(proceeding.as_deref(), expect_proceeding); 56 - break; 64 + WalkItem::MissingRecord { key, .. } => { 65 + if !after_records { 66 + preceding = Some(key); 67 + } else if trailing.is_none() { 68 + trailing = Some(key); 69 + } 70 + } 71 + WalkItem::MissingSubtree { .. } => {} 57 72 } 58 73 } 59 74 } 60 75 61 76 assert_eq!(found_records, expected_records); 77 + assert_eq!(preceding.as_deref(), expect_preceding); 78 + assert_eq!(trailing.as_deref(), expect_trailing); 79 + 62 80 if expected_records > 0 { 63 81 assert!(found_expected_key); 64 82 assert_eq!(sum, expected_sum); 65 - } else { 66 - assert!(!found_expected_key); 67 83 } 68 84 } 69 85 ··· 108 124 109 125 #[tokio::test] 110 126 async fn test_record_slice_proving_absence() { 111 - // missing key is `app.bsky.feed.like/3lohfzs6qea23` 112 - // NOTE: repo-stream output here isn't enough info for proof 127 + // proves `app.bsky.feed.like/3lohfzs6qea23` is absent. 128 + // the included MST nodes contain entries for neighbouring keys whose 129 + // record blocks are not in this CAR — they surface as MissingRecord items. 130 + // no present records; the last MissingRecord key seen is the neighbour. 113 131 test_car_slice( 114 132 RECORD_NODE_ABSENT, 115 133 0,
+2 -2
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{DriverBuilder, Output, Step}; 2 + use repo_stream::{DriverBuilder, Output}; 3 3 4 4 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 24 24 let mut found_bsky_profile = false; 25 25 let mut prev_key = "".to_string(); 26 26 27 - while let Step::Value(pairs) = mem_car.next_chunk(256).unwrap() { 27 + while let Some(pairs) = mem_car.next_chunk_strict(256).unwrap() { 28 28 for Output { key, cid: _, data } in pairs { 29 29 records += 1; 30 30