Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

back to interleaved loading/walking but better

yay

the walker knows which block it needs to be able
to continue, so we pass that to the driver, and
have it load until that block is reached before
trying to walk again.

phil 5eef99d9 d7be121c

+38 -26
+34 -22
src/drive.rs
··· 123 123 async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin, T: Clone>( 124 124 vehicle: &mut Vehicle<E, S, T>, 125 125 ) -> Result<Option<(Rkey, T)>, DriveError> { 126 - // trying smth: load all blocks first 127 - if !vehicle.walked_out { 128 - // stopped at a rest, try to load more blocks first 126 + 127 + 'outer: loop { 128 + // walk until we can't load a block 129 + let cid_needed = loop { 130 + // walk as far as we can until we run out of blocks or find a record 131 + match vehicle.walker.walk(&mut vehicle.blocks, vehicle.process)? { 132 + Step::Rest(cid) => { 133 + log::trace!("walker is resting, get another block"); 134 + // panic!("we should have had all blocks already"); 135 + // vehicle.walked_out = true; 136 + break cid; 137 + } 138 + Step::Finish => { 139 + log::trace!("walker finished"); 140 + return Ok(None); 141 + } 142 + Step::Step { rkey, data } => { 143 + return Ok(Some((Rkey(rkey), data))); 144 + } 145 + } 146 + }; 147 + 148 + let mut found_any = false; 149 + // load blocks until we reach that cid 129 150 while let Some((cid, data)) = vehicle 130 151 .block_stream 131 152 .try_next() 132 153 .await 133 154 .map_err(|e| DriveError::CarBlockError(e.into()))? 134 155 { 156 + found_any = true; 135 157 let val = if Node::could_be(&data) { 136 158 MaybeProcessedBlock::Raw(data) 137 159 } else { 138 160 MaybeProcessedBlock::Processed((vehicle.process)(&data)) 139 161 }; 140 162 vehicle.blocks.insert(cid, val); 141 - }; 142 - vehicle.walked_out = true; 143 - // pause to let macos activity monitor's memory stat update, definitely the best way to do this 144 - // tokio::time::sleep(std::time::Duration::from_secs(30)).await; 145 - } 146 - loop { 147 163 148 - // walk as far as we can until we run out of blocks or find a record 149 - match vehicle.walker.walk(&mut vehicle.blocks, vehicle.process)? { 150 - Step::Rest => { 151 - log::trace!("walker is resting, get another block"); 152 - panic!("we should have had all blocks already"); 153 - // vehicle.walked_out = true; 154 - } 155 - Step::Finish => { 156 - log::trace!("walker finished"); 157 - return Ok(None); 158 - } 159 - Step::Step { rkey, data } => { 160 - return Ok(Some((Rkey(rkey), data))); 164 + if cid == cid_needed { 165 + continue 'outer; 161 166 } 167 + }; 168 + 169 + if !found_any { 170 + panic!("walker unfinished but no more blocks to load"); 162 171 } 163 172 } 173 + 174 + // pause to let macos activity monitor's memory stat update, definitely the best way to do this 175 + // tokio::time::sleep(std::time::Duration::from_secs(30)).await; 164 176 }
+1 -1
src/mst.rs
··· 71 71 ]; 72 72 let bytes = bytes.as_ref(); 73 73 bytes.starts_with(&NODE_FINGERPRINT) 74 - && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false) 74 + // && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false) 75 75 } 76 76 } 77 77
+3 -3
src/walk.rs
··· 24 24 25 25 #[derive(Debug)] 26 26 pub enum Step<T> { 27 - Rest, 27 + Rest(Cid), 28 28 Finish, 29 29 Step { rkey: String, data: T }, 30 30 } ··· 197 197 log::trace!("need node {cid:?}"); 198 198 let Some(block) = blocks.remove(cid) else { 199 199 log::trace!("node not found, resting"); 200 - return Ok(Step::Rest); 200 + return Ok(Step::Rest(*cid)); 201 201 }; 202 202 203 203 let MaybeProcessedBlock::Raw(data) = block else { ··· 216 216 log::trace!("need record {cid:?}"); 217 217 let Some(data) = blocks.get(cid) else { 218 218 log::trace!("record block not found, resting"); 219 - return Ok(Step::Rest); 219 + return Ok(Step::Rest(*cid)); 220 220 }; 221 221 let rkey = rkey.clone(); 222 222 let data = match data {