Fast and robust atproto CAR file processing in Rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

a "step" closer

phil 4d416aa4 4cb25a90

+46 -9
+32 -6
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::link::ObjectLink; 3 + use crate::link::{ObjectLink, NodeThing, ThingKind}; 4 4 use crate::{ 5 5 Bytes, HashMap, Rkey, Step, 6 6 disk::{DiskError, DiskStore}, ··· 253 253 blocks: mem_blocks, 254 254 walker, 255 255 process, 256 + next_missing: None, 256 257 }, 257 258 )) 258 259 } ··· 276 277 blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 277 278 walker: Walker, 278 279 process: fn(Bytes) -> Bytes, 280 + next_missing: Option<NodeThing>, 279 281 } 280 282 281 283 impl MemDriver { 282 284 /// Step through the record outputs, in rkey order 283 285 pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 286 + if let Some(missing) = &self.next_missing { 287 + println!("other side???"); 288 + // TODO: make the walker finish walking to verify no more present blocks (oops sparse tree) 289 + // HACK: just get the last rkey if it's there -- i think we might actually need to walk for it though 290 + // ...and walk to verify rkey order of the rest of the nodes anyway? 291 + return Ok(match &missing.kind { 292 + ThingKind::ChildNode => Step::End(None), 293 + ThingKind::Record(rkey) => Step::End(Some(rkey.clone())), 294 + }); 295 + } 296 + println!("stepping in..."); 284 297 let mut out = Vec::with_capacity(n); 298 + // let mut err; 285 299 for _ in 0..n { 286 - // walk as far as we can until we run out of blocks or find a record 287 - let Step::Value(output) = self.walker.step(&self.blocks, self.process)? else { 288 - break; 289 - }; 290 - out.push(output); 300 + match self.walker.step(&self.blocks, self.process) { 301 + Ok(Step::Value(record)) => { 302 + println!("got one! 
{record:?}"); 303 + out.push(record); 304 + }, 305 + Ok(Step::End(None)) => break, 306 + Ok(Step::End(_)) => todo!("actually this should be unreachable?"), 307 + Err(WalkError::MissingBlock(missing)) => { 308 + eprintln!("got missing so we should be bailing normally now"); 309 + self.next_missing = Some(*missing); 310 + return Ok(Step::Value(out)) // nb: might be empty! 311 + } 312 + Err(other) => { 313 + eprintln!("wait we errored??? {other:?}"); 314 + return Err(other.into()) 315 + }, 316 + } 291 317 } 292 318 if out.is_empty() { 293 319 Ok(Step::End(None))
+14 -3
tests/car-slices.rs
··· 8 8 expected_records: usize, 9 9 expected_sum: usize, 10 10 expect_rkey: &str, 11 + expect_preceeding: &str, 12 + expect_proceeding: &str, 11 13 ) { 12 - let mut driver = match Driver::load_car( 14 + let (mut driver, before) = match Driver::load_car( 13 15 bytes, 14 16 |block| block.len().to_ne_bytes().to_vec(), 15 17 10, /* MiB */ ··· 17 19 .await 18 20 .unwrap() 19 21 { 20 - Driver::Memory(_commit, _, mem_driver) => mem_driver, 22 + Driver::Memory(_commit, before, mem_driver) => (mem_driver, before), 21 23 Driver::Disk(_) => panic!("too big"), 22 24 }; 25 + 26 + assert_eq!(before, Some(expect_preceeding.into())); 23 27 24 28 let mut records = 0; 25 29 let mut sum = 0; ··· 49 53 50 54 #[tokio::test] 51 55 async fn test_record_slice_car() { 52 - test_car_slice(RECORD_SLICE, 1, 0, "app.bsky.feed.like/3mcg72x6bi32z").await 56 + test_car_slice( 57 + RECORD_SLICE, 58 + 1, 59 + 212, 60 + "app.bsky.feed.like/3mcg72x6bi32z", 61 + "", 62 + "", 63 + ).await 53 64 }