Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

buffering all blocks first is 2.5x faster :(

phil 3ae4a92a 349f69b2

+56 -59
+28 -14
src/drive.rs
··· 86 86 async fn drive_ahead<E: Error + 'static, S: Stream<Item = CarBlock<E>> + Unpin>( 87 87 vehicle: &mut Vehicle<E, S>, 88 88 ) -> Result<Option<(Rkey, Vec<u8>)>, DriveError> { 89 - loop { 90 - if vehicle.walked_out { 91 - // stopped at a rest, try to load more blocks first 92 - let Some((cid, data)) = vehicle 93 - .block_stream 94 - .try_next() 95 - .await 96 - .map_err(|e| DriveError::CarBlockError(e.into()))? 97 - else { 98 - return Err(DriveError::Dnf); 99 - }; 89 + // trying smth: load all blocks first 90 + if !vehicle.walked_out { 91 + // stopped at a rest, try to load more blocks first 92 + while let Some((cid, data)) = vehicle 93 + .block_stream 94 + .try_next() 95 + .await 96 + .map_err(|e| DriveError::CarBlockError(e.into()))? { 100 97 vehicle.blocks.insert(cid, data); 101 - vehicle.walked_out = false; 102 - } 98 + }; 99 + vehicle.walked_out = true; 100 + } 101 + loop { 102 + // if vehicle.walked_out { 103 + // // stopped at a rest, try to load more blocks first 104 + // let Some((cid, data)) = vehicle 105 + // .block_stream 106 + // .try_next() 107 + // .await 108 + // .map_err(|e| DriveError::CarBlockError(e.into()))? 109 + // else { 110 + // return Err(DriveError::Dnf); 111 + // }; 112 + // vehicle.blocks.insert(cid, data); 113 + // vehicle.walked_out = false; 114 + // } 115 + 103 116 // walk as far as we can until we run out of blocks or find a record 104 117 match vehicle.walker.walk(&mut vehicle.blocks)? { 105 118 Step::Rest => { 106 119 log::trace!("walker is resting, get another block"); 107 - vehicle.walked_out = true; 120 + panic!("we should have had all blocks already"); 121 + // vehicle.walked_out = true; 108 122 } 109 123 Step::Finish => { 110 124 log::trace!("walker finished");
+28 -45
src/walk.rs
··· 133 133 Record { rkey: String, cid: Cid }, 134 134 } 135 135 136 - // impl Need { 137 - // fn cid(&self) -> &Cid { 138 - // match self { 139 - // Need::Node(cid) => cid, 140 - // Need::Record { cid, .. } => cid, 141 - // } 142 - // } 143 - // } 144 - 145 136 impl From<&Node> for ActionNode { 146 137 fn from(node: &Node) -> Self { 147 138 let left_tree = node.left.as_ref().map(Into::into); ··· 182 173 } 183 174 184 175 pub fn walk(&mut self, blocks: &mut HashMap<Cid, Vec<u8>>) -> Result<Step, Trip> { 185 - // //// debug 186 - // for (i, need) in self.stack.iter().enumerate() { 187 - // let k = if let Need::Record { rkey, .. } = need { 188 - // rkey 189 - // } else { 190 - // "[??]" 191 - // }; 192 - // println!("{: <1$} {k}", "", i) 193 - // } 194 - 195 176 loop { 196 177 let Some(current_node) = self.stack.last_mut() else { 197 178 log::trace!("tried to walk but we're actually done."); ··· 238 219 } 239 220 } 240 221 241 - // #[cfg(test)] 242 - // mod test { 243 - // use super::*; 244 - // use crate::mst::Entry; 222 + #[cfg(test)] 223 + mod test { 224 + use super::*; 225 + // use crate::mst::Entry; 245 226 246 - // fn cid1() -> Cid { 247 - // "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 248 - // .parse() 249 - // .unwrap() 250 - // } 227 + fn cid1() -> Cid { 228 + "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 229 + .parse() 230 + .unwrap() 231 + } 251 232 // fn cid2() -> Cid { 252 233 // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 253 234 // .parse() ··· 289 270 // .unwrap() 290 271 // } 291 272 292 - // #[test] 293 - // fn test_needs_from_node_empty() { 294 - // let node = Node { 295 - // left: None, 296 - // entries: vec![], 297 - // }; 298 - // assert_eq!(needs_from_node(node).unwrap(), vec![]); 299 - // } 273 + #[test] 274 + fn test_next_from_node_empty() { 275 + let node = Node { 276 + left: None, 277 + entries: vec![], 278 + }; 279 + let action_node: ActionNode = (&node).into(); 280 + assert_eq!(action_node.next(), None); 281 + } 300 282 301 - // #[test] 302 - // fn test_needs_from_node_just_left() { 303 - // let node = Node { 304 - // left: Some(cid1()), 305 - // entries: vec![], 306 - // }; 307 - // assert_eq!(needs_from_node(node).unwrap(), vec![Need::Node(cid1()),]); 308 - // } 283 + #[test] 284 + fn test_needs_from_node_just_left() { 285 + let node = Node { 286 + left: Some(cid1()), 287 + entries: vec![], 288 + }; 289 + let action_node: ActionNode = (&node).into(); 290 + assert_eq!(action_node.next(), Some(Need::Node(cid1()))); 291 + } 309 292 310 293 // #[test] 311 294 // fn test_needs_from_node_just_one_record() { ··· 465 448 // ] 466 449 // ); 467 450 // } 468 - // } 451 + }