Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

eagerly process records during init

and a little refactor

phil 6dbdc163 7bcf6579

+53 -51
+2 -2
examples/read-file/main.rs
··· 36 36 let stream = std::pin::pin!(reader.stream()); 37 37 38 38 let (commit, v) = repo_stream::drive::Vehicle::init( 39 - &root, 39 + root, 40 40 stream, 41 41 |block| Ok(block.len()), 42 42 ).await?; ··· 44 44 45 45 log::info!("got commit: {commit:?}"); 46 46 47 - while let Some((rkey, rec)) = record_stream.try_next().await? { 47 + while let Some((rkey, _rec)) = record_stream.try_next().await? { 48 48 log::info!("got {rkey:?}"); 49 49 } 50 50 log::info!("bye!");
+51 -49
src/drive.rs
··· 65 65 // TODO: generic error not box dyn nonsense. 66 66 pub type ProcRes<T> = Result<T, Box<dyn Error>>; 67 67 68 - pub struct Vehicle<E, S: Stream<Item = CarBlock<E>>, T, F: Fn(&[u8]) -> ProcRes<T>> { 68 + pub struct Vehicle<SE, S, T, P> 69 + where 70 + S: Stream<Item = CarBlock<SE>>, 71 + P: Fn(&[u8]) -> ProcRes<T>, 72 + { 69 73 block_stream: S, 70 74 blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 71 75 walker: Walker, 72 - process: F, 76 + process: P, 73 77 } 74 78 75 - impl<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone, F: Fn(&[u8]) -> ProcRes<T>> Vehicle<E, S, T, F> { 79 + impl<SE, S, T: Clone, P> Vehicle<SE, S, T, P> 80 + where 81 + SE: Error + 'static, 82 + S: Stream<Item = CarBlock<SE>> + Unpin, 83 + P: Fn(&[u8]) -> ProcRes<T>, 84 + { 76 85 pub async fn init( 77 - root: &Cid, 86 + root: Cid, 78 87 mut block_stream: S, 79 - process: F, 88 + process: P, 80 89 ) -> Result<(Commit, Self), DriveError> { 81 90 let mut blocks = HashMap::new(); 82 91 83 92 let mut commit = None; 84 - while let Some((block_cid, data)) = block_stream 93 + 94 + while let Some((cid, data)) = block_stream 85 95 .try_next() 86 96 .await 87 97 .map_err(|e| DriveError::CarBlockError(e.into()))? 88 98 { 89 - if block_cid == *root { 99 + if cid == root { 90 100 let c: Commit = serde_ipld_dagcbor::from_slice(&data) 91 101 .map_err(|e| DriveError::BadCommit(e.into()))?; 92 102 commit = Some(c); 93 103 break; // inner while 104 + } else { 105 + blocks.insert(cid, if Node::could_be(&data) { 106 + MaybeProcessedBlock::Raw(data) 107 + } else { 108 + MaybeProcessedBlock::Processed(process(&data)) 109 + }); 94 110 } 95 - // lazy: before the commit just stash raw blocks 96 - // TODO: eh??? 97 - blocks.insert(block_cid, MaybeProcessedBlock::Raw(data)); 98 111 } 99 112 100 113 // we either broke out or read all the blocks without finding the commit... ··· 111 124 Ok((commit, me)) 112 125 } 113 126 114 - pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError> { 115 - 'outer: loop { 116 - // walk until we can't load a block 117 - let cid_needed = loop { 118 - // walk as far as we can until we run out of blocks or find a record 119 - match self.walker.walk(&mut self.blocks, &self.process)? { 120 - Step::Rest(cid) => { 121 - log::trace!("walker is resting, get another block"); 122 - // panic!("we should have had all blocks already"); 123 - // self.walked_out = true; 124 - break cid; 125 - } 126 - Step::Finish => { 127 - log::trace!("walker finished"); 128 - return Ok(None); 129 - } 130 - Step::Step { rkey, data } => { 131 - return Ok(Some((Rkey(rkey), data))); 132 - } 133 - } 134 - }; 127 + async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError> { 128 + while let Some((cid, data)) = self 129 + .block_stream 130 + .try_next() 131 + .await 132 + .map_err(|e| DriveError::CarBlockError(e.into()))? 133 + { 134 + self.blocks.insert(cid, if Node::could_be(&data) { 135 + MaybeProcessedBlock::Raw(data) 136 + } else { 137 + MaybeProcessedBlock::Processed((self.process)(&data)) 138 + }); 139 + if cid == cid_needed { 140 + return Ok(()); 141 + } 142 + }; 135 143 136 - // load blocks until we reach that cid 137 - while let Some((cid, data)) = self 138 - .block_stream 139 - .try_next() 140 - .await 141 - .map_err(|e| DriveError::CarBlockError(e.into()))? 142 - { 143 - let val = if Node::could_be(&data) { 144 - MaybeProcessedBlock::Raw(data) 145 - } else { 146 - MaybeProcessedBlock::Processed((self.process)(&data)) 147 - }; 148 - self.blocks.insert(cid, val); 144 + // if we never found the block 145 + return Err(DriveError::MissingBlock(cid_needed)); 146 + } 149 147 150 - if cid == cid_needed { 151 - continue 'outer; 152 - } 148 + pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError> { 149 + loop { 150 + // walk as far as we can until we run out of blocks or find a record 151 + let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? { 152 + Step::Rest(cid) => cid, 153 + Step::Finish => return Ok(None), 154 + Step::Step { rkey, data } => return Ok(Some((Rkey(rkey), data))), 153 155 }; 154 156 155 - // if we never found the block that the walker said we need 156 - return Err(DriveError::MissingBlock(cid_needed)); 157 + // load blocks until we reach that cid 158 + self.drive_until(cid_needed).await?; 157 159 } 158 160 } 159 161