Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

derive some clones and impl Processable for things

phil 4f4e8566 23c830d5

+24 -6
+1 -1
Cargo.lock
··· 1024 1024 1025 1025 [[package]] 1026 1026 name = "repo-stream" 1027 - version = "0.2.1" 1027 + version = "0.2.2" 1028 1028 dependencies = [ 1029 1029 "bincode", 1030 1030 "clap",
+1 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.2.1" 3 + version = "0.2.2" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 6 description = "A robust CAR file -> MST walker for atproto"
+2 -1
src/disk.rs
··· 53 53 } 54 54 55 55 /// Builder-style disk store setup 56 + #[derive(Debug, Clone)] 56 57 pub struct DiskBuilder { 57 58 /// Database in-memory cache allowance 58 59 /// ··· 96 97 self 97 98 } 98 99 /// Open and initialize the actual disk storage 99 - pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 + pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 101 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 101 102 } 102 103 }
+4 -2
src/drive.rs
··· 116 116 } 117 117 118 118 /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 119 120 pub struct DriverBuilder { 120 121 pub mem_limit_mb: usize, 121 122 } ··· 153 154 } 154 155 /// Begin processing an atproto MST from a CAR file 155 156 pub async fn load_car<R: AsyncRead + Unpin>( 156 - self, 157 + &self, 157 158 reader: R, 158 159 ) -> Result<Driver<R, Vec<u8>>, DriveError> { 159 160 Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await ··· 163 164 /// Builder-style driver intermediate step 164 165 /// 165 166 /// start from `DriverBuilder` 167 + #[derive(Debug, Clone)] 166 168 pub struct DriverBuilderWithProcessor<T: Processable> { 167 169 pub mem_limit_mb: usize, 168 170 pub block_processor: fn(Vec<u8>) -> T, ··· 178 180 } 179 181 /// Begin processing an atproto MST from a CAR file 180 182 pub async fn load_car<R: AsyncRead + Unpin>( 181 - self, 183 + &self, 182 184 reader: R, 183 185 ) -> Result<Driver<R, T>, DriveError> { 184 186 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
+1 -1
src/lib.rs
··· 82 82 pub mod process; 83 83 84 84 pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 - pub use drive::{DriveError, Driver, DriverBuilder}; 85 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 86 86 pub use mst::Commit; 87 87 pub use process::Processable;
+15
src/process.rs
··· 85 85 direct_size + items_referenced_size 86 86 } 87 87 } 88 + 89 + impl<Item: Processable> Processable for Option<Item> { 90 + fn get_size(&self) -> usize { 91 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 92 + } 93 + } 94 + 95 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 96 + fn get_size(&self) -> usize { 97 + match self { 98 + Ok(item) => item.get_size(), 99 + Err(err) => err.get_size(), 100 + } 101 + } 102 + }