Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

continue loading in mem

This gives the caller a chance to move the rest of the load into a `spawn_blocking` task, and perhaps to limit concurrency for big CARs.

phil 5dc34770 1bb9664b

+66 -7
+1 -1
Cargo.lock
··· 2599 2599 2600 2600 [[package]] 2601 2601 name = "repo-stream" 2602 - version = "0.5.0-alpha.1" 2602 + version = "0.5.0-alpha.2" 2603 2603 dependencies = [ 2604 2604 "async-channel", 2605 2605 "cid",
+6 -6
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.5.0-alpha.1" 3 + version = "0.5.0-alpha.2" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 6 description = "Fast and robust atproto CAR file processing" 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 + cid = { version = "0.11.1", features = ["serde"] } 10 11 fjall = { version = "3.0.1", default-features = false } 11 - jacquard-repo = { path = "../jacquard/crates/jacquard-repo", optional = true } 12 12 hashbrown = { version = "0.16.1", optional = true } 13 - cid = { version = "0.11.1", features = ["serde"] } 14 13 iroh-car = "0.5.1" 14 + jacquard-repo = { path = "../jacquard/crates/jacquard-repo", optional = true } 15 15 log = "0.4.28" 16 16 serde = { version = "1.0.228", features = ["derive"] } 17 17 serde_bytes = "0.11.19" ··· 26 26 jacquard = ["dep:jacquard-repo"] 27 27 28 28 [dev-dependencies] 29 + async-channel = "2.5.0" 29 30 clap = { version = "4.5.48", features = ["derive"] } 30 31 criterion = { version = "0.7.0", features = ["async_tokio"] } 31 32 env_logger = "0.11.8" 33 + hmac-sha256 = "1.1.12" 34 + mimalloc = "0.1.48" 32 35 multibase = "0.9.2" 33 36 tempfile = "3.23.0" 34 37 tokio = { version = "1.47.1", features = ["full"] } 35 - mimalloc = "0.1.48" 36 - hmac-sha256 = "1.1.12" 37 - async-channel = "2.5.0" 38 38 39 39 [profile.profiling] 40 40 inherits = "release"
+59
src/mem.rs
··· 338 338 // --------------------------------------------------------------------------- 339 339 340 340 impl<R: AsyncRead + Unpin> PartialCar<R> { 341 + /// Attempt to finish loading into memory with a raised limit. 342 + /// 343 + /// Resumes reading from where `load_car` stopped and applies `new_limit_mb` 344 + /// as the new ceiling. Returns a ready [`MemCar`] if all remaining blocks fit. 345 + /// 346 + /// If the new limit is also exceeded, returns 347 + /// `Err(LoadError::MemoryLimitReached(partial))` with the updated partial 348 + /// state, so the caller can raise the limit again or fall back to disk via 349 + /// [`finish_loading`]. 350 + pub async fn continue_loading(mut self, new_limit_mb: usize) -> Result<MemCar, LoadError<R>> { 351 + let new_max_size = new_limit_mb * 2_usize.pow(20); 352 + 353 + // Count bytes already in the block map so we measure against the new limit. 354 + let mut mem_size: usize = self.blocks.values().map(|b| b.len()).sum(); 355 + 356 + while let Some((cid, data)) = self.car.next_block().await? { 357 + if cid == self.root { 358 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 359 + self.commit = Some(c); 360 + continue; 361 + } 362 + let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 363 + mem_size += maybe_processed.len(); 364 + self.blocks.insert(cid.into(), maybe_processed); 365 + if mem_size >= new_max_size { 366 + return Err(LoadError::MemoryLimitReached(Box::new(PartialCar { 367 + car: self.car, 368 + root: self.root, 369 + process: self.process, 370 + max_size: new_max_size, 371 + blocks: self.blocks, 372 + commit: self.commit, 373 + }))); 374 + } 375 + } 376 + 377 + let commit = self.commit.ok_or(LoadError::MissingCommit)?; 378 + 379 + let (root_node, root_bytes) = match self 380 + .blocks 381 + .get(&commit.data) 382 + .ok_or(LoadError::MissingCommit)? 
383 + { 384 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 385 + MaybeProcessedBlock::Raw(bytes) => { 386 + (serde_ipld_dagcbor::from_slice(bytes)?, bytes.clone()) 387 + } 388 + }; 389 + let root_cid: Cid = commit.data.clone().into(); 390 + 391 + Ok(MemCar { 392 + commit, 393 + prev_key: None, 394 + blocks: self.blocks, 395 + walker: Walker::new(root_node, root_cid, root_bytes), 396 + process: self.process, 397 + }) 398 + } 399 + 341 400 pub async fn finish_loading( 342 401 mut self, 343 402 mut store: DiskStore,