Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

update api for car slice handling

...car slice handling not yet actually implemented

phil 037cd9c6 9bfba193

+69 -58
+1 -1
benches/huge-car.rs
··· 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 35 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 - Driver::Memory(_, mem_driver) => mem_driver, 36 + Driver::Memory(_, _, mem_driver) => mem_driver, 37 37 Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 38 }; 39 39
+3 -3
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::Driver; 2 + use repo_stream::{Driver, Step}; 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 ··· 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - Driver::Memory(_, mem_driver) => mem_driver, 43 + Driver::Memory(_, _, mem_driver) => mem_driver, 44 44 Driver::Disk(_) => panic!("not benching big cars here"), 45 45 }; 46 46 47 47 let mut n = 0; 48 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 48 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 49 49 n += pairs.len(); 50 50 } 51 51 n
+10 -7
examples/disk-read-file/main.rs
··· 9 9 static GLOBAL: MiMalloc = MiMalloc; 10 10 11 11 use clap::Parser; 12 - use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 12 + use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step}; 13 13 use std::path::PathBuf; 14 14 use std::time::Instant; 15 15 ··· 42 42 .load_car(reader) 43 43 .await? 44 44 { 45 - Driver::Memory(_, _) => panic!("try this on a bigger car"), 45 + Driver::Memory(_, _, _) => panic!("try this on a bigger car"), 46 46 Driver::Disk(big_stuff) => { 47 47 // we reach here if the repo was too big and needs to be spilled to 48 48 // disk to continue ··· 51 51 let disk_store = DiskBuilder::new().open(tmpfile).await?; 52 52 53 53 // do the spilling, get back a (similar) driver 54 - let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 54 + let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?; 55 55 56 56 // at this point you might want to fetch the account's signing key 57 57 // via the DID from the commit, and then verify the signature. ··· 74 74 // this example uses the disk driver's channel mode: the tree walking is 75 75 // spawned onto a blocking thread, and we get chunks of rkey+blocks back 76 76 let (mut rx, join) = driver.to_channel(512); 77 - while let Some(r) = rx.recv().await { 78 - let pairs = r?; 77 + while let Some(step) = rx.recv().await { 78 + let step = step?; 79 + let Step::Value(outputs) = step else { 80 + break; 81 + }; 79 82 80 83 // keep a count of the total number of blocks seen 81 - n += pairs.len(); 84 + n += outputs.len(); 82 85 83 - for output in pairs { 86 + for output in outputs { 84 87 // for each block, count how many bytes are equal to '0' 85 88 // (this is just an example, you probably want to do something more 86 89 // interesting)
+3 -3
examples/read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{Driver, DriverBuilder}; 7 + use repo_stream::{Driver, DriverBuilder, Step}; 8 8 use std::path::PathBuf; 9 9 10 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 28 28 .load_car(reader) 29 29 .await? 30 30 { 31 - Driver::Memory(commit, mem_driver) => (commit, mem_driver), 31 + Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 32 32 Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 33 }; 34 34 35 35 log::info!("got commit: {commit:?}"); 36 36 37 37 let mut n = 0; 38 - while let Some(pairs) = driver.next_chunk(256).await? { 38 + while let Step::Value(pairs) = driver.next_chunk(256).await? { 39 39 n += pairs.len(); 40 40 // log::info!("got {rkey:?}"); 41 41 }
+5 -5
readme.md
··· 11 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 12 13 13 ```rust no_run 14 - use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output}; 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step}; 15 15 16 16 #[tokio::main] 17 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 31 31 { 32 32 33 33 // if all blocks fit within memory 34 - Driver::Memory(_commit, mut driver) => { 35 - while let Some(chunk) = driver.next_chunk(256).await? { 34 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 35 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 36 36 for Output { rkey: _, cid: _, data } in chunk { 37 37 let size = usize::from_ne_bytes(data.try_into().unwrap()); 38 38 total_size += size; ··· 45 45 // set up a disk store we can spill to 46 46 let store = DiskBuilder::new().open("some/path.db".into()).await?; 47 47 // do the spilling, get back a (similar) driver 48 - let (_commit, mut driver) = paused.finish_loading(store).await?; 48 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 49 49 50 - while let Some(chunk) = driver.next_chunk(256).await? { 50 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 51 51 for Output { rkey: _, cid: _, data } in chunk { 52 52 let size = usize::from_ne_bytes(data.try_into().unwrap()); 53 53 total_size += size;
+26 -23
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 3 use crate::{ 4 - Bytes, HashMap, 4 + Bytes, HashMap, Rkey, Step, 5 5 disk::{DiskError, DiskStore}, 6 6 mst::MstNode, 7 7 walk::Output, ··· 107 107 /// 108 108 /// You probably want to check the commit's signature. You can go ahead and 109 109 /// walk the MST right away. 110 - Memory(Commit, MemDriver), 110 + Memory(Commit, Option<Rkey>, MemDriver), 111 111 /// Blocks exceed the memory limit 112 112 /// 113 113 /// You'll need to provide a disk storage to continue. The commit will be ··· 237 237 238 238 Ok(Driver::Memory( 239 239 commit, 240 + None, 240 241 MemDriver { 241 242 blocks: mem_blocks, 242 243 walker, ··· 268 269 269 270 impl MemDriver { 270 271 /// Step through the record outputs, in rkey order 271 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 272 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 272 273 let mut out = Vec::with_capacity(n); 273 274 for _ in 0..n { 274 275 // walk as far as we can until we run out of blocks or find a record 275 - let Some(output) = self.walker.step(&mut self.blocks, self.process)? else { 276 + let Step::Value(output) = self.walker.step(&mut self.blocks, self.process)? else { 276 277 break; 277 278 }; 278 279 out.push(output); 279 280 } 280 281 if out.is_empty() { 281 - Ok(None) 282 + Ok(Step::End(None)) 282 283 } else { 283 - Ok(Some(out)) 284 + Ok(Step::Value(out)) 284 285 } 285 286 } 286 287 } ··· 299 300 pub async fn finish_loading( 300 301 mut self, 301 302 mut store: DiskStore, 302 - ) -> Result<(Commit, DiskDriver), DriveError> { 303 + ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> { 303 304 // move store in and back out so we can manage lifetimes 304 305 // dump mem blocks into the store 305 306 store = tokio::task::spawn(async move { ··· 385 386 386 387 Ok(( 387 388 commit, 389 + None, 388 390 DiskDriver { 389 391 process: self.process, 390 392 state: Some(BigState { store, walker }), ··· 417 419 /// Walk the MST returning up to `n` rkey + record pairs 418 420 /// 419 421 /// ```no_run 420 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 422 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 421 423 /// # #[tokio::main] 422 424 /// # async fn main() -> Result<(), DriveError> { 423 425 /// # let mut disk_driver = _get_fake_disk_driver(); 424 - /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 425 - /// for output in pairs { 426 + /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? { 427 + /// for output in outputs { 426 428 /// println!("{}: size={}", output.rkey, output.data.len()); 427 429 /// } 428 430 /// } 429 431 /// # Ok(()) 430 432 /// # } 431 433 /// ``` 432 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 434 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> { 433 435 let process = self.process; 434 436 435 437 // state should only *ever* be None transiently while inside here ··· 450 452 return (state, Err(e.into())); 451 453 } 452 454 }; 453 - let Some(output) = step else { 455 + let Step::Value(output) = step else { 454 456 break; 455 457 }; 456 458 out.push(output); ··· 466 468 let out = res?; 467 469 468 470 if out.is_empty() { 469 - Ok(None) 471 + Ok(Step::End(None)) 470 472 } else { 471 - Ok(Some(out)) 473 + Ok(Step::Value(out)) 472 474 } 473 475 } 474 476 475 477 fn read_tx_blocking( 476 478 &mut self, 477 479 n: usize, 478 - tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 479 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 480 + tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>, 481 + ) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> { 480 482 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 481 483 482 484 loop { ··· 490 492 Err(e) => return tx.blocking_send(Err(e.into())), 491 493 }; 492 494 493 - let Some(output) = step else { 495 + let Step::Value(output) = step else { 494 496 break; 495 497 }; 496 498 out.push(output); ··· 499 501 if out.is_empty() { 500 502 break; 501 503 } 502 - tx.blocking_send(Ok(out))?; 504 + tx.blocking_send(Ok(Step::Value(out)))?; 503 505 } 504 506 505 507 Ok(()) ··· 516 518 /// benefit over just using `.next_chunk(n)`. 517 519 /// 518 520 /// ```no_run 519 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 521 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 520 522 /// # #[tokio::main] 521 523 /// # async fn main() -> Result<(), DriveError> { 522 524 /// # let mut disk_driver = _get_fake_disk_driver(); 523 525 /// let (mut rx, join) = disk_driver.to_channel(512); 524 526 /// while let Some(recvd) = rx.recv().await { 525 - /// let pairs = recvd?; 526 - /// for output in pairs { 527 + /// let outputs = recvd?; 528 + /// let Step::Value(outputs) = outputs else { break; }; 529 + /// for output in outputs { 527 530 /// println!("{}: size={}", output.rkey, output.data.len()); 528 531 /// } 529 532 /// ··· 535 538 mut self, 536 539 n: usize, 537 540 ) -> ( 538 - mpsc::Receiver<Result<BlockChunk, DriveError>>, 541 + mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>, 539 542 tokio::task::JoinHandle<Self>, 540 543 ) { 541 - let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 544 + let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1); 542 545 543 546 // sketch: this worker is going to be allowed to execute without a join handle 544 547 let chan_task = tokio::task::spawn_blocking(move || {
+6 -6
src/lib.rs
··· 18 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 19 20 20 ``` 21 - use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step}; 22 22 23 23 # #[tokio::main] 24 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 35 35 { 36 36 37 37 // if all blocks fit within memory 38 - Driver::Memory(_commit, mut driver) => { 39 - while let Some(chunk) = driver.next_chunk(256).await? { 38 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 39 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 40 40 for output in chunk { 41 41 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 42 ··· 50 50 // set up a disk store we can spill to 51 51 let store = DiskBuilder::new().open("some/path.db".into()).await?; 52 52 // do the spilling, get back a (similar) driver 53 - let (_commit, mut driver) = paused.finish_loading(store).await?; 53 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 54 54 55 - while let Some(chunk) = driver.next_chunk(256).await? { 55 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 56 56 for output in chunk { 57 57 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 58 58 ··· 86 86 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 87 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 88 88 pub use mst::Commit; 89 - pub use walk::Output; 89 + pub use walk::{Output, Step}; 90 90 91 91 pub type Bytes = Vec<u8>; 92 92
+12 -6
src/walk.rs
··· 41 41 pub data: Bytes, 42 42 } 43 43 44 + #[derive(Debug, PartialEq)] 45 + pub enum Step<T = Output> { 46 + Value(T), 47 + End(Option<Rkey>), 48 + } 49 + 44 50 /// Traverser of an atproto MST 45 51 /// 46 52 /// Walks the tree from left-to-right in depth-first order ··· 134 140 &mut self, 135 141 blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 136 142 process: impl Fn(Bytes) -> Bytes, 137 - ) -> Result<Option<Output>, WalkError> { 143 + ) -> Result<Step, WalkError> { 138 144 while let Some(NodeThing { cid, kind }) = self.next_todo() { 139 145 let Some(mpb) = blocks.get(&cid) else { 140 146 return Err(WalkError::MissingBlock(cid)); 141 147 }; 142 148 if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 143 - return Ok(Some(out)); 149 + return Ok(Step::Value(out)); 144 150 } 145 151 } 146 - Ok(None) 152 + Ok(Step::End(None)) 147 153 } 148 154 149 155 /// blocking!!!!!! ··· 151 157 &mut self, 152 158 blocks: &mut DiskStore, 153 159 process: impl Fn(Bytes) -> Bytes, 154 - ) -> Result<Option<Output>, WalkError> { 160 + ) -> Result<Step, WalkError> { 155 161 while let Some(NodeThing { cid, kind }) = self.next_todo() { 156 162 let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 157 163 return Err(WalkError::MissingBlock(cid)); 158 164 }; 159 165 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 160 166 if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 161 - return Ok(Some(out)); 167 + return Ok(Step::Value(out)); 162 168 } 163 169 } 164 - Ok(None) 170 + Ok(Step::End(None)) 165 171 } 166 172 }
+3 -4
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::Driver; 3 - use repo_stream::Output; 2 + use repo_stream::{Driver, Output, Step}; 4 3 5 4 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 21 20 .await 22 21 .unwrap() 23 22 { 24 - Driver::Memory(_commit, mem_driver) => mem_driver, 23 + Driver::Memory(_commit, _, mem_driver) => mem_driver, 25 24 Driver::Disk(_) => panic!("too big"), 26 25 }; 27 26 ··· 30 29 let mut found_bsky_profile = false; 31 30 let mut prev_rkey = "".to_string(); 32 31 33 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 32 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 34 33 for Output { rkey, cid: _, data } in pairs { 35 34 records += 1; 36 35