Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Nicer disk reading api and more api stuff and

phil 03cb98aa 98d40b68

+180 -64
+3 -5
benches/huge-car.rs
··· 22 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 23 23 let reader = tokio::io::BufReader::new(reader); 24 24 25 - let mb = 2_usize.pow(20); 26 - 27 - let mut driver = match Driver::load_car(reader, |block| block.len(), 1024 * mb) 25 + let mut driver = match Driver::load_car(reader, |block| block.len(), 1024) 28 26 .await 29 27 .unwrap() 30 28 { 31 - Driver::Lil(_, mem_driver) => mem_driver, 32 - Driver::Big(_) => panic!("not doing disk for benchmark"), 29 + Driver::Memory(_, mem_driver) => mem_driver, 30 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 33 31 }; 34 32 35 33 let mut n = 0;
+3 -3
benches/non-huge-cars.rs
··· 25 25 } 26 26 27 27 async fn drive_car(bytes: &[u8]) -> usize { 28 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 32 * 2_usize.pow(20)) 28 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 32) 29 29 .await 30 30 .unwrap() 31 31 { 32 - Driver::Lil(_, mem_driver) => mem_driver, 33 - Driver::Big(_) => panic!("not benching big cars here"), 32 + Driver::Memory(_, mem_driver) => mem_driver, 33 + Driver::Disk(_) => panic!("not benching big cars here"), 34 34 }; 35 35 36 36 let mut n = 0;
+12 -5
examples/disk-read-file/main.rs
··· 1 + /*! 2 + Read a CAR file by spilling to disk 3 + */ 4 + 1 5 extern crate repo_stream; 2 6 use clap::Parser; 3 - use repo_stream::{Driver, noop}; 7 + use repo_stream::{Driver, process::noop}; 4 8 use std::path::PathBuf; 5 9 6 10 #[derive(Debug, Parser)] ··· 24 28 25 29 // configure how much memory can be used before spilling to disk. 26 30 // real memory usage may differ somewhat. 27 - let in_mem_limit = 10 * 2_usize.pow(20); 31 + let in_mem_limit = 10; // MiB 28 32 29 33 // configure how much memory sqlite is allowed to use when dumping to disk 30 - let db_cache_mb = 32; 34 + let db_cache_mb = 32; // MiB 31 35 32 36 log::info!("hello! reading the car..."); 33 37 34 38 // in this example we only bother handling CARs that are too big for memory 35 39 // `noop` helper means: do no block processing, store the raw blocks 36 40 let driver = match Driver::load_car(reader, noop, in_mem_limit).await? { 37 - Driver::Lil(_, _) => panic!("try this on a bigger car"), 38 - Driver::Big(big_stuff) => { 41 + Driver::Memory(_, _) => panic!("try this on a bigger car"), 42 + Driver::Disk(big_stuff) => { 39 43 // we reach here if the repo was too big and needs to be spilled to 40 44 // disk to continue 41 45 ··· 80 84 81 85 log::info!("arrived! joining rx..."); 82 86 87 + // clean up the database. would be nice to do this in drop so it happens 88 + // automatically, but some blocking work happens, so that's not allowed in 89 + // async rust. 🤷‍♀️ 83 90 join.await?.reset_store().await?; 84 91 85 92 log::info!("done. n={n} zeros={zeros}");
+8 -4
examples/read-file/main.rs
··· 1 + /*! 2 + Read a CAR file with in-memory processing 3 + */ 4 + 1 5 extern crate repo_stream; 2 6 use clap::Parser; 3 7 use repo_stream::Driver; ··· 20 24 let reader = tokio::io::BufReader::new(reader); 21 25 22 26 let (commit, mut driver) = 23 - match Driver::load_car(reader, |block| block.len(), 16 * 1024 * 1024).await? { 24 - Driver::Lil(commit, mem_driver) => (commit, mem_driver), 25 - Driver::Big(_) => panic!("can't handle big cars yet"), 27 + match Driver::load_car(reader, |block| block.len(), 16 /* MiB */).await? { 28 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 29 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 26 30 }; 27 31 28 32 log::info!("got commit: {commit:?}"); ··· 32 36 n += pairs.len(); 33 37 // log::info!("got {rkey:?}"); 34 38 } 35 - log::info!("bye! {n}"); 39 + log::info!("bye! total records={n}"); 36 40 37 41 Ok(()) 38 42 }
+1 -1
src/disk.rs
··· 43 43 // let insert_stmt = tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 44 44 Ok(SqliteWriter { tx }) 45 45 } 46 - pub fn get_reader(&'_ self) -> Result<SqliteReader<'_>, rusqlite::Error> { 46 + pub fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, rusqlite::Error> { 47 47 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 48 48 Ok(SqliteReader { select_stmt }) 49 49 }
+83 -37
src/drive.rs
··· 101 101 } 102 102 103 103 pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 104 - Lil(Commit, MemDriver<T>), 105 - Big(BigCar<R, T>), 104 + Memory(Commit, MemDriver<T>), 105 + Disk(BigCar<R, T>), 106 106 } 107 107 108 108 impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 109 109 pub async fn load_car( 110 110 reader: R, 111 111 process: fn(Vec<u8>) -> T, 112 - max_size: usize, 112 + max_size_mb: usize, 113 113 ) -> Result<Driver<R, T>, DriveError> { 114 + let max_size = max_size_mb * 2_usize.pow(20); 114 115 let mut mem_blocks = HashMap::new(); 115 116 116 117 let mut car = CarReader::new(reader).await?; ··· 142 143 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 143 144 mem_blocks.insert(cid, maybe_processed); 144 145 if mem_size >= max_size { 145 - return Ok(Driver::Big(BigCar { 146 + return Ok(Driver::Disk(BigCar { 146 147 car, 147 148 root, 148 149 process, ··· 158 159 159 160 let walker = Walker::new(commit.data); 160 161 161 - Ok(Driver::Lil( 162 + Ok(Driver::Memory( 162 163 commit, 163 164 MemDriver { 164 165 blocks: mem_blocks, ··· 321 322 commit, 322 323 BigCarReady { 323 324 process: self.process, 324 - store, 325 - walker, 325 + state: Some(BigState { store, walker }), 326 326 }, 327 327 )) 328 328 } 329 329 } 330 330 331 + struct BigState { 332 + store: SqliteStore, 333 + walker: Walker, 334 + } 335 + 331 336 pub struct BigCarReady<T: Clone> { 332 337 process: fn(Vec<u8>) -> T, 333 - store: SqliteStore, 334 - walker: Walker, 338 + state: Option<BigState>, 335 339 } 336 340 337 341 impl<T: Processable + Send + 'static> BigCarReady<T> { 338 - pub async fn next_chunk( 339 - mut self, 340 - n: usize, 341 - ) -> Result<(Self, Option<BlockChunk<T>>), DriveError> { 342 - let mut out = Vec::with_capacity(n); 343 - (self, out) = tokio::task::spawn_blocking(move || { 344 - let store = self.store; 345 - let mut reader = store.get_reader()?; 342 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 343 + let process = self.process; 344 + 345 + // state should only *ever* be None transiently while inside here 346 + let mut state = self 347 + .state 348 + .take() 349 + .expect("BigCarReady must have Some(state)"); 346 350 347 - for _ in 0..n { 348 - // walk as far as we can until we run out of blocks or find a record 349 - match self.walker.disk_step(&mut reader, self.process)? { 350 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 351 - Step::Finish => break, 352 - Step::Found { rkey, data } => { 353 - out.push((rkey, data)); 354 - continue; 351 + // the big pain here is that we don't want to leave self.state in an 352 + // invalid state (None), so all the error paths have to make sure it 353 + // comes out again. 354 + let (state, res) = tokio::task::spawn_blocking( 355 + move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 356 + let mut reader_res = state.store.get_reader(); 357 + let reader: &mut _ = match reader_res { 358 + Ok(ref mut r) => r, 359 + Err(ref mut e) => { 360 + // unfortunately we can't return the error directly because 361 + // (for some reason) it's attached to the lifetime of the 362 + // reader? 363 + // hack a mem::swap so we can get it out :/ 364 + let mut e_swapped = 365 + rusqlite::Error::InvalidParameterName("this error was stolen".into()); 366 + std::mem::swap(e, &mut e_swapped); 367 + // the pain: `state` *has to* outlive the reader 368 + drop(reader_res); 369 + return (state, Err(e_swapped.into())); 355 370 } 356 371 }; 357 - } 358 372 359 - drop(reader); // cannot outlive store 360 - self.store = store; 361 - Ok::<_, DriveError>((self, out)) 362 - }) 363 - .await??; 373 + let mut out = Vec::with_capacity(n); 374 + 375 + for _ in 0..n { 376 + // walk as far as we can until we run out of blocks or find a record 377 + let step = match state.walker.disk_step(reader, process) { 378 + Ok(s) => s, 379 + Err(e) => { 380 + // the pain: `state` *has to* outlive the reader 381 + drop(reader_res); 382 + return (state, Err(e.into())); 383 + } 384 + }; 385 + match step { 386 + Step::Missing(cid) => { 387 + // the pain: `state` *has to* outlive the reader 388 + drop(reader_res); 389 + return (state, Err(DriveError::MissingBlock(cid))); 390 + } 391 + Step::Finish => break, 392 + Step::Found { rkey, data } => out.push((rkey, data)), 393 + }; 394 + } 395 + 396 + // `state` *has to* outlive the reader 397 + drop(reader_res); 398 + 399 + (state, Ok::<_, DriveError>(out)) 400 + }, 401 + ) 402 + .await?; // on tokio JoinError, we'll be left with invalid state :( 403 + 404 + // *must* restore state before dealing with the actual result 405 + self.state = Some(state); 406 + 407 + let out = res?; 364 408 365 409 if out.is_empty() { 366 - Ok((self, None)) 410 + Ok(None) 367 411 } else { 368 - Ok((self, Some(out))) 412 + Ok(Some(out)) 369 413 } 370 414 } 371 415 ··· 374 418 n: usize, 375 419 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 376 420 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 377 - let mut reader = match self.store.get_reader() { 421 + let BigState { store, walker } = self.state.as_mut().expect("valid state"); 422 + let mut reader = match store.get_reader() { 378 423 Ok(r) => r, 379 424 Err(e) => return tx.blocking_send(Err(e.into())), 380 425 }; ··· 385 430 for _ in 0..n { 386 431 // walk as far as we can until we run out of blocks or find a record 387 432 388 - let step = match self.walker.disk_step(&mut reader, self.process) { 433 + let step = match walker.disk_step(&mut reader, self.process) { 389 434 Ok(s) => s, 390 435 Err(e) => return tx.blocking_send(Err(e.into())), 391 436 }; ··· 433 478 434 479 pub async fn reset_store(mut self) -> Result<SqliteStore, DriveError> { 435 480 tokio::task::spawn_blocking(move || { 436 - self.store.reset()?; 437 - Ok(self.store) 481 + let BigState { mut store, .. } = self.state.take().expect("valid state"); 482 + store.reset()?; 483 + Ok(store) 438 484 }) 439 485 .await? 440 486 }
+67 -4
src/lib.rs
··· 1 - //! Fast and robust atproto CAR file processing in rust 2 - //! 3 - //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 1 + /*! 2 + A robust CAR file -> MST walker for atproto 3 + 4 + Small CARs have their blocks buffered in memory. If a configurable memory limit 5 + is reached while reading blocks, CAR reading is suspended, and can be continued 6 + by providing disk storage to buffer the CAR blocks instead. 7 + 8 + A `process` function can be provided for tasks where records are transformed 9 + into a smaller representation, to save memory (and disk) during block reading. 10 + 11 + Once blocks are loaded, the MST is walked and emitted as chunks of pairs of 12 + `(rkey, processed_block)` pairs, in order (depth first, left-to-right). 13 + 14 + Some MST validations are applied 15 + - Keys must appear in order 16 + - Keys must be at the correct MST tree depth 17 + 18 + `iroh_car` additionally applies a block size limit of `2MiB`. 19 + 20 + ``` 21 + use repo_stream::{Driver, SqliteStore}; 22 + 23 + # #[tokio::main] 24 + # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 + # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 + let mut total_size = 0; 27 + let process = |rec: Vec<u8>| rec.len(); // block processing: just extract the size 28 + let in_mem_limit = 10; /* MiB */ 29 + let db_cache_size = 32; /* MiB */ 30 + 31 + match Driver::load_car(reader, process, in_mem_limit).await? { 32 + 33 + // if all blocks fit within memory 34 + Driver::Memory(_commit, mut driver) => { 35 + while let Some(chunk) = driver.next_chunk(256).await? { 36 + for (_rkey, size) in chunk { 37 + total_size += size; 38 + } 39 + } 40 + }, 41 + 42 + // if the CAR was too big for in-memory processing 43 + Driver::Disk(paused) => { 44 + // set up a disk store we can spill to 45 + let store = SqliteStore::new("some/path.sqlite".into(), db_cache_size).await?; 46 + // do the spilling, get back a (similar) driver 47 + let (_commit, mut driver) = paused.finish_loading(store).await?; 48 + 49 + while let Some(chunk) = driver.next_chunk(256).await? { 50 + for (_rkey, size) in chunk { 51 + total_size += size; 52 + } 53 + } 54 + 55 + // clean up the disk store (drop tables etc) 56 + driver.reset_store().await?; 57 + } 58 + }; 59 + println!("sum of size of all records: {total_size}"); 60 + # Ok(()) 61 + # } 62 + ``` 63 + 64 + Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 65 + 66 + */ 4 67 5 68 mod mst; 6 69 mod walk; ··· 11 74 12 75 pub use disk::SqliteStore; 13 76 pub use drive::{DriveError, Driver}; 14 - pub use process::{Processable, noop}; 77 + pub use process::Processable;
+3 -5
tests/non-huge-cars.rs
··· 6 6 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 7 7 8 8 async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 9 - let mb = 2_usize.pow(20); 10 - 11 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 * mb) 9 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 12 10 .await 13 11 .unwrap() 14 12 { 15 - Driver::Lil(_commit, mem_driver) => mem_driver, 16 - Driver::Big(_) => panic!("too big"), 13 + Driver::Memory(_commit, mem_driver) => mem_driver, 14 + Driver::Disk(_) => panic!("too big"), 17 15 }; 18 16 19 17 let mut records = 0;