Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

go faster

don't interleave block loading and mst walking -- with current CARs there's no actual early blocks we can use, and it's slowwwwwww as it was written

hack in a "processing" hard-coding (save the record length): this does reduce memory (~half on huge repo) and speed things up (~2x on huge repo). obviously this is an extreme case of processing, most stuff will benefit less

file reader example: use buffering on the reader

all in this takes the huge repo from ~17s -> 0.45s on my dev machine.

phil 6944151f 3578d7be

+95 -27
+4
Cargo.toml
··· 21 21 env_logger = "0.11.8" 22 22 multibase = "0.9.2" 23 23 tokio = { version = "1.47.1", features = ["full"] } 24 + 25 + [profile.profiling] 26 + inherits = "release" 27 + debug = true
+2 -1
examples/read-file/main.rs
··· 18 18 19 19 let Args { file } = Args::parse(); 20 20 let reader = tokio::fs::File::open(file).await?; 21 + let reader = tokio::io::BufReader::new(reader); 21 22 22 23 println!("hello!"); 23 24 ··· 40 41 log::info!("got commit: {commit:?}"); 41 42 42 43 while let Some((rkey, rec)) = record_stream.try_next().await? { 43 - log::info!("got {rkey:?} {}", rec.len()); 44 + log::info!("got {rkey:?} {rec:?}"); 44 45 } 45 46 log::info!("bye!"); 46 47
+47 -21
src/drive.rs
··· 3 3 use std::collections::HashMap; 4 4 use std::error::Error; 5 5 6 - use crate::mst::Commit; 6 + use crate::mst::{Commit, Node}; 7 7 use crate::walk::{Step, Trip, Walker}; 8 8 9 9 #[derive(Debug, thiserror::Error)] ··· 31 31 #[derive(Debug)] 32 32 pub struct Rkey(pub String); 33 33 34 + #[derive(Debug)] 35 + pub enum MaybeProcessedBlock<T> { 36 + /// A block that's *probably* a Node (but we can't know yet) 37 + /// 38 + /// It *can be* a record that suspiciously looks a lot like a node, so we 39 + /// cannot eagerly turn it into a Node. We only know for sure what it is 40 + /// when we actually walk down the MST 41 + Raw(Vec<u8>), 42 + /// A processed record from a block that was definitely not a Node 43 + /// 44 + /// Processing has to be fallible because the CAR can have totally-unused 45 + /// blocks, which can just be garbage. since we're eagerly trying to process 46 + /// record blocks without knowing for sure that they *are* records, we 47 + /// discard any definitely-not-nodes that fail processing and keep their 48 + /// error in the buffer for them. if we later try to retreive them as a 49 + /// record, then we can surface the error. 50 + /// 51 + /// If we _never_ needed this block, then we may have wasted a bit of effort 52 + /// trying to process it. Oh well. 53 + /// 54 + /// It would be nice to store the real error type from the processing 55 + /// function, but I'm leaving that generics puzzle for later. 56 + /// 57 + /// There's an alternative here, which would be to kick unprocessable blocks 58 + /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 59 + /// surface the typed error later if needed by trying to reprocess. 60 + Processed(Result<T, Box<dyn Error>>), 61 + } 62 + 34 63 pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>> { 35 64 block_stream: S, 36 - blocks: HashMap<Cid, Vec<u8>>, 65 + blocks: HashMap<Cid, MaybeProcessedBlock<usize>>, 37 66 walker: Walker, 38 67 walked_out: bool, 39 68 } ··· 54 83 commit = Some(c); 55 84 break; // inner while 56 85 } 57 - blocks.insert(block_cid, data); 86 + // lazy: before the commit just stash raw blocks 87 + // TODO: eh??? 88 + blocks.insert(block_cid, MaybeProcessedBlock::Raw(data)); 58 89 } 59 90 60 91 // we either broke out or read all the blocks without finding the commit... ··· 71 102 Ok((commit, me)) 72 103 } 73 104 74 - pub async fn next_record(&mut self) -> Result<Option<(Rkey, Vec<u8>)>, DriveError> { 105 + pub async fn next_record(&mut self) -> Result<Option<(Rkey, usize)>, DriveError> { 75 106 drive_ahead(self).await 76 107 } 77 108 78 - pub fn stream(self) -> impl Stream<Item = Result<(Rkey, Vec<u8>), DriveError>> { 109 + pub fn stream(self) -> impl Stream<Item = Result<(Rkey, usize), DriveError>> { 79 110 futures::stream::try_unfold(self, |mut this| async move { 80 111 let maybe_record = drive_ahead(&mut this).await?; 81 112 Ok(maybe_record.map(|b| (b, this))) ··· 85 116 86 117 async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin>( 87 118 vehicle: &mut Vehicle<E, S>, 88 - ) -> Result<Option<(Rkey, Vec<u8>)>, DriveError> { 119 + ) -> Result<Option<(Rkey, usize)>, DriveError> { 89 120 // trying smth: load all blocks first 90 121 if !vehicle.walked_out { 91 122 // stopped at a rest, try to load more blocks first ··· 93 124 .block_stream 94 125 .try_next() 95 126 .await 96 - .map_err(|e| DriveError::CarBlockError(e.into()))? { 97 - vehicle.blocks.insert(cid, data); 127 + .map_err(|e| DriveError::CarBlockError(e.into()))? 128 + { 129 + let val = if Node::could_be(&data) { 130 + MaybeProcessedBlock::Raw(data) 131 + } else { 132 + MaybeProcessedBlock::Processed(Ok(data.len())) 133 + }; 134 + vehicle.blocks.insert(cid, val); 98 135 }; 99 136 vehicle.walked_out = true; 137 + // pause to let macos activity monitor's memory stat update, definitely the best way to do this 138 + // tokio::time::sleep(std::time::Duration::from_secs(30)).await; 100 139 } 101 140 loop { 102 - // if vehicle.walked_out { 103 - // // stopped at a rest, try to load more blocks first 104 - // let Some((cid, data)) = vehicle 105 - // .block_stream 106 - // .try_next() 107 - // .await 108 - // .map_err(|e| DriveError::CarBlockError(e.into()))? 109 - // else { 110 - // return Err(DriveError::Dnf); 111 - // }; 112 - // vehicle.blocks.insert(cid, data); 113 - // vehicle.walked_out = false; 114 - // } 115 141 116 142 // walk as far as we can until we run out of blocks or find a record 117 143 match vehicle.walker.walk(&mut vehicle.blocks)? {
+25
src/mst.rs
··· 52 52 } 53 53 54 54 impl Node { 55 + /// test if a block could possibly be a node 56 + /// 57 + /// we can't eagerly decode records except where we're *sure* they cannot be 58 + /// an mst node (and even then we can only attempt) because you can't know 59 + /// with certainty what a block is supposed to be without actually walking 60 + /// the tree. 61 + /// 62 + /// so if a block *could be* a node, any record converter must postpone 63 + /// processing. if it turns out it happens to be a very node-looking record, 64 + /// well, sorry, it just has to only be processed later when that's known. 65 + pub fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 + const NODE_FINGERPRINT: [u8; 3] = [ 67 + 0xA2, // map length 2 (for "l" and "e" keys) 68 + 0x61, // text length 1 69 + b'e', // "e" before "l" because map keys have to be lex-sorted 70 + // 0x8?: "e" contains an array (0x8 nibble) of some length (low nib) 71 + ]; 72 + let bytes = bytes.as_ref(); 73 + bytes.starts_with(&NODE_FINGERPRINT) 74 + && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false) 75 + } 76 + } 77 + 78 + 79 + impl Node { 55 80 /// Check if a node has any entries 56 81 /// 57 82 /// An empty repository with no records is represented as a single MST node
+17 -5
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 3 use crate::mst::Node; 4 + use crate::drive::MaybeProcessedBlock; 4 5 use ipld_core::cid::Cid; 5 6 use std::collections::HashMap; 6 7 use std::fmt; ··· 11 12 NodeEmpty, 12 13 #[error("Failed to decode commit block: {0}")] 13 14 BadCommit(Box<dyn std::error::Error>), 15 + #[error("Failed to process record: {0}")] 16 + RecordFailedProcessing(Box<dyn std::error::Error>), 14 17 #[error("Failed to compute an rkey due to invalid prefix_len")] 15 18 EntryPrefixOutOfbounds, 16 19 #[error("RKey was not utf-8")] ··· 18 21 } 19 22 20 23 #[derive(Debug)] 21 - pub enum Step { 24 + pub enum Step<T> { 22 25 Rest, 23 26 Finish, 24 - Step { rkey: String, data: Vec<u8> }, 27 + Step { rkey: String, data: T }, 25 28 } 26 29 27 30 /// some block we need (or have found) ··· 172 175 } 173 176 } 174 177 175 - pub fn walk(&mut self, blocks: &mut HashMap<Cid, Vec<u8>>) -> Result<Step, Trip> { 178 + pub fn walk(&mut self, blocks: &mut HashMap<Cid, MaybeProcessedBlock<usize>>) -> Result<Step<usize>, Trip> { 176 179 loop { 177 180 let Some(current_node) = self.stack.last_mut() else { 178 181 log::trace!("tried to walk but we're actually done."); ··· 190 193 log::trace!("node not found, resting"); 191 194 return Ok(Step::Rest); 192 195 }; 193 - let node = serde_ipld_dagcbor::from_slice::<Node>(&block) 196 + 197 + let MaybeProcessedBlock::Raw(data) = block else { 198 + return Err(Trip::BadCommit("failed commit fingerprint".into())); 199 + }; 200 + let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 194 201 .map_err(|e| Trip::BadCommit(e.into()))?; 195 202 196 203 // found node, make sure we remember ··· 206 213 return Ok(Step::Rest); 207 214 }; 208 215 let rkey = rkey.clone(); 209 - let data = data.to_vec(); 216 + let data = match data { 217 + MaybeProcessedBlock::Raw(data) => data.len(), 218 + MaybeProcessedBlock::Processed(Ok(t)) => *t, 219 + MaybeProcessedBlock::Processed(e) => 220 + return Err(Trip::RecordFailedProcessing(format!("booo: {e:?}").into())), // TODO 221 + }; 210 222 211 223 // found node, make sure we remember 212 224 current_node.found(cid);