Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

wip restructure

phil 356cbc3a de2d74d9

+205 -206
+1 -1
examples/read-file/main.rs
··· 32 32 log::info!("got commit: {:?}", mem_car.commit); 33 33 34 34 while let Step::Value(records) = mem_car.next_chunk(256)? { 35 - for Output { rkey: _, cid: _, data: _ } in records { 35 + for Output { key: _, cid: _, data: _ } in records { 36 36 // process records 37 37 } 38 38 }
+6 -6
examples/read-slice/main.rs
··· 26 26 "\nthis slice is from {}, repo rev {}", 27 27 mem_car.commit.did, mem_car.commit.rev 28 28 ); 29 - if let Some(rkey) = &mem_car.prev_rkey { 30 - println!(" -> key immediately before CAR slice: {rkey}"); 29 + if let Some(key) = &mem_car.prev_key { 30 + println!(" -> key immediately before CAR slice: {key}"); 31 31 } else { 32 32 println!( 33 33 " -> no key preceeding the CAR slice, so it includes the leading edge of the tree." ··· 38 38 let end = loop { 39 39 match mem_car.next_chunk(256)? { 40 40 Step::Value(chunk) => { 41 - for Output { cid, rkey, .. } in chunk { 41 + for Output { cid, key, .. } in chunk { 42 42 print!(" SHA256 "); 43 43 for byte in cid.to_bytes().iter().skip(4).take(5) { 44 44 print!("{byte:02x}"); 45 45 } 46 - println!("...\t{rkey}"); 46 + println!("...\t{key}"); 47 47 } 48 48 } 49 49 Step::End(e) => break e, ··· 51 51 }; 52 52 53 53 println!("done walking records present in the slice."); 54 - if let Some(rkey) = end { 55 - println!(" -> key immediately after CAR slice: {rkey}"); 54 + if let Some(key) = end { 55 + println!(" -> key immediately after CAR slice: {key}"); 56 56 } else { 57 57 println!( 58 58 " -> no key proceeding the CAR slice, so it includes the trailing edge of the tree."
+3 -3
readme.md
··· 32 32 // if all blocks fit within memory 33 33 Ok(mut mem_car) => { 34 34 while let Step::Value(chunk) = mem_car.next_chunk(256)? { 35 - for Output { rkey: _, cid: _, data } in chunk { 35 + for Output { key: _, cid: _, data } in chunk { 36 36 let size = usize::from_ne_bytes(<[u8; 8]>::try_from(data).unwrap()); 37 37 total_size += size; 38 38 } ··· 44 44 // set up a disk store we can spill to 45 45 let store = DiskBuilder::new().open("some/path.db".into()).await?; 46 46 // do the spilling, get back a disk driver 47 - let (_commit, _prev_rkey, mut driver) = partial.finish_loading(store).await?; 47 + let (_commit, _prev_key, mut driver) = partial.finish_loading(store).await?; 48 48 49 49 while let Step::Value(chunk) = driver.next_chunk(256).await? { 50 - for Output { rkey: _, cid: _, data } in chunk { 50 + for Output { key: _, cid: _, data } in chunk { 51 51 let size = usize::from_ne_bytes(<[u8; 8]>::try_from(data).unwrap()); 52 52 total_size += size; 53 53 }
+92 -60
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::link::{NodeThing, ObjectLink, ThingKind}; 3 + use crate::link::ObjectLink; 4 4 use crate::{ 5 - Bytes, HashMap, Rkey, Step, 5 + Bytes, HashMap, RepoPath, Step, 6 6 block::{MaybeProcessedBlock, noop}, 7 7 disk::{DiskError, DiskStore}, 8 8 mst::MstNode, 9 - walk::{MstError, Output}, 9 + walk::{MstError, Output, WalkItem}, 10 10 }; 11 11 use cid::Cid; 12 12 use iroh_car::CarReader; ··· 17 17 use crate::walk::{WalkError, Walker}; 18 18 use thiserror::Error; 19 19 20 - /// An in-order chunk of Rkey + CID + (processed) Block 20 + /// An in-order chunk of RepoPath + CID + (processed) Block 21 21 pub type BlockChunk = Vec<Output>; 22 22 23 23 /// Errors that can occur while loading a CAR into memory ··· 167 167 }; 168 168 let mut walker = Walker::new(root_node); 169 169 170 - let prev_rkey = walker.step_to_edge(&mem_blocks)?; 170 + let prev_key = walker.step_to_edge(&mem_blocks)?; 171 171 172 172 Ok(MemCar { 173 173 commit, 174 - prev_rkey, 174 + prev_key, 175 175 blocks: mem_blocks, 176 176 walker, 177 177 process, 178 - next_missing: None, 178 + trailing_key: None, 179 179 }) 180 180 } 181 181 ··· 183 183 #[derive(Debug)] 184 184 pub struct MemCar { 185 185 pub commit: Commit, 186 - /// For CAR slices: the rkey of the last record before this slice's leading edge. 186 + /// For CAR slices: the key of the last record before this slice's leading edge. 187 187 /// `None` if this slice (or full CAR) starts from the leftmost record in the tree. 188 - pub prev_rkey: Option<Rkey>, 188 + pub prev_key: Option<RepoPath>, 189 189 pub blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 190 190 walker: Walker, 191 191 process: fn(Bytes) -> Bytes, 192 - next_missing: Option<NodeThing>, 192 + /// `None` = no gap encountered yet; `Some(k)` = trailing edge determined. 193 + trailing_key: Option<Option<RepoPath>>, 193 194 } 194 195 195 196 impl MemCar { ··· 197 198 /// Seek forward to the first record at or after `target`. 198 199 /// 199 200 /// Uses the MST structure to skip entire subtrees efficiently. 200 - /// After this returns, the next `next_chunk` call will start at or after `target`. 201 + /// After this returns, the next `next` or `next_chunk` call will start at or after `target`. 201 202 pub fn seek(&mut self, target: &str) -> Result<(), WalkError> { 202 203 self.walker.seek(target, &self.blocks) 203 204 } 204 205 205 - /// Get the next record 206 - pub fn next(&mut self) -> Result<Option<Output>, WalkError> { 207 - todo!() 206 + /// Walk forward past any gaps to determine the trailing edge key. 207 + /// 208 + /// The first record key encountered after a gap (whether the record's block 209 + /// is present or missing) is the trailing edge — the first key not covered 210 + /// by this slice. Sets `self.trailing_key` and returns it. 211 + fn find_trailing_edge(&mut self) -> Result<Option<RepoPath>, WalkError> { 212 + let trailing = loop { 213 + match self.walker.step(&self.blocks, self.process)? { 214 + Some(WalkItem::Record(r)) => break Some(r.key), 215 + Some(WalkItem::MissingRecord { key, .. }) => break Some(key), 216 + Some(WalkItem::MissingSubtree { .. }) => continue, 217 + None => break None, 218 + } 219 + }; 220 + self.trailing_key = Some(trailing.clone()); 221 + Ok(trailing) 208 222 } 209 223 210 - /// Iterate up to `n` records in rkey order. 224 + /// Get the next record. 211 225 /// 212 - /// Returns `Step::Value(records)` while records remain, then `Step::End(next_rkey)` 213 - /// where `next_rkey` is the first rkey after the slice (for CAR slices), or `None`. 214 - pub fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, WalkError> { 215 - if let Some(ref mut missing) = self.next_missing { 216 - while let Step::Value(sparse_out) = 217 - self.walker.step_sparse(&self.blocks, self.process)? 218 - { 219 - if missing.kind == ThingKind::ChildNode { 220 - *missing = NodeThing { 221 - link: sparse_out.cid.into(), 222 - kind: ThingKind::Record(sparse_out.rkey), 223 - }; 224 - } 226 + /// Returns `Step::Value(output)` for each record in key order, then 227 + /// `Step::End(None)` at the end of a full CAR, or `Step::End(Some(key))` 228 + /// for CAR slices where `key` is the first key immediately after the slice. 229 + pub fn next(&mut self) -> Result<Step, WalkError> { 230 + if let Some(trailing) = &self.trailing_key { 231 + return Ok(Step::End(trailing.clone())); 232 + } 233 + match self.walker.step(&self.blocks, self.process)? { 234 + Some(WalkItem::Record(out)) => Ok(Step::Value(out)), 235 + Some(WalkItem::MissingRecord { key, .. }) => { 236 + self.trailing_key = Some(Some(key.clone())); 237 + Ok(Step::End(Some(key))) 225 238 } 226 - return Ok(match &missing.kind { 227 - ThingKind::ChildNode => Step::End(None), 228 - ThingKind::Record(rkey) => Step::End(Some(rkey.clone())), 229 - }); 239 + Some(WalkItem::MissingSubtree { .. }) => { 240 + let trailing = self.find_trailing_edge()?; 241 + Ok(Step::End(trailing)) 242 + } 243 + None => { 244 + self.trailing_key = Some(None); 245 + Ok(Step::End(None)) 246 + } 247 + } 248 + } 249 + 250 + /// Iterate up to `n` records in key order. 251 + /// 252 + /// Returns `Step::Value(records)` while records remain, then `Step::End(next_key)` 253 + /// where `next_key` is the first key after the slice (for CAR slices), or `None`. 254 + pub fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, WalkError> { 255 + if let Some(trailing) = &self.trailing_key { 256 + return Ok(Step::End(trailing.clone())); 230 257 } 231 258 let mut out = Vec::with_capacity(n); 232 259 for _ in 0..n { 233 - match self.walker.step(&self.blocks, self.process) { 234 - Ok(Step::Value(record)) => out.push(record), 235 - Ok(Step::End(None)) => break, 236 - Ok(Step::End(_)) => unreachable!(), 237 - Err(WalkError::MissingBlock(missing)) => { 238 - self.next_missing = Some(*missing); 260 + match self.walker.step(&self.blocks, self.process)? { 261 + Some(WalkItem::Record(record)) => out.push(record), 262 + Some(WalkItem::MissingRecord { key, .. }) => { 263 + self.trailing_key = Some(Some(key.clone())); 239 264 return Ok(Step::Value(out)); // may be empty 240 265 } 241 - Err(other) => return Err(other), 266 + Some(WalkItem::MissingSubtree { .. }) => { 267 + let trailing = self.find_trailing_edge()?; 268 + self.trailing_key = Some(trailing); 269 + return Ok(Step::Value(out)); // may be empty 270 + } 271 + None => break, 242 272 } 243 273 } 244 274 if out.is_empty() { 275 + self.trailing_key = Some(None); 245 276 Ok(Step::End(None)) 246 277 } else { 247 278 Ok(Step::Value(out)) ··· 268 299 MissingRoot, 269 300 #[error("Storage error")] 270 301 StorageError(#[from] DiskError), 302 + #[error("Unexpected missing block: {0:?}")] 303 + MissingBlock(Cid), 271 304 #[error("Tried to send on a closed channel")] 272 305 ChannelSendError, 273 306 #[error("Failed to join a task: {0}")] ··· 284 317 pub async fn finish_loading( 285 318 mut self, 286 319 mut store: DiskStore, 287 - ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> { 320 + ) -> Result<(Commit, Option<RepoPath>, DiskDriver), DriveError> { 288 321 store = tokio::task::spawn(async move { 289 322 let kvs = self 290 323 .blocks ··· 391 424 } 392 425 393 426 impl DiskDriver { 394 - /// Walk the MST returning up to `n` rkey + record pairs 427 + /// Walk the MST returning up to `n` key + record pairs 395 428 /// 396 429 /// ```no_run 397 430 /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; ··· 400 433 /// # let mut disk_driver = _get_fake_disk_driver(); 401 434 /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? { 402 435 /// for output in outputs { 403 - /// println!("{}: size={}", output.rkey, output.data.len()); 436 + /// println!("{}: size={}", output.key, output.data.len()); 404 437 /// } 405 438 /// } 406 439 /// # Ok(()) ··· 416 449 let mut out = Vec::with_capacity(n); 417 450 418 451 for _ in 0..n { 419 - let step = match state.walker.disk_step(&state.store, process) { 420 - Ok(s) => s, 421 - Err(e) => { 422 - return (state, Err(e.into())); 452 + match state.walker.disk_step(&state.store, process) { 453 + Err(e) => return (state, Err(e.into())), 454 + Ok(Some(WalkItem::Record(output))) => out.push(output), 455 + Ok(Some(WalkItem::MissingRecord { cid, .. })) 456 + | Ok(Some(WalkItem::MissingSubtree { cid })) => { 457 + return (state, Err(DriveError::MissingBlock(cid))); 423 458 } 424 - }; 425 - let Step::Value(output) = step else { 426 - break; 427 - }; 428 - out.push(output); 459 + Ok(None) => break, 460 + } 429 461 } 430 462 431 463 (state, Ok::<_, DriveError>(out)) ··· 454 486 let mut out: BlockChunk = Vec::with_capacity(n); 455 487 456 488 for _ in 0..n { 457 - let step = match walker.disk_step(store, self.process) { 458 - Ok(s) => s, 489 + match walker.disk_step(store, self.process) { 459 490 Err(e) => return tx.blocking_send(Err(e.into())), 460 - }; 461 - 462 - let Step::Value(output) = step else { 463 - break; 464 - }; 465 - out.push(output); 491 + Ok(Some(WalkItem::Record(output))) => out.push(output), 492 + Ok(Some(WalkItem::MissingRecord { cid, .. })) 493 + | Ok(Some(WalkItem::MissingSubtree { cid })) => { 494 + return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 495 + } 496 + Ok(None) => break, 497 + } 466 498 } 467 499 468 500 if out.is_empty() { ··· 486 518 /// let outputs = recvd?; 487 519 /// let Step::Value(outputs) = outputs else { break; }; 488 520 /// for output in outputs { 489 - /// println!("{}: size={}", output.rkey, output.data.len()); 521 + /// println!("{}: size={}", output.key, output.data.len()); 490 522 /// } 491 523 /// 492 524 /// }
+3 -3
src/lib.rs
··· 9 9 transformed into a smaller representation to save memory. 10 10 11 11 Once blocks are loaded, the MST is walked and emitted as chunks of 12 - `(rkey, cid, processed_block)` records in left-to-right order. 12 + `(key, cid, processed_block)` records in left-to-right order. 13 13 14 14 Some MST validations are applied: 15 15 - Keys must appear in order ··· 86 86 pub use drive::{DriveError, DriverBuilder, LoadError, MemCar, PartialCar}; 87 87 pub use link::NodeThing; 88 88 pub use mst::Commit; 89 - pub use walk::{Output, Step, WalkError}; 89 + pub use walk::{Output, Step, WalkError, WalkItem}; 90 90 91 91 pub type Bytes = Vec<u8>; 92 92 93 - pub type Rkey = String; 93 + pub type RepoPath = String; 94 94 95 95 #[cfg(feature = "hashbrown")] 96 96 pub(crate) use hashbrown::HashMap;
+1 -1
src/link.rs
··· 39 39 #[derive(Debug, Clone, PartialEq)] 40 40 pub enum ThingKind { 41 41 ChildNode, 42 - Record(crate::Rkey), 42 + Record(crate::RepoPath), 43 43 }
+82 -115
src/walk.rs
··· 2 2 3 3 use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 4 use crate::mst::{Depth, MstNode}; 5 - use crate::{Bytes, HashMap, Rkey, disk::DiskStore, block::MaybeProcessedBlock, noop}; 5 + use crate::{Bytes, HashMap, RepoPath, disk::DiskStore, block::MaybeProcessedBlock, noop}; 6 6 use cid::Cid; 7 7 use std::convert::Infallible; 8 8 ··· 17 17 MstError(#[from] MstError), 18 18 #[error("storage error: {0}")] 19 19 StorageError(#[from] fjall::Error), 20 - #[error("block not found: {0:?}")] 21 - MissingBlock(Box<NodeThing>), 22 20 } 23 21 24 - /// Errors from invalid Rkeys 22 + /// Errors from invalid repo path keys 25 23 #[derive(Debug, PartialEq, thiserror::Error)] 26 24 pub enum MstError { 27 25 #[error("Nodes cannot be empty (except for an entirely empty MST)")] ··· 30 28 WrongDepth { depth: Depth, expected: Depth }, 31 29 #[error("MST depth underflow: depth-0 node with child trees")] 32 30 DepthUnderflow, 33 - #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 34 - RkeyOutOfOrder { prev: Rkey, rkey: Rkey }, 31 + #[error("Encountered key {key:?} which cannot follow the previous: {prev:?}")] 32 + KeyOutOfOrder { prev: RepoPath, key: RepoPath }, 33 + } 34 + 35 + /// An item yielded by `Walker::step`. 36 + #[derive(Debug, PartialEq)] 37 + pub enum WalkItem { 38 + /// A record with its (processed) block data. 39 + Record(Output), 40 + /// A record whose block was absent from the loaded blocks. 41 + MissingRecord { key: RepoPath, cid: Cid }, 42 + /// A child subtree whose root block was absent; its key range is unknown. 43 + MissingSubtree { cid: Cid }, 35 44 } 36 45 37 46 /// Walker outputs 38 - /// 39 - /// TODO: rename to "Record" or "Entry" or something 40 47 #[derive(Debug, PartialEq)] 41 48 pub struct Output<T = Bytes> { 42 - pub rkey: Rkey, // TODO: aaa it's not really rkey, it's just "key" (or split to collection/rkey??) 49 + pub key: RepoPath, 43 50 pub cid: Cid, 44 51 pub data: T, 45 52 } ··· 47 54 #[derive(Debug, PartialEq)] 48 55 pub enum Step<T = Output> { 49 56 Value(T), 50 - End(Option<Rkey>), 57 + End(Option<RepoPath>), 51 58 } 52 59 53 - // #[derive(Debug, PartialEq)] 54 - // pub struct LowStep { 55 - // pub cid: Cid, 56 - // pub kind: LowKind, 57 - // } 58 - 59 - // #[derive(Debug, PartialEq)] 60 - // pub enum LowKind { 61 - // Node { 62 - // children: Option<Vec<NodeThing>>, 63 - // }, 64 - // Record { 65 - // key: Rkey, 66 - // data: Option<Bytes>, 67 - // }, 68 - // } 69 - 70 60 /// Traverser of an atproto MST 71 61 /// 72 62 /// Walks the tree from left-to-right in depth-first order 73 63 #[derive(Debug, Clone)] 74 64 pub struct Walker { 75 65 links: usize, 76 - prev_rkey: Rkey, 66 + prev_key: RepoPath, 77 67 root_depth: Depth, 78 68 todo: Vec<Vec<NodeThing>>, 79 69 } ··· 82 72 pub fn new(root_node: MstNode) -> Self { 83 73 Self { 84 74 links: 0, 85 - prev_rkey: "".to_string(), 75 + prev_key: "".to_string(), 86 76 root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst 87 77 todo: vec![root_node.things], 88 78 } ··· 95 85 process: impl Fn(Bytes) -> Bytes, 96 86 ) -> Result<Option<Output>, WalkError> { 97 87 match thing.kind { 98 - ThingKind::Record(rkey) => { 88 + ThingKind::Record(key) => { 99 89 let data = match mpb { 100 90 MaybeProcessedBlock::Raw(data) => process(data.clone()), 101 91 MaybeProcessedBlock::Processed(t) => t.clone(), 102 92 }; 103 93 104 - if rkey <= self.prev_rkey { 105 - return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 106 - rkey, 107 - prev: self.prev_rkey.clone(), 94 + if key <= self.prev_key { 95 + return Err(WalkError::MstError(MstError::KeyOutOfOrder { 96 + key, 97 + prev: self.prev_key.clone(), 108 98 })); 109 99 } 110 - self.prev_rkey = rkey.clone(); 100 + self.prev_key = key.clone(); 111 101 112 - log::trace!("val @ {rkey}"); 102 + log::trace!("val @ {key}"); 113 103 Ok(Some(Output { 114 - rkey, 104 + key, 115 105 cid: thing.link.into(), 116 106 data, 117 107 })) ··· 162 152 None 163 153 } 164 154 165 - /// Advance through nodes until we find a record or can't go further 155 + /// Advance one step through the MST. 156 + /// 157 + /// Returns `Ok(Some(item))` for each block encountered (record, missing 158 + /// record, or missing subtree), or `Ok(None)` when traversal is complete. 159 + /// Only errors on structural MST violations (wrong depth, out-of-order keys). 166 160 pub fn step( 167 161 &mut self, 168 162 blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 169 163 process: impl Fn(Bytes) -> Bytes, 170 - ) -> Result<Step, WalkError> { 171 - while let Some(NodeThing { link, kind }) = self.next_todo() { 172 - let Some(mpb) = blocks.get(&link) else { 173 - return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 164 + ) -> Result<Option<WalkItem>, WalkError> { 165 + while let Some(thing) = self.next_todo() { 166 + let Some(mpb) = blocks.get(&thing.link) else { 167 + return Ok(Some(match thing.kind { 168 + ThingKind::Record(key) => { 169 + WalkItem::MissingRecord { key, cid: thing.link.into() } 170 + } 171 + ThingKind::ChildNode => WalkItem::MissingSubtree { cid: thing.link.into() }, 172 + })); 174 173 }; 175 - if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, &process)? { 176 - return Ok(Step::Value(out)); 174 + if let Some(out) = self.mpb_step(thing, mpb, &process)? { 175 + return Ok(Some(WalkItem::Record(out))); 177 176 } 178 177 } 179 178 log::debug!("total links: {}", self.links); 180 - Ok(Step::End(None)) 179 + Ok(None) 181 180 } 182 181 183 - // /// Emit every step including MST nodes 184 - // pub fn step_low( 185 - // &mut self, 186 - // blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 187 - // process: impl Fn(Bytes) -> Bytes, 188 - // ) -> Result<Option<LowStep>, WalkError> { 189 - // let Some(NodeThing { link, kind }) = self.next_todo() else { 190 - // return Ok(None); 191 - // }; 192 - // let Some(mpb) = blocks.get(&link) else { 193 - 194 - // } 195 - // } 196 - 197 - /// Advance through nodes, allowing for missing records 198 - pub fn step_sparse( 199 - &mut self, 200 - blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 201 - process: impl Fn(Bytes) -> Bytes, 202 - ) -> Result<Step<Output<Option<Bytes>>>, WalkError> { 203 - while let Some(NodeThing { link, kind }) = self.next_todo() { 204 - let mut dummy = false; 205 - let mpb = match blocks.get(&link) { 206 - Some(mpb) => mpb, 207 - None => { 208 - if let ThingKind::Record(_) = kind { 209 - dummy = true; 210 - &MaybeProcessedBlock::Processed(vec![]) 211 - } else { 212 - continue; 213 - } 214 - } 215 - }; 216 - if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, |bytes| { 217 - if dummy { bytes } else { process(bytes) } 218 - })? { 219 - // eprintln!(" ----- {}", out.rkey); 220 - return Ok(Step::Value(Output { 221 - cid: out.cid, 222 - rkey: out.rkey, 223 - data: if dummy { None } else { Some(out.data) }, 224 - })); 225 - } 226 - } 227 - Ok(Step::End(None)) 228 - } 229 - 182 + /// Advance past leading missing blocks to find the first present record. 183 + /// 184 + /// Returns the key of the last missing *record* encountered before the 185 + /// first present record — i.e., the `prev_key` for a CAR slice's leading 186 + /// edge. After this returns, the next `step` call yields the first present 187 + /// record (or `None` if the whole tree is absent). 230 188 pub fn step_to_edge( 231 189 &mut self, 232 190 blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 233 - ) -> Result<Option<Rkey>, WalkError> { 191 + ) -> Result<Option<RepoPath>, WalkError> { 234 192 let mut ant = self.clone(); 235 - let mut rkey_prev = None; 193 + let mut prev_key = None; 236 194 loop { 237 - match ant.step(blocks, noop) { 238 - Err(WalkError::MissingBlock(thing)) => { 239 - if let ThingKind::Record(rkey) = thing.kind { 240 - rkey_prev = Some(rkey); 241 - } 195 + match ant.step(blocks, noop)? { 196 + Some(WalkItem::Record(_)) => { 197 + // ant went one step too far; self holds the leading-edge position 198 + return Ok(prev_key); 199 + } 200 + Some(WalkItem::MissingRecord { key, .. }) => { 201 + prev_key = Some(key); 242 202 *self = ant; 243 203 ant = self.clone(); 244 204 } 245 - Err(anyother) => return Err(anyother), 246 - Ok(z) => { 247 - log::info!("apparently we are too far at {z:?}"); 248 - return Ok(rkey_prev); // oop real record, mutant went too far 205 + Some(WalkItem::MissingSubtree { .. }) => { 206 + *self = ant; 207 + ant = self.clone(); 249 208 } 209 + None => return Ok(prev_key), 250 210 } 251 211 } 252 212 } ··· 257 217 /// only loading child nodes on the path to `target`. O(depth × branching_factor). 258 218 /// 259 219 /// After this returns `Ok(())`, the next call to `step` will yield the first record 260 - /// at or after `target`, or `Step::End` if no such record exists. 220 + /// at or after `target`, or `None` if no such record exists. 261 221 pub fn seek( 262 222 &mut self, 263 223 target: &str, ··· 267 227 enum SeekStep { 268 228 Done, 269 229 EmptyLevel, 270 - SkipRecord(Rkey), 230 + SkipRecord(RepoPath), 271 231 SkipSubtree, 272 232 Descend, 273 233 } ··· 310 270 } 311 271 SeekStep::SkipRecord(key) => { 312 272 self.todo.last_mut().unwrap().pop(); 313 - self.prev_rkey = key; 273 + self.prev_key = key; 314 274 } 315 275 SeekStep::SkipSubtree => { 316 276 self.todo.last_mut().unwrap().pop(); ··· 320 280 // Note: self.todo borrow released before push below 321 281 322 282 let Some(mpb) = blocks.get(&child.link) else { 323 - return Err(WalkError::MissingBlock(child.into())); 283 + // Missing subtree on the seek path; skip it and continue 284 + // (seek is best-effort for sparse trees) 285 + continue; 324 286 }; 325 287 let MaybeProcessedBlock::Raw(data) = mpb else { 326 288 return Err(WalkError::BadCommitFingerprint); ··· 351 313 } 352 314 } 353 315 354 - /// blocking!!!!!! 316 + /// blocking!!!!! 355 317 pub fn disk_step( 356 318 &mut self, 357 319 blocks: &DiskStore, 358 320 process: impl Fn(Bytes) -> Bytes, 359 - ) -> Result<Step, WalkError> { 360 - while let Some(NodeThing { link, kind }) = self.next_todo() { 361 - let Some(block_slice) = blocks.get(&link.to_bytes())? else { 362 - return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 321 + ) -> Result<Option<WalkItem>, WalkError> { 322 + while let Some(thing) = self.next_todo() { 323 + let Some(block_slice) = blocks.get(&thing.link.to_bytes())? else { 324 + return Ok(Some(match thing.kind { 325 + ThingKind::Record(key) => { 326 + WalkItem::MissingRecord { key, cid: thing.link.into() } 327 + } 328 + ThingKind::ChildNode => WalkItem::MissingSubtree { cid: thing.link.into() }, 329 + })); 363 330 }; 364 331 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 365 - if let Some(out) = self.mpb_step(NodeThing { link, kind }, &mpb, &process)? { 366 - return Ok(Step::Value(out)); 332 + if let Some(out) = self.mpb_step(thing, &mpb, &process)? { 333 + return Ok(Some(WalkItem::Record(out))); 367 334 } 368 335 } 369 - Ok(Step::End(None)) 336 + Ok(None) 370 337 } 371 338 }
+12 -12
tests/car-slices.rs
··· 13 13 expected_records: usize, 14 14 expected_sum: usize, 15 15 expect_preceeding: Option<&str>, 16 - expect_rkey: Option<&str>, 16 + expect_key: Option<&str>, 17 17 expect_proceeding: Option<&str>, 18 18 ) { 19 19 let mut mem_car = match DriverBuilder::new() ··· 26 26 Err(e) => panic!("{e}"), 27 27 }; 28 28 29 - assert_eq!(mem_car.prev_rkey.as_deref(), expect_preceeding); 29 + assert_eq!(mem_car.prev_key.as_deref(), expect_preceeding); 30 30 31 31 let mut found_records = 0; 32 32 let mut sum = 0; 33 - let mut found_expected_rkey = false; 34 - let mut prev_rkey = "".to_string(); 33 + let mut found_expected_key = false; 34 + let mut prev_key = "".to_string(); 35 35 36 36 loop { 37 37 match mem_car.next_chunk(256).unwrap() { 38 38 Step::Value(records) => { 39 - for Output { rkey, cid: _, data } in records { 39 + for Output { key, cid: _, data } in records { 40 40 found_records += 1; 41 41 42 42 let (int_bytes, _) = data.split_at(size_of::<usize>()); 43 43 let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 44 44 45 45 sum += size; 46 - if Some(rkey.as_str()) == expect_rkey { 47 - found_expected_rkey = true; 46 + if Some(key.as_str()) == expect_key { 47 + found_expected_key = true; 48 48 } 49 - eprintln!("!!!! {rkey}"); 50 - assert!(rkey > prev_rkey, "rkeys are streamed in order"); 51 - prev_rkey = rkey; 49 + eprintln!("!!!! {key}"); 50 + assert!(key > prev_key, "keys are streamed in order"); 51 + prev_key = key; 52 52 } 53 53 } 54 54 Step::End(proceeding) => { ··· 60 60 61 61 assert_eq!(found_records, expected_records); 62 62 if expected_records > 0 { 63 - assert!(found_expected_rkey); 63 + assert!(found_expected_key); 64 64 assert_eq!(sum, expected_sum); 65 65 } else { 66 - assert!(!found_expected_rkey); 66 + assert!(!found_expected_key); 67 67 } 68 68 } 69 69
+5 -5
tests/non-huge-cars.rs
··· 22 22 let mut records = 0; 23 23 let mut sum = 0; 24 24 let mut found_bsky_profile = false; 25 - let mut prev_rkey = "".to_string(); 25 + let mut prev_key = "".to_string(); 26 26 27 27 while let Step::Value(pairs) = mem_car.next_chunk(256).unwrap() { 28 - for Output { rkey, cid: _, data } in pairs { 28 + for Output { key, cid: _, data } in pairs { 29 29 records += 1; 30 30 31 31 let (int_bytes, _) = data.split_at(size_of::<usize>()); 32 32 let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 33 33 34 34 sum += size; 35 - if rkey == "app.bsky.actor.profile/self" { 35 + if key == "app.bsky.actor.profile/self" { 36 36 found_bsky_profile = true; 37 37 } 38 - assert!(rkey > prev_rkey, "rkeys are streamed in order"); 39 - prev_rkey = rkey; 38 + assert!(key > prev_key, "keys are streamed in order"); 39 + prev_key = key; 40 40 } 41 41 } 42 42