Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

example code on readme

phil 336e416d c7618d02

+49
+49
readme.md
··· 8 8 [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg 9 9 [docs-badge]: https://docs.rs/repo-stream/badge.svg 10 10 11 + ```rust 12 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 13 + 14 + #[tokio::main] 15 + async fn main() -> Result<(), DriveError> { 16 + // repo-stream takes any AsyncRead as input, like a tokio::fs::File 17 + let reader = tokio::fs::File::open("repo.car".into()).await?; 18 + let reader = tokio::io::BufReader::new(reader); 19 + 20 + // example repo workload is simply counting the total record bytes 21 + let mut total_size = 0; 22 + 23 + match DriverBuilder::new() 24 + .with_mem_limit_mb(10) 25 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 26 + .load_car(reader) 27 + .await? 28 + { 29 + 30 + // if all blocks fit within memory 31 + Driver::Memory(_commit, mut driver) => { 32 + while let Some(chunk) = driver.next_chunk(256).await? { 33 + for (_rkey, size) in chunk { 34 + total_size += size; 35 + } 36 + } 37 + }, 38 + 39 + // if the CAR was too big for in-memory processing 40 + Driver::Disk(paused) => { 41 + // set up a disk store we can spill to 42 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 43 + // do the spilling, get back a (similar) driver 44 + let (_commit, mut driver) = paused.finish_loading(store).await?; 45 + 46 + while let Some(chunk) = driver.next_chunk(256).await? { 47 + for (_rkey, size) in chunk { 48 + total_size += size; 49 + } 50 + } 51 + 52 + // clean up the disk store (drop tables etc) 53 + driver.reset_store().await?; 54 + } 55 + }; 56 + println!("sum of size of all records: {total_size}"); 57 + Ok(()) 58 + } 59 + ``` 11 60 12 61 more recent todo 13 62