Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

just vec again

authored by

phil and committed by tangled.org 483f9d6c 6735d5c2

+40 -58
-2
Cargo.lock
··· 850 850 checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 851 851 dependencies = [ 852 852 "byteorder-lite", 853 - "bytes", 854 853 "byteview", 855 854 "crossbeam-skiplist", 856 855 "enum_dispatch", ··· 1144 1143 name = "repo-stream" 1145 1144 version = "0.2.2" 1146 1145 dependencies = [ 1147 - "bytes", 1148 1146 "cid", 1149 1147 "clap", 1150 1148 "criterion",
+1 -2
Cargo.toml
··· 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 - bytes = "1.11.0" 11 - fjall = { version = "3.0.1", default-features = false, features = ["bytes_1"] } 10 + fjall = { version = "3.0.1", default-features = false } 12 11 hashbrown = "0.16.1" 13 12 cid = { version = "0.11.1", features = ["serde"] } 14 13 iroh-car = "0.5.1"
+8 -11
benches/huge-car.rs
··· 22 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 23 23 let reader = tokio::io::BufReader::new(reader); 24 24 25 - let mut driver = match Driver::load_car( 26 - reader, 27 - |block| block.len().to_le_bytes().to_vec().into(), 28 - 1024, 29 - ) 30 - .await 31 - .unwrap() 32 - { 33 - Driver::Memory(_, mem_driver) => mem_driver, 34 - Driver::Disk(_) => panic!("not doing disk for benchmark"), 35 - }; 25 + let mut driver = 26 + match Driver::load_car(reader, |block| block.len().to_le_bytes().to_vec(), 1024) 27 + .await 28 + .unwrap() 29 + { 30 + Driver::Memory(_, mem_driver) => mem_driver, 31 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 32 + }; 36 33 37 34 let mut n = 0; 38 35 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+7 -8
benches/non-huge-cars.rs
··· 29 29 } 30 30 31 31 async fn drive_car(bytes: &[u8]) -> usize { 32 - let mut driver = 33 - match Driver::load_car(bytes, |block| block.len().to_le_bytes().to_vec().into(), 32) 34 - .await 35 - .unwrap() 36 - { 37 - Driver::Memory(_, mem_driver) => mem_driver, 38 - Driver::Disk(_) => panic!("not benching big cars here"), 39 - }; 32 + let mut driver = match Driver::load_car(bytes, |block| block.len().to_le_bytes().to_vec(), 32) 33 + .await 34 + .unwrap() 35 + { 36 + Driver::Memory(_, mem_driver) => mem_driver, 37 + Driver::Disk(_) => panic!("not benching big cars here"), 38 + }; 40 39 41 40 let mut n = 0; 42 41 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+1 -1
src/disk.rs
··· 17 17 ``` 18 18 */ 19 19 20 + use crate::Bytes; 20 21 use crate::drive::DriveError; 21 - use bytes::Bytes; 22 22 use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 23 23 use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 24 24 use std::path::PathBuf;
+9 -20
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::Bytes; 3 4 use crate::HashMap; 4 5 use crate::disk::{DiskError, DiskStore}; 5 6 use crate::mst::Node; 6 - use bytes::Bytes; 7 7 use cid::Cid; 8 8 use iroh_car::CarReader; 9 9 use std::convert::Infallible; ··· 21 21 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 22 22 #[error("The Commit block reference by the root was not found")] 23 23 MissingCommit, 24 - #[error("The MST block {0} could not be found")] 25 - MissingBlock(Cid), 26 24 #[error("Failed to walk the mst tree: {0}")] 27 25 WalkError(#[from] WalkError), 28 26 #[error("CAR file had no roots")] ··· 80 78 } 81 79 pub(crate) fn into_bytes(self) -> Bytes { 82 80 match self { 83 - MaybeProcessedBlock::Raw(b) => { 84 - let mut owned = b.try_into_mut().unwrap(); 85 - owned.extend_from_slice(&[0x00]); 86 - owned.into() 81 + MaybeProcessedBlock::Raw(mut b) => { 82 + b.push(0x00); 83 + b 87 84 } 88 - MaybeProcessedBlock::Processed(b) => { 89 - let mut owned = b.try_into_mut().unwrap(); 90 - owned.extend_from_slice(&[0x01]); 91 - owned.into() 85 + MaybeProcessedBlock::Processed(mut b) => { 86 + b.push(0x01); 87 + b 92 88 } 93 89 } 94 90 } 95 91 pub(crate) fn from_bytes(mut b: Bytes) -> Self { 96 92 // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 97 - let suffix = b.split_off(b.len() - 1); 98 - if *suffix == [0x00] { 93 + let suffix = b.pop().unwrap(); 94 + if suffix == 0x00 { 99 95 MaybeProcessedBlock::Raw(b) 100 96 } else { 101 97 MaybeProcessedBlock::Processed(b) ··· 292 288 for _ in 0..n { 293 289 // walk as far as we can until we run out of blocks or find a record 294 290 match self.walker.step(&mut self.blocks, self.process)? { 295 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 296 291 Step::Finish => break, 297 292 Step::Found { rkey, data } => { 298 293 out.push((rkey, data)); ··· 465 460 } 466 461 }; 467 462 match step { 468 - Step::Missing(cid) => { 469 - return (state, Err(DriveError::MissingBlock(cid))); 470 - } 471 463 Step::Finish => break, 472 464 Step::Found { rkey, data } => out.push((rkey, data)), 473 465 }; ··· 508 500 }; 509 501 510 502 match step { 511 - Step::Missing(cid) => { 512 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 513 - } 514 503 Step::Finish => return Ok(()), 515 504 Step::Found { rkey, data } => { 516 505 out.push((rkey, data));
+3
src/lib.rs
··· 91 91 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 92 92 pub use mst::Commit; 93 93 94 + // pub use bytes::Bytes; 95 + pub type Bytes = Vec<u8>; 96 + 94 97 pub(crate) use hashbrown::HashMap;
+11 -14
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 + use crate::Bytes; 3 4 use crate::HashMap; 4 5 use crate::disk::DiskStore; 5 6 use crate::drive::MaybeProcessedBlock; 6 7 use crate::mst::Node; 7 - use bytes::Bytes; 8 8 use cid::Cid; 9 9 use sha2::{Digest, Sha256}; 10 10 use std::convert::Infallible; ··· 20 20 MstError(#[from] MstError), 21 21 #[error("storage error: {0}")] 22 22 StorageError(#[from] fjall::Error), 23 + #[error("block not found: {0}")] 24 + MissingBlock(Cid), 23 25 } 24 26 25 27 /// Errors from invalid Rkeys ··· 44 46 /// Walker outputs 45 47 #[derive(Debug)] 46 48 pub enum Step { 47 - /// We needed this CID but it's not in the block store 48 - Missing(Cid), 49 49 /// Reached the end of the MST! yay! 50 50 Finish, 51 51 /// A record was found! ··· 189 189 &mut Need::Node { depth, cid } => { 190 190 log::trace!("need node {cid:?}"); 191 191 let Some(block) = blocks.remove(&cid) else { 192 - log::trace!("node not found, resting"); 193 - return Ok(Step::Missing(cid)); 192 + return Err(WalkError::MissingBlock(cid)); 194 193 }; 195 194 196 195 let MaybeProcessedBlock::Raw(data) = block else { ··· 209 208 log::trace!("need record {cid:?}"); 210 209 // note that we cannot *remove* a record block, sadly, since 211 210 // there can be multiple rkeys pointing to the same cid. 212 - let Some(data) = blocks.get_mut(cid) else { 213 - return Ok(Step::Missing(*cid)); 211 + let Some(data) = blocks.get(cid) else { 212 + return Err(WalkError::MissingBlock(*cid)); 214 213 }; 215 214 let rkey = rkey.clone(); 216 215 let data = match data { ··· 251 250 let cid_bytes = cid.to_bytes(); 252 251 log::trace!("need node {cid:?}"); 253 252 let Some(block_slice) = reader.get(&cid_bytes)? else { 254 - log::trace!("node not found, resting"); 255 - return Ok(Step::Missing(cid)); 253 + return Err(WalkError::MissingBlock(cid)); 256 254 }; 257 255 258 - let block = MaybeProcessedBlock::from_bytes(block_slice.into()); // TODO shouldn't fjalls slice already be bytes 256 + let block = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 259 257 260 258 let MaybeProcessedBlock::Raw(data) = block else { 261 259 return Err(WalkError::BadCommitFingerprint); ··· 273 271 log::trace!("need record {cid:?}"); 274 272 let cid_bytes = cid.to_bytes(); 275 273 let Some(data_slice) = reader.get(&cid_bytes)? else { 276 - log::trace!("record block not found, resting"); 277 - return Ok(Step::Missing(*cid)); 274 + return Err(WalkError::MissingBlock(*cid)); 278 275 }; 279 - let data = MaybeProcessedBlock::from_bytes(data_slice.into()); 276 + let data = MaybeProcessedBlock::from_bytes(data_slice.to_vec()); 280 277 let rkey = rkey.clone(); 281 278 let data = match data { 282 279 MaybeProcessedBlock::Raw(data) => process(data), 283 - MaybeProcessedBlock::Processed(t) => t.clone(), 280 + MaybeProcessedBlock::Processed(t) => t, 284 281 }; 285 282 286 283 // found node, make sure we remember