Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

process: generic Fn instead of fn pointer

phil 2391de49 1d45b0ce

+8 -8
+7 -7
src/drive.rs
··· 62 62 Processed(Result<T, Box<dyn Error>>), 63 63 } 64 64 65 - pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>, T> { 65 + pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>, T, F: Fn(&[u8]) -> Result<T, Box<dyn Error>>> { 66 66 block_stream: S, 67 67 blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 68 68 walker: Walker, 69 69 walked_out: bool, 70 - process: fn(&[u8]) -> Result<T, Box<dyn Error>>, 70 + process: F, 71 71 } 72 72 73 - impl<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone> Vehicle<E, S, T> { 73 + impl<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone, F: Fn(&[u8]) -> Result<T, Box<dyn Error>>> Vehicle<E, S, T, F> { 74 74 pub async fn init( 75 75 root: &Cid, 76 76 mut block_stream: S, 77 - process: fn(&[u8]) -> Result<T, Box<dyn Error>>, 77 + process: F, 78 78 ) -> Result<(Commit, Self), DriveError> { 79 79 let mut blocks = HashMap::new(); 80 80 ··· 122 122 } 123 123 } 124 124 125 - async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone>( 126 - vehicle: &mut Vehicle<E, S, T>, 125 + async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone, F: Fn(&[u8]) -> Result<T, Box<dyn Error>>>( 126 + vehicle: &mut Vehicle<E, S, T, F>, 127 127 ) -> Result<Option<(Rkey, T)>, DriveError> { 128 128 129 129 'outer: loop { 130 130 // walk until we can't load a block 131 131 let cid_needed = loop { 132 132 // walk as far as we can until we run out of blocks or find a record 133 - match vehicle.walker.walk(&mut vehicle.blocks, vehicle.process)? { 133 + match vehicle.walker.walk(&mut vehicle.blocks, &vehicle.process)? { 134 134 Step::Rest(cid) => { 135 135 log::trace!("walker is resting, get another block"); 136 136 // panic!("we should have had all blocks already");
+1 -1
src/walk.rs
··· 180 180 pub fn walk<T: Clone>( 181 181 &mut self, 182 182 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 183 - process: fn(&[u8]) -> Result<T, Box<dyn std::error::Error>>, 183 + process: impl Fn(&[u8]) -> Result<T, Box<dyn std::error::Error>>, 184 184 ) -> Result<Step<T>, Trip> { 185 185 loop { 186 186 let Some(current_node) = self.stack.last_mut() else {