Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

assert rkey order when driving

small (~3%) perf hit for this, which is fine

phil dc33583e bed46ee7

+18 -15
+7 -5
readme.md
··· 23 23 todo 24 24 25 25 - [x] car file test fixtures & validation tests 26 - - [ ] make sure we can get the did and signature out for verification 26 + - [x] make sure we can get the did and signature out for verification 27 + -> yeah the commit is returned from init 27 28 - [ ] spec compliance todos 28 - - [ ] assert that keys are ordered and fail if not 29 + - [x] assert that keys are ordered and fail if not 29 30 - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 30 31 - [ ] performance todos 31 - - [ ] consume the serialized nodes into a mutable efficient format 32 + - [x] consume the serialized nodes into a mutable efficient format 32 33 - [ ] maybe customize the deserialize impl to do that directly? 33 34 - [x] benchmark and profile 34 35 - [ ] robustness todos 35 36 - [ ] swap the blocks hashmap for a BlockStore trait that can be dumped to redb 36 37 - [ ] maybe keep the redb function behind a feature flag? 37 38 - [ ] can we assert a max size for node blocks? 38 - - [ ] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting 39 + - [x] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting 40 + -> because it's the upper 3 bytes, not upper 4 byte nibble, oops. 39 41 - [ ] max mst depth (there is actually a hard limit but a malicious repo could do anything) 40 - - [ ] i don't think we need a max recursion depth for processing cbor contents since we leave records to the user to decode 42 + - [ ] i don't *think* we need a max recursion depth for processing cbor contents since we leave records to the user to decode 41 43 42 44 newer ideas 43 45
+11 -8
src/drive.rs
··· 10 10 pub enum DriveError<E: Error> { 11 11 #[error("Failed to initialize CarReader: {0}")] 12 12 CarReader(#[from] iroh_car::Error), 13 - #[error("CAR file requires a root to be present")] 14 - MissingRoot, 15 13 #[error("Car block stream error: {0}")] 16 14 CarBlockError(Box<dyn Error>), 17 15 #[error("Failed to decode commit block: {0}")] 18 16 BadCommit(Box<dyn Error>), 19 - #[error("Failed to decode record block: {0}")] 20 - BadRecord(Box<dyn Error>), 21 17 #[error("The Commit block reference by the root was not found")] 22 18 MissingCommit, 23 19 #[error("The MST block {0} could not be found")] 24 20 MissingBlock(Cid), 25 21 #[error("Failed to walk the mst tree: {0}")] 26 22 Tripped(#[from] Trip<E>), 27 - #[error("Not finished walking, but no more blocks are available to continue")] 28 - Dnf, 23 + #[error("Encountered an rkey out of order while walking the MST")] 24 + RkeyOutOfOrder, 29 25 } 30 26 31 27 type CarBlock<E> = Result<(Cid, Vec<u8>), E>; ··· 75 71 blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>, 76 72 walker: Walker, 77 73 process: P, 74 + prev_rkey: String, 78 75 } 79 76 80 77 impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE> ··· 102 99 let c: Commit = serde_ipld_dagcbor::from_slice(&data) 103 100 .map_err(|e| DriveError::BadCommit(e.into()))?; 104 101 commit = Some(c); 105 - break; // inner while 102 + break; 106 103 } else { 107 104 blocks.insert( 108 105 cid, ··· 125 122 blocks, 126 123 walker, 127 124 process, 125 + prev_rkey: "".to_string(), 128 126 }; 129 127 Ok((commit, me)) 130 128 } ··· 159 157 let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? { 160 158 Step::Rest(cid) => cid, 161 159 Step::Finish => return Ok(None), 162 - Step::Step { rkey, data } => return Ok(Some((Rkey(rkey), data))), 160 + Step::Step { rkey, data } => { 161 + if rkey <= self.prev_rkey { 162 + return Err(DriveError::RkeyOutOfOrder); 163 + } 164 + return Ok(Some((Rkey(rkey), data))); 165 + } 163 166 }; 164 167 165 168 // load blocks until we reach that cid
-2
src/walk.rs
··· 12 12 NodeEmpty, 13 13 #[error("Failed to decode commit block: {0}")] 14 14 BadCommit(Box<dyn std::error::Error>), 15 - #[error("Failed to process record: {0}")] 16 - RecordFailedProcessing(Box<dyn Error>), 17 15 #[error("Action node error: {0}")] 18 16 ActionNode(#[from] ActionNodeError), 19 17 #[error("Process failed: {0}")]