Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

nicer proc type, move the step function to struct

idk why it was an independent function in iroh_car which this was loosely based off of

phil 7bcf6579 2391de49

+51 -56
+49 -54
src/drive.rs
··· 62 62 Processed(Result<T, Box<dyn Error>>), 63 63 } 64 64 65 - pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>, T, F: Fn(&[u8]) -> Result<T, Box<dyn Error>>> { 65 + // TODO: generic error not box dyn nonsense. 66 + pub type ProcRes<T> = Result<T, Box<dyn Error>>; 67 + 68 + pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>, T, F: Fn(&[u8]) -> ProcRes<T>> { 66 69 block_stream: S, 67 70 blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 68 71 walker: Walker, 69 - walked_out: bool, 70 72 process: F, 71 73 } 72 74 73 - impl<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone, F: Fn(&[u8]) -> Result<T, Box<dyn Error>>> Vehicle<E, S, T, F> { 75 + impl<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone, F: Fn(&[u8]) -> ProcRes<T>> Vehicle<E, S, T, F> { 74 76 pub async fn init( 75 77 root: &Cid, 76 78 mut block_stream: S, ··· 104 106 block_stream, 105 107 blocks, 106 108 walker, 107 - walked_out: false, 108 109 process, 109 110 }; 110 111 Ok((commit, me)) 111 112 } 112 113 113 114 pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError> { 114 - drive_ahead(self).await 115 + 'outer: loop { 116 + // walk until we can't load a block 117 + let cid_needed = loop { 118 + // walk as far as we can until we run out of blocks or find a record 119 + match self.walker.walk(&mut self.blocks, &self.process)? { 120 + Step::Rest(cid) => { 121 + log::trace!("walker is resting, get another block"); 122 + // panic!("we should have had all blocks already"); 123 + // self.walked_out = true; 124 + break cid; 125 + } 126 + Step::Finish => { 127 + log::trace!("walker finished"); 128 + return Ok(None); 129 + } 130 + Step::Step { rkey, data } => { 131 + return Ok(Some((Rkey(rkey), data))); 132 + } 133 + } 134 + }; 135 + 136 + // load blocks until we reach that cid 137 + while let Some((cid, data)) = self 138 + .block_stream 139 + .try_next() 140 + .await 141 + .map_err(|e| DriveError::CarBlockError(e.into()))? 142 + { 143 + let val = if Node::could_be(&data) { 144 + MaybeProcessedBlock::Raw(data) 145 + } else { 146 + MaybeProcessedBlock::Processed((self.process)(&data)) 147 + }; 148 + self.blocks.insert(cid, val); 149 + 150 + if cid == cid_needed { 151 + continue 'outer; 152 + } 153 + }; 154 + 155 + // if we never found the block that the walker said we need 156 + return Err(DriveError::MissingBlock(cid_needed)); 157 + } 115 158 } 116 159 117 160 pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError>> { 118 161 futures::stream::try_unfold(self, |mut this| async move { 119 - let maybe_record = drive_ahead(&mut this).await?; 162 + let maybe_record = this.next_record().await?; 120 163 Ok(maybe_record.map(|b| (b, this))) 121 164 }) 122 165 } 123 166 } 124 - 125 - async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone, F: Fn(&[u8]) -> Result<T, Box<dyn Error>>>( 126 - vehicle: &mut Vehicle<E, S, T, F>, 127 - ) -> Result<Option<(Rkey, T)>, DriveError> { 128 - 129 - 'outer: loop { 130 - // walk until we can't load a block 131 - let cid_needed = loop { 132 - // walk as far as we can until we run out of blocks or find a record 133 - match vehicle.walker.walk(&mut vehicle.blocks, &vehicle.process)? { 134 - Step::Rest(cid) => { 135 - log::trace!("walker is resting, get another block"); 136 - // panic!("we should have had all blocks already"); 137 - // vehicle.walked_out = true; 138 - break cid; 139 - } 140 - Step::Finish => { 141 - log::trace!("walker finished"); 142 - return Ok(None); 143 - } 144 - Step::Step { rkey, data } => { 145 - return Ok(Some((Rkey(rkey), data))); 146 - } 147 - } 148 - }; 149 - 150 - // load blocks until we reach that cid 151 - while let Some((cid, data)) = vehicle 152 - .block_stream 153 - .try_next() 154 - .await 155 - .map_err(|e| DriveError::CarBlockError(e.into()))? 156 - { 157 - let val = if Node::could_be(&data) { 158 - MaybeProcessedBlock::Raw(data) 159 - } else { 160 - MaybeProcessedBlock::Processed((vehicle.process)(&data)) 161 - }; 162 - vehicle.blocks.insert(cid, val); 163 - 164 - if cid == cid_needed { 165 - continue 'outer; 166 - } 167 - }; 168 - 169 - return Err(DriveError::MissingBlock(cid_needed)); 170 - } 171 - }
+2 -2
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 3 use crate::mst::Node; 4 - use crate::drive::MaybeProcessedBlock; 4 + use crate::drive::{MaybeProcessedBlock, ProcRes}; 5 5 use ipld_core::cid::Cid; 6 6 use std::collections::HashMap; 7 7 use std::fmt; ··· 180 180 pub fn walk<T: Clone>( 181 181 &mut self, 182 182 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 183 - process: impl Fn(&[u8]) -> Result<T, Box<dyn std::error::Error>>, 183 + process: impl Fn(&[u8]) -> ProcRes<T>, 184 184 ) -> Result<Step<T>, Trip> { 185 185 loop { 186 186 let Some(current_node) = self.stack.last_mut() else {