Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

non-strict keys functions

turns out they are useful after all

phil 5ac78f77 3673c2b3

+73 -12
+1 -1
Cargo.lock
··· 2741 2741 2742 2742 [[package]] 2743 2743 name = "repo-stream" 2744 - version = "0.5.0-alpha.2" 2744 + version = "0.5.0-alpha.3" 2745 2745 dependencies = [ 2746 2746 "async-channel", 2747 2747 "cid",
+1 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.5.0-alpha.2" 3 + version = "0.5.0-alpha.3" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 6 description = "Fast and robust atproto CAR file processing"
+42 -4
src/mem.rs
··· 285 285 /// Record CIDs come directly from MST node entries — record blocks are never 286 286 /// looked up. MST node blocks are still fetched to traverse the tree. 287 287 /// 288 - /// Returns `Ok(None)` when the walk is complete. Returns 289 - /// `Err(WalkError::MissingNode)` if a child MST node block is absent. 288 + /// **Not strict**: if a child MST node block is absent, the subtree is silently 289 + /// skipped. Use [`next_keys_strict`] to error instead. 290 + /// 291 + /// Returns `Ok(None)` when the walk is complete. 290 292 pub fn next_keys(&mut self) -> Result<Option<(RepoPath, Cid)>, WalkError> { 291 293 self.walker.step_keys(&self.blocks) 292 294 } ··· 295 297 /// 296 298 /// Like [`next_keys`] but collects up to `n` pairs in one call. 297 299 /// 298 - /// Returns `Ok(None)` when the walk is complete. Returns 299 - /// `Err(WalkError::MissingNode)` if a child MST node block is absent. 300 + /// **Not strict**: if a child MST node block is absent, the subtree is silently 301 + /// skipped. Use [`next_chunk_keys_strict`] to error instead. 302 + /// 303 + /// Returns `Ok(None)` when the walk is complete. 300 304 pub fn next_chunk_keys(&mut self, n: usize) -> Result<Option<Vec<(RepoPath, Cid)>>, WalkError> { 301 305 let mut out = Vec::with_capacity(n); 302 306 for _ in 0..n { 303 307 match self.walker.step_keys(&self.blocks)? { 308 + Some(pair) => out.push(pair), 309 + None => break, 310 + } 311 + } 312 + if out.is_empty() { 313 + Ok(None) 314 + } else { 315 + Ok(Some(out)) 316 + } 317 + } 318 + 319 + /// Get the next key and CID from the walk, without fetching record blocks. 320 + /// 321 + /// Like [`next_keys`] but returns `Err(WalkError::MissingNode)` if a child 322 + /// MST node block is absent rather than silently skipping the subtree. 323 + /// 324 + /// Returns `Ok(None)` when the walk is complete. 
325 + pub fn next_keys_strict(&mut self) -> Result<Option<(RepoPath, Cid)>, WalkError> { 326 + self.walker.step_keys_strict(&self.blocks) 327 + } 328 + 329 + /// Collect up to `n` key+CID pairs, without fetching record blocks. 330 + /// 331 + /// Like [`next_chunk_keys`] but returns `Err(WalkError::MissingNode)` if a 332 + /// child MST node block is absent rather than silently skipping the subtree. 333 + /// 334 + /// Returns `Ok(None)` when the walk is complete. 335 + pub fn next_chunk_keys_strict( 336 + &mut self, 337 + n: usize, 338 + ) -> Result<Option<Vec<(RepoPath, Cid)>>, WalkError> { 339 + let mut out = Vec::with_capacity(n); 340 + for _ in 0..n { 341 + match self.walker.step_keys_strict(&self.blocks)? { 304 342 Some(pair) => out.push(pair), 305 343 None => break, 306 344 }
+27 -4
src/walk.rs
··· 37 37 } 38 38 } 39 39 40 + #[allow(clippy::len_without_is_empty)] 40 41 pub fn len(&self) -> usize { 41 42 match self { 42 43 MaybeProcessedBlock::Raw(b) | MaybeProcessedBlock::Processed(b) => b.len(), ··· 307 308 /// Returns the key and CID of each record directly from the MST node entries. 308 309 /// MST node blocks are still fetched to traverse the tree structure. 309 310 /// 310 - /// Returns `Err(WalkError::MissingNode)` if a child MST node block is absent. 311 + /// If a child MST node block is absent, the subtree is silently skipped. 312 + /// Use [`step_keys_strict`] to error instead. 311 313 pub fn step_keys( 312 314 &mut self, 313 315 blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 314 316 ) -> Result<Option<(RepoPath, Cid)>, WalkError> { 317 + self.step_keys_impl(blocks, false) 318 + } 319 + 320 + /// Like [`step_keys`], but returns `Err(WalkError::MissingNode)` if a child 321 + /// MST node block is absent rather than silently skipping the subtree. 322 + pub fn step_keys_strict( 323 + &mut self, 324 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 325 + ) -> Result<Option<(RepoPath, Cid)>, WalkError> { 326 + self.step_keys_impl(blocks, true) 327 + } 328 + 329 + fn step_keys_impl( 330 + &mut self, 331 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 332 + strict: bool, 333 + ) -> Result<Option<(RepoPath, Cid)>, WalkError> { 315 334 while let Some(NodeThing { link, kind }) = self.next_todo() { 316 335 match kind { 317 336 ThingKind::Record(key) => { ··· 326 345 } 327 346 ThingKind::ChildNode => { 328 347 let Some(mpb) = blocks.get(&link) else { 329 - return Err(WalkError::MissingNode { 330 - cid: Box::new(link.into()), 331 - }); 348 + if strict { 349 + return Err(WalkError::MissingNode { 350 + cid: Box::new(link.into()), 351 + }); 352 + } else { 353 + continue; 354 + } 332 355 }; 333 356 let MaybeProcessedBlock::Raw(data) = mpb else { 334 357 return Err(WalkError::BadCommitFingerprint);
+2 -2
tests/non-huge-cars.rs
··· 96 96 assert_eq!(count_keys(MIDSIZE_CAR).await, 11585); 97 97 } 98 98 99 - /// Verify that next_chunk_keys returns the same (key, cid) pairs as next_chunk_strict. 99 + /// Verify that next_chunk_keys_strict returns the same (key, cid) pairs as next_chunk_strict. 100 100 #[tokio::test] 101 101 async fn test_next_chunk_keys_agrees_with_strict() { 102 102 let mut mc_strict = DriverBuilder::new() ··· 118 118 } 119 119 120 120 let mut from_keys = Vec::new(); 121 - while let Some(pairs) = mc_keys.next_chunk_keys(256).unwrap() { 121 + while let Some(pairs) = mc_keys.next_chunk_keys_strict(256).unwrap() { 122 122 from_keys.extend(pairs); 123 123 } 124 124