Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

doctest the readme example

phil 42d6a294 cdd41e2c

+16 -14
+9 -6
readme.md
··· 10 10 [docs-badge]: https://docs.rs/repo-stream/badge.svg 11 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 12 13 - ```rust 13 + ```rust no_run 14 14 use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 15 15 16 16 #[tokio::main] 17 - async fn main() -> Result<(), DriveError> { 17 + async fn main() -> Result<(), Box<dyn std::error::Error>> { 18 18 // repo-stream takes any AsyncRead as input, like a tokio::fs::File 19 - let reader = tokio::fs::File::open("repo.car".into()).await?; 19 + let reader = tokio::fs::File::open("repo.car").await?; 20 20 let reader = tokio::io::BufReader::new(reader); 21 21 22 22 // example repo workload is simply counting the total record bytes ··· 24 24 25 25 match DriverBuilder::new() 26 26 .with_mem_limit_mb(10) 27 - .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 27 + .with_block_processor( // block processing: just extract the raw record size 28 + |rec| rec.len().to_ne_bytes().to_vec()) 28 29 .load_car(reader) 29 30 .await? 30 31 { ··· 32 33 // if all blocks fit within memory 33 34 Driver::Memory(_commit, mut driver) => { 34 35 while let Some(chunk) = driver.next_chunk(256).await? { 35 - for (_rkey, size) in chunk { 36 + for (_rkey, processed) in chunk { 37 + let size = usize::from_ne_bytes(processed.try_into().unwrap()); 36 38 total_size += size; 37 39 } 38 40 } ··· 46 48 let (_commit, mut driver) = paused.finish_loading(store).await?; 47 49 48 50 while let Some(chunk) = driver.next_chunk(256).await? { 49 - for (_rkey, size) in chunk { 51 + for (_rkey, processed) in chunk { 52 + let size = usize::from_ne_bytes(processed.try_into().unwrap()); 50 53 total_size += size; 51 54 } 52 55 }
+7 -8
src/lib.rs
··· 28 28 match DriverBuilder::new() 29 29 .with_mem_limit_mb(10) 30 30 .with_block_processor( 31 - |rec| rec.len().to_ne_bytes().to_vec().into() 31 + |rec| rec.len().to_ne_bytes().to_vec() 32 32 ) // block processing: just extract the raw record size 33 33 .load_car(reader) 34 34 .await? ··· 38 38 Driver::Memory(_commit, mut driver) => { 39 39 while let Some(chunk) = driver.next_chunk(256).await? { 40 40 for (_rkey, bytes) in chunk { 41 - 42 - let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 43 - let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 41 + let size = usize::from_ne_bytes(bytes.try_into().unwrap()); 44 42 45 43 total_size += size; 46 44 } ··· 56 54 57 55 while let Some(chunk) = driver.next_chunk(256).await? { 58 56 for (_rkey, bytes) in chunk { 59 - 60 - let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 61 - let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 57 + let size = usize::from_ne_bytes(bytes.try_into().unwrap()); 62 58 63 59 total_size += size; 64 60 } ··· 91 87 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 92 88 pub use mst::Commit; 93 89 94 - // pub use bytes::Bytes; 95 90 pub type Bytes = Vec<u8>; 96 91 97 92 pub(crate) use hashbrown::HashMap; 93 + 94 + #[doc = include_str!("../readme.md")] 95 + #[cfg(doctest)] 96 + pub struct ReadmeDoctests;