Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

generic processing (wip)

small apparent slowdown, lots of cleanup to do

phil d7be121c 6944151f

+35 -18
+6 -2
examples/read-file/main.rs
··· 35 35 // let stream = Box::pin(reader.stream()); 36 36 let stream = std::pin::pin!(reader.stream()); 37 37 38 - let (commit, v) = repo_stream::drive::Vehicle::init(&root, stream).await?; 38 + let (commit, v) = repo_stream::drive::Vehicle::init( 39 + &root, 40 + stream, 41 + |block| Ok(block.len()), 42 + ).await?; 39 43 let mut record_stream = std::pin::pin!(v.stream()); 40 44 41 45 log::info!("got commit: {commit:?}"); 42 46 43 47 while let Some((rkey, rec)) = record_stream.try_next().await? { 44 - log::info!("got {rkey:?} {rec:?}"); 48 + log::info!("got {rkey:?}"); 45 49 } 46 50 log::info!("bye!"); 47 51
+17 -11
src/drive.rs
··· 60 60 Processed(Result<T, Box<dyn Error>>), 61 61 } 62 62 63 - pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>> { 63 + pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>, T> { 64 64 block_stream: S, 65 - blocks: HashMap<Cid, MaybeProcessedBlock<usize>>, 65 + blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 66 66 walker: Walker, 67 67 walked_out: bool, 68 + process: fn(&[u8]) -> Result<T, Box<dyn Error>>, 68 69 } 69 70 70 - impl<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin> Vehicle<E, S> { 71 - pub async fn init(root: &Cid, mut block_stream: S) -> Result<(Commit, Self), DriveError> { 71 + impl<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone> Vehicle<E, S, T> { 72 + pub async fn init( 73 + root: &Cid, 74 + mut block_stream: S, 75 + process: fn(&[u8]) -> Result<T, Box<dyn Error>>, 76 + ) -> Result<(Commit, Self), DriveError> { 72 77 let mut blocks = HashMap::new(); 73 78 74 79 let mut commit = None; ··· 98 103 blocks, 99 104 walker, 100 105 walked_out: false, 106 + process, 101 107 }; 102 108 Ok((commit, me)) 103 109 } 104 110 105 - pub async fn next_record(&mut self) -> Result<Option<(Rkey, usize)>, DriveError> { 111 + pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError> { 106 112 drive_ahead(self).await 107 113 } 108 114 109 - pub fn stream(self) -> impl Stream<Item = Result<(Rkey, usize), DriveError>> { 115 + pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError>> { 110 116 futures::stream::try_unfold(self, |mut this| async move { 111 117 let maybe_record = drive_ahead(&mut this).await?; 112 118 Ok(maybe_record.map(|b| (b, this))) ··· 114 120 } 115 121 } 116 122 117 - async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin>( 118 - vehicle: &mut Vehicle<E, S>, 119 - ) -> Result<Option<(Rkey, usize)>, DriveError> { 123 + async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone>( 124 + vehicle: &mut Vehicle<E, S, T>, 125 + ) -> Result<Option<(Rkey, T)>, DriveError> { 120 126 // trying smth: load all blocks first 121 127 if !vehicle.walked_out { 122 128 // stopped at a rest, try to load more blocks first ··· 129 135 let val = if Node::could_be(&data) { 130 136 MaybeProcessedBlock::Raw(data) 131 137 } else { 132 - MaybeProcessedBlock::Processed(Ok(data.len())) 138 + MaybeProcessedBlock::Processed((vehicle.process)(&data)) 133 139 }; 134 140 vehicle.blocks.insert(cid, val); 135 141 }; ··· 140 146 loop { 141 147 142 148 // walk as far as we can until we run out of blocks or find a record 143 - match vehicle.walker.walk(&mut vehicle.blocks)? { 149 + match vehicle.walker.walk(&mut vehicle.blocks, vehicle.process)? { 144 150 Step::Rest => { 145 151 log::trace!("walker is resting, get another block"); 146 152 panic!("we should have had all blocks already");
+12 -5
src/walk.rs
··· 18 18 EntryPrefixOutOfbounds, 19 19 #[error("RKey was not utf-8")] 20 20 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 21 + #[error("Process failed: {0}")] 22 + ProcessFailed(Box<dyn std::error::Error>), 21 23 } 22 24 23 25 #[derive(Debug)] ··· 175 177 } 176 178 } 177 179 178 - pub fn walk(&mut self, blocks: &mut HashMap<Cid, MaybeProcessedBlock<usize>>) -> Result<Step<usize>, Trip> { 180 + pub fn walk<T: Clone>( 181 + &mut self, 182 + blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 183 + process: fn(&[u8]) -> Result<T, Box<dyn std::error::Error>>, 184 + ) -> Result<Step<T>, Trip> { 179 185 loop { 180 186 let Some(current_node) = self.stack.last_mut() else { 181 187 log::trace!("tried to walk but we're actually done."); ··· 214 220 }; 215 221 let rkey = rkey.clone(); 216 222 let data = match data { 217 - MaybeProcessedBlock::Raw(data) => data.len(), 218 - MaybeProcessedBlock::Processed(Ok(t)) => *t, 219 - MaybeProcessedBlock::Processed(e) => 220 - return Err(Trip::RecordFailedProcessing(format!("booo: {e:?}").into())), // TODO 223 + MaybeProcessedBlock::Raw(data) => process(&data), 224 + MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()), 225 + MaybeProcessedBlock::Processed(_e) => 226 + return Err(Trip::RecordFailedProcessing("booo".into())), // TODO 221 227 }; 222 228 223 229 // found node, make sure we remember 224 230 current_node.found(cid); 225 231 226 232 log::trace!("emitting a block as a step. depth={}", self.stack.len()); 233 + let data = data.map_err(Trip::ProcessFailed)?; 227 234 return Ok(Step::Step { rkey, data }); 228 235 } 229 236 }