Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

works on at least one getRecord car slice example

phil 42a3e3b0 4d416aa4

+32 -35
+5 -14
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::link::{ObjectLink, NodeThing, ThingKind}; 3 + use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 4 use crate::{ 5 5 Bytes, HashMap, Rkey, Step, 6 6 disk::{DiskError, DiskStore}, ··· 242 242 }; 243 243 let mut walker = Walker::new(root_node); 244 244 245 - eprintln!("setpping to edge??"); 246 245 let edge = walker.step_to_edge(&mem_blocks)?; 247 - eprintln!("got edge: {edge:?}"); 248 246 249 247 Ok(Driver::Memory( 250 248 commit, 251 - None, 249 + edge, 252 250 MemDriver { 253 251 blocks: mem_blocks, 254 252 walker, ··· 298 296 // let mut err; 299 297 for _ in 0..n { 300 298 match self.walker.step(&self.blocks, self.process) { 301 - Ok(Step::Value(record)) => { 302 - println!("got one! {record:?}"); 303 - out.push(record); 304 - }, 299 + Ok(Step::Value(record)) => out.push(record), 305 300 Ok(Step::End(None)) => break, 306 301 Ok(Step::End(_)) => todo!("actually this should be unreachable?"), 307 302 Err(WalkError::MissingBlock(missing)) => { 308 - eprintln!("got missing so we should be bailing normally now"); 309 303 self.next_missing = Some(*missing); 310 - return Ok(Step::Value(out)) // nb: might be empty! 304 + return Ok(Step::Value(out)); // nb: might be empty! 311 305 } 312 - Err(other) => { 313 - eprintln!("wait we errored??? {other:?}"); 314 - return Err(other.into()) 315 - }, 306 + Err(other) => return Err(other.into()), 316 307 } 317 308 } 318 309 if out.is_empty() {
+2 -5
src/walk.rs
··· 164 164 let mut rkey_prev = None; 165 165 loop { 166 166 match ant.step(blocks, noop) { 167 - Err(WalkError::MissingBlock(thing)) => { 168 - if let NodeThing { kind: ThingKind::Record(rkey), .. } = *thing { 169 - eprintln!("got one: {rkey}"); 167 + Err(WalkError::MissingBlock(thing)) => { 168 + if let ThingKind::Record(rkey) = thing.kind { 170 169 rkey_prev = Some(rkey); 171 - } else { 172 - eprintln!("got a missing child"); 173 170 } 174 171 *self = ant; 175 172 ant = self.clone();
+25 -16
tests/car-slices.rs
··· 7 7 bytes: &[u8], 8 8 expected_records: usize, 9 9 expected_sum: usize, 10 - expect_rkey: &str, 11 10 expect_preceeding: &str, 11 + expect_rkey: &str, 12 12 expect_proceeding: &str, 13 13 ) { 14 14 let (mut driver, before) = match Driver::load_car( ··· 25 25 26 26 assert_eq!(before, Some(expect_preceeding.into())); 27 27 28 - let mut records = 0; 28 + let mut found_records = 0; 29 29 let mut sum = 0; 30 30 let mut found_expected_rkey = false; 31 31 let mut prev_rkey = "".to_string(); 32 32 33 - while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 34 - for Output { rkey, cid: _, data } in pairs { 35 - records += 1; 33 + while let Ok(step) = driver.next_chunk(256).await { 34 + match step { 35 + Step::Value(records) => { 36 + for Output { rkey, cid: _, data } in records { 37 + found_records += 1; 36 38 37 - let (int_bytes, _) = data.split_at(size_of::<usize>()); 38 - let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 39 + let (int_bytes, _) = data.split_at(size_of::<usize>()); 40 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 39 41 40 - sum += size; 41 - if rkey == expect_rkey { 42 - found_expected_rkey = true; 42 + sum += size; 43 + if rkey == expect_rkey { 44 + found_expected_rkey = true; 45 + } 46 + assert!(rkey > prev_rkey, "rkeys are streamed in order"); 47 + prev_rkey = rkey; 48 + } 43 49 } 44 - assert!(rkey > prev_rkey, "rkeys are streamed in order"); 45 - prev_rkey = rkey; 50 + Step::End(proceeding) => { 51 + assert_eq!(proceeding, Some(expect_proceeding.into())); 52 + break; 53 + } 46 54 } 47 55 } 48 56 49 - assert_eq!(records, expected_records); 57 + assert_eq!(found_records, expected_records); 50 58 assert_eq!(sum, expected_sum); 51 59 assert!(found_expected_rkey); 52 60 } ··· 57 65 RECORD_SLICE, 58 66 1, 59 67 212, 68 + "app.bsky.feed.like/3mcfzfbpaml27", 60 69 "app.bsky.feed.like/3mcg72x6bi32z", 61 - "", 62 - "", 63 - ).await 70 + "app.bsky.feed.like/3mcga2o2efq27", 71 + ) 72 + .await 64 73 }