Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

walk keys

phil 1d1d4005 0af4e342

+114 -29
+32
src/mem.rs
··· 280 280 self.walker.step_with_nodes(&self.blocks, self.process) 281 281 } 282 282 283 + /// Get the next key and CID from the walk, without fetching record blocks. 284 + /// 285 + /// Record CIDs come directly from MST node entries — record blocks are never 286 + /// looked up. MST node blocks are still fetched to traverse the tree. 287 + /// 288 + /// Returns `Ok(None)` when the walk is complete. Returns 289 + /// `Err(WalkError::MissingNode)` if a child MST node block is absent. 290 + pub fn next_keys(&mut self) -> Result<Option<(RepoPath, Cid)>, WalkError> { 291 + self.walker.step_keys(&self.blocks) 292 + } 293 + 294 + /// Collect up to `n` key+CID pairs, without fetching record blocks. 295 + /// 296 + /// Like [`next_keys`] but collects up to `n` pairs in one call. 297 + /// 298 + /// Returns `Ok(None)` when the walk is complete. Returns 299 + /// `Err(WalkError::MissingNode)` if a child MST node block is absent. 300 + pub fn next_chunk_keys(&mut self, n: usize) -> Result<Option<Vec<(RepoPath, Cid)>>, WalkError> { 301 + let mut out = Vec::with_capacity(n); 302 + for _ in 0..n { 303 + match self.walker.step_keys(&self.blocks)? { 304 + Some(pair) => out.push(pair), 305 + None => break, 306 + } 307 + } 308 + if out.is_empty() { 309 + Ok(None) 310 + } else { 311 + Ok(Some(out)) 312 + } 313 + } 314 + 283 315 /// Collect up to `n` items (records, missing items, and node blocks). 284 316 /// 285 317 /// Like [`next_chunk`] but also includes `WalkItem::Node`. The chunk
+27 -29
src/slice.rs
··· 147 147 return Ok(Some(out)); 148 148 } 149 149 150 - loop { 151 - match self.mem_car.next()? { 152 - None => { 150 + match self.mem_car.next()? { 151 + None => { 152 + self.done = true; 153 + validate_upper(self.following_key.as_deref(), &self.upper)?; 154 + Ok(None) 155 + } 156 + Some(WalkItem::Node { .. }) => unreachable!("step() never emits Node"), 157 + Some(WalkItem::MissingSubtree { cid }) => { 158 + // Any missing subtree after the range starts is an error: 159 + // we can't prove the range is complete without it. 160 + Err(SliceError::MissingNode { cid }) 161 + } 162 + Some(WalkItem::MissingRecord { key, cid }) => { 163 + if is_after(&key, &self.upper) { 164 + self.following_key = Some(key); 153 165 self.done = true; 154 166 validate_upper(self.following_key.as_deref(), &self.upper)?; 155 - return Ok(None); 156 - } 157 - Some(WalkItem::Node { .. }) => unreachable!("step() never emits Node"), 158 - Some(WalkItem::MissingSubtree { cid }) => { 159 - // Any missing subtree after the range starts is an error: 160 - // we can't prove the range is complete without it. 161 - return Err(SliceError::MissingNode { cid }); 162 - } 163 - Some(WalkItem::MissingRecord { key, cid }) => { 164 - if is_after(&key, &self.upper) { 165 - self.following_key = Some(key); 166 - self.done = true; 167 - validate_upper(self.following_key.as_deref(), &self.upper)?; 168 - return Ok(None); 169 - } else { 170 - return Err(SliceError::IncompleteRange { key, cid }); 171 - } 167 + Ok(None) 168 + } else { 169 + Err(SliceError::IncompleteRange { key, cid }) 172 170 } 173 - Some(WalkItem::Record(out)) => { 174 - if is_after(&out.key, &self.upper) { 175 - self.following_key = Some(out.key); 176 - self.done = true; 177 - validate_upper(self.following_key.as_deref(), &self.upper)?; 178 - return Ok(None); 179 - } else { 180 - return Ok(Some(out)); 181 - } 171 + } 172 + Some(WalkItem::Record(out)) => { 173 + if is_after(&out.key, &self.upper) { 174 + self.following_key = Some(out.key); 175 + self.done = true; 176 + validate_upper(self.following_key.as_deref(), &self.upper)?; 177 + Ok(None) 178 + } else { 179 + Ok(Some(out)) 182 180 } 183 181 } 184 182 }
+55
src/walk.rs
··· 302 302 Ok(None) 303 303 } 304 304 305 + /// Like [`step`], but skips record block lookups entirely. 306 + /// 307 + /// Returns the key and CID of each record directly from the MST node entries. 308 + /// MST node blocks are still fetched to traverse the tree structure. 309 + /// 310 + /// Returns `Err(WalkError::MissingNode)` if a child MST node block is absent. 311 + pub fn step_keys( 312 + &mut self, 313 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 314 + ) -> Result<Option<(RepoPath, Cid)>, WalkError> { 315 + while let Some(NodeThing { link, kind }) = self.next_todo() { 316 + match kind { 317 + ThingKind::Record(key) => { 318 + if Some(&key) <= self.prev_key.as_ref() { 319 + return Err(WalkError::MstError(MstError::KeyOutOfOrder { 320 + key, 321 + prev: self.prev_key.clone().unwrap_or("[no prev key]".to_string()), 322 + })); 323 + } 324 + self.prev_key = Some(key.clone()); 325 + return Ok(Some((key, link.into()))); 326 + } 327 + ThingKind::ChildNode => { 328 + let Some(mpb) = blocks.get(&link) else { 329 + return Err(WalkError::MissingNode { 330 + cid: Box::new(link.into()), 331 + }); 332 + }; 333 + let MaybeProcessedBlock::Raw(data) = mpb else { 334 + return Err(WalkError::BadCommitFingerprint); 335 + }; 336 + let node: MstNode = 337 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 338 + if node.is_empty() { 339 + return Err(WalkError::MstError(MstError::EmptyNode)); 340 + } 341 + let current_layer = self.root_layer - (self.todo.len() - 1) as u32; 342 + let next_layer = current_layer 343 + .checked_sub(1) 344 + .ok_or(MstError::LayerUnderflow)?; 345 + if let Some(d) = node.layer 346 + && d != next_layer 347 + { 348 + return Err(WalkError::MstError(MstError::WrongLayer { 349 + layer: d, 350 + expected: next_layer, 351 + })); 352 + } 353 + self.todo.push(node.things); 354 + } 355 + } 356 + } 357 + Ok(None) 358 + } 359 + 305 360 /// Skip forward to the first record at or after `target`, without emitting anything. 306 361 /// 307 362 /// Uses the tree structure to skip entire subtrees that are provably before `target`,