Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

more failing tests for slice edges

phil 12fe017e 42a3e3b0

+83 -2
car-samples/slice-node-after.car

This is a binary file and will not be displayed.

car-samples/slice-node-before.car

This is a binary file and will not be displayed.

+62
examples/read-slice/main.rs
··· 1 + /*! 2 + Read a CAR slice in memory and show some info about it. 3 + */ 4 + 5 + extern crate repo_stream; 6 + use repo_stream::{Driver, DriverBuilder, Output, Step}; 7 + 8 + type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 + 10 + #[tokio::main] 11 + async fn main() -> Result<()> { 12 + env_logger::init(); 13 + let reader = tokio::io::BufReader::new(tokio::io::stdin()); 14 + 15 + let (commit, prev_rkey, mut driver) = match DriverBuilder::new() 16 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 17 + .load_car(reader) 18 + .await? 19 + { 20 + Driver::Memory(commit, prev, mem_driver) => (commit, prev, mem_driver), 21 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 22 + }; 23 + 24 + println!( 25 + "\nthis slice is from {}, repo rev {}", 26 + commit.did, commit.rev 27 + ); 28 + if let Some(rkey) = prev_rkey { 29 + println!(" -> key immediately before CAR slice: {rkey}"); 30 + } else { 31 + println!( 32 + " -> no key preceeding the CAR slice, so it includes the leading edge of the tree." 33 + ); 34 + } 35 + 36 + println!("included records:"); 37 + let end = loop { 38 + match driver.next_chunk(256).await? { 39 + Step::Value(chunk) => { 40 + for Output { cid, rkey, .. } in chunk { 41 + print!(" SHA256 "); 42 + for byte in cid.to_bytes().iter().skip(4).take(5) { 43 + print!("{byte:02x}"); 44 + } 45 + println!("...\t{rkey}"); 46 + } 47 + } 48 + Step::End(e) => break e, 49 + } 50 + }; 51 + 52 + println!("done walking records present in the slice."); 53 + if let Some(rkey) = end { 54 + println!(" -> key immediately after CAR slice: {rkey}"); 55 + } else { 56 + println!( 57 + " -> no key proceeding the CAR slice, so it includes the trailing edge of the tree." 58 + ); 59 + } 60 + 61 + Ok(()) 62 + }
-2
src/drive.rs
··· 282 282 /// Step through the record outputs, in rkey order 283 283 pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 284 284 if let Some(missing) = &self.next_missing { 285 - println!("other side???"); 286 285 // TODO: make the walker finish walking to verify no more present blocks (oops sparse tree) 287 286 // HACK: just get the last rkey if it's there -- i think we might actually need to walk for it though 288 287 // ...and walk to verify rkey order of the rest of the nodes anyway? ··· 291 290 ThingKind::Record(rkey) => Step::End(Some(rkey.clone())), 292 291 }); 293 292 } 294 - println!("stepping in..."); 295 293 let mut out = Vec::with_capacity(n); 296 294 // let mut err; 297 295 for _ in 0..n {
+21
tests/car-slices.rs
··· 2 2 use repo_stream::{Driver, Output, Step}; 3 3 4 4 const RECORD_SLICE: &'static [u8] = include_bytes!("../car-samples/slice-one.car"); 5 + const RECORD_NODE_BEFORE: &'static [u8] = include_bytes!("../car-samples/slice-node-before.car"); 6 + const RECORD_NODE_AFTER: &'static [u8] = include_bytes!("../car-samples/slice-node-after.car"); 7 + // TODO: absence proof (zero records in slice) 5 8 6 9 async fn test_car_slice( 7 10 bytes: &[u8], ··· 71 74 ) 72 75 .await 73 76 } 77 + 78 + #[tokio::test] 79 + async fn test_record_slice_node_before() { 80 + test_car_slice(RECORD_NODE_BEFORE, 1, 212, "", "", "").await 81 + } 82 + 83 + #[tokio::test] 84 + async fn test_record_slice_node_after() { 85 + test_car_slice( 86 + RECORD_NODE_AFTER, 87 + 1, 88 + 212, 89 + "app.bsky.feed.like/3mbzi6ttskp2c", 90 + "", 91 + "", 92 + ) 93 + .await 94 + }