Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

oops forgot to update the output with cid

phil 148baf0c 9d5db998

+36 -29
+1 -1
Cargo.lock
··· 1147 1147 1148 1148 [[package]] 1149 1149 name = "repo-stream" 1150 - version = "0.3.1" 1150 + version = "0.4.0" 1151 1151 dependencies = [ 1152 1152 "cid", 1153 1153 "clap",
+1 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.3.1" 3 + version = "0.4.0" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 6 description = "Fast and robust atproto CAR file processing"
+7
changelog.md
··· 1 + # v0.4.0 2 + 3 + _2026-01-15_ 4 + 5 + - use `Output { rkey, cid, data }` instead of the `(rkey, data)` tuple so that the `Cid` is exposed. this is to make tap-like diffing possible. 6 + 7 + 1 8 # v0.3.1 2 9 3 10 _2026-01-15_
+2 -2
examples/disk-read-file/main.rs
··· 80 80 // keep a count of the total number of blocks seen 81 81 n += pairs.len(); 82 82 83 - for (_, block) in pairs { 83 + for output in pairs { 84 84 // for each block, count how many bytes are equal to '0' 85 85 // (this is just an example, you probably want to do something more 86 86 // interesting) 87 - zeros += block.into_iter().filter(|&b| b == b'0').count() 87 + zeros += output.data.into_iter().filter(|&b| b == b'0').count() 88 88 } 89 89 } 90 90
+5 -5
readme.md
··· 11 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 12 13 13 ```rust no_run 14 - use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output}; 15 15 16 16 #[tokio::main] 17 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 33 33 // if all blocks fit within memory 34 34 Driver::Memory(_commit, mut driver) => { 35 35 while let Some(chunk) = driver.next_chunk(256).await? { 36 - for (_rkey, processed) in chunk { 37 - let size = usize::from_ne_bytes(processed.try_into().unwrap()); 36 + for Output { rkey: _, cid: _, data } in chunk { 37 + let size = usize::from_ne_bytes(data.try_into().unwrap()); 38 38 total_size += size; 39 39 } 40 40 } ··· 48 48 let (_commit, mut driver) = paused.finish_loading(store).await?; 49 49 50 50 while let Some(chunk) = driver.next_chunk(256).await? { 51 - for (_rkey, processed) in chunk { 52 - let size = usize::from_ne_bytes(processed.try_into().unwrap()); 51 + for Output { rkey: _, cid: _, data } in chunk { 52 + let size = usize::from_ne_bytes(data.try_into().unwrap()); 53 53 total_size += size; 54 54 } 55 55 }
+12 -14
src/drive.rs
··· 35 35 JoinError(#[from] tokio::task::JoinError), 36 36 } 37 37 38 - /// An in-order chunk of Rkey + (processed) Block pairs 39 - pub type BlockChunk = Vec<(String, Bytes)>; 38 + /// An in-order chunk of Rkey + CID + (processed) Block 39 + pub type BlockChunk = Vec<Output>; 40 40 41 41 #[derive(Debug, Clone)] 42 42 pub(crate) enum MaybeProcessedBlock { ··· 272 272 let mut out = Vec::with_capacity(n); 273 273 for _ in 0..n { 274 274 // walk as far as we can until we run out of blocks or find a record 275 - let Some(Output { rkey, cid: _, data }) = 276 - self.walker.step(&mut self.blocks, self.process)? 277 - else { 275 + let Some(output) = self.walker.step(&mut self.blocks, self.process)? else { 278 276 break; 279 277 }; 280 - out.push((rkey, data)); 278 + out.push(output); 281 279 } 282 280 if out.is_empty() { 283 281 Ok(None) ··· 424 422 /// # async fn main() -> Result<(), DriveError> { 425 423 /// # let mut disk_driver = _get_fake_disk_driver(); 426 424 /// while let Some(pairs) = disk_driver.next_chunk(256).await? 
{ 427 - /// for (rkey, record) in pairs { 428 - /// println!("{rkey}: size={}", record.len()); 425 + /// for output in pairs { 426 + /// println!("{}: size={}", output.rkey, output.data.len()); 429 427 /// } 430 428 /// } 431 429 /// # Ok(()) ··· 452 450 return (state, Err(e.into())); 453 451 } 454 452 }; 455 - let Some(Output { rkey, cid: _, data }) = step else { 453 + let Some(output) = step else { 456 454 break; 457 455 }; 458 - out.push((rkey, data)); 456 + out.push(output); 459 457 } 460 458 461 459 (state, Ok::<_, DriveError>(out)) ··· 492 490 Err(e) => return tx.blocking_send(Err(e.into())), 493 491 }; 494 492 495 - let Some(Output { rkey, cid: _, data }) = step else { 493 + let Some(output) = step else { 496 494 break; 497 495 }; 498 - out.push((rkey, data)); 496 + out.push(output); 499 497 } 500 498 501 499 if out.is_empty() { ··· 525 523 /// let (mut rx, join) = disk_driver.to_channel(512); 526 524 /// while let Some(recvd) = rx.recv().await { 527 525 /// let pairs = recvd?; 528 - /// for (rkey, record) in pairs { 529 - /// println!("{rkey}: size={}", record.len()); 526 + /// for output in pairs { 527 + /// println!("{}: size={}", output.rkey, output.data.len()); 530 528 /// } 531 529 /// 532 530 /// }
+5 -4
src/lib.rs
··· 37 37 // if all blocks fit within memory 38 38 Driver::Memory(_commit, mut driver) => { 39 39 while let Some(chunk) = driver.next_chunk(256).await? { 40 - for (_rkey, bytes) in chunk { 41 - let size = usize::from_ne_bytes(bytes.try_into().unwrap()); 40 + for output in chunk { 41 + let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 42 43 43 total_size += size; 44 44 } ··· 53 53 let (_commit, mut driver) = paused.finish_loading(store).await?; 54 54 55 55 while let Some(chunk) = driver.next_chunk(256).await? { 56 - for (_rkey, bytes) in chunk { 57 - let size = usize::from_ne_bytes(bytes.try_into().unwrap()); 56 + for output in chunk { 57 + let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 58 58 59 59 total_size += size; 60 60 } ··· 86 86 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 87 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 88 88 pub use mst::Commit; 89 + pub use walk::Output; 89 90 90 91 pub type Bytes = Vec<u8>; 91 92
+3 -2
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 2 use repo_stream::Driver; 3 + use repo_stream::Output; 3 4 4 5 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 6 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 30 31 let mut prev_rkey = "".to_string(); 31 32 32 33 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 33 - for (rkey, bytes) in pairs { 34 + for Output { rkey, cid: _, data } in pairs { 34 35 records += 1; 35 36 36 - let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 37 + let (int_bytes, _) = data.split_at(size_of::<usize>()); 37 38 let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 38 39 39 40 sum += size;