Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

dead file

phil 78a1ccc4 74787d95

-551
-551
src/drive.rs
··· 1 - //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 - 3 - use crate::link::ObjectLink; 4 - use crate::{ 5 - Bytes, HashMap, RepoPath, Step, 6 - block::{MaybeProcessedBlock, noop}, 7 - disk::{DiskError, DiskStore}, 8 - mst::MstNode, 9 - walk::{MstError, Output, WalkItem}, 10 - }; 11 - use cid::Cid; 12 - use iroh_car::CarReader; 13 - use std::convert::Infallible; 14 - use tokio::{io::AsyncRead, sync::mpsc}; 15 - 16 - use crate::mst::Commit; 17 - use crate::walk::{WalkError, Walker}; 18 - use thiserror::Error; 19 - 20 - /// An in-order chunk of RepoPath + CID + (processed) Block 21 - pub type BlockChunk = Vec<Output>; 22 - 23 - /// Errors that can occur while loading a CAR into memory 24 - #[derive(Debug, Error)] 25 - pub enum LoadError<R: AsyncRead + Unpin> { 26 - #[error("failed reading CAR: {0}")] 27 - CarReader(#[from] iroh_car::Error), 28 - #[error("failed to decode cbor: {0}")] 29 - BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 30 - #[error("missing commit")] 31 - MissingCommit, 32 - #[error("missing mst root node")] 33 - MissingRoot, 34 - #[error("failed to walk mst: {0}")] 35 - WalkError(#[from] WalkError), 36 - /// The memory limit was reached before all blocks were loaded. 37 - /// 38 - /// The partial state is returned so the caller can decide what to do 39 - /// (e.g. resume with disk storage via `PartialCar::finish_loading`). 40 - #[error("partially loaded car")] 41 - MemoryLimitReached(Box<PartialCar<R>>), 42 - } 43 - 44 - 45 - /// A partially memory-loaded CAR file that hit the memory limit mid-stream. 46 - /// 47 - /// Can be resumed with disk storage via `finish_loading`, or discarded. 48 - #[derive(Debug)] 49 - pub struct PartialCar<R: AsyncRead + Unpin> { 50 - pub(crate) car: CarReader<R>, 51 - pub(crate) root: Cid, 52 - pub(crate) process: fn(Bytes) -> Bytes, 53 - pub(crate) max_size: usize, 54 - pub(crate) blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 55 - /// The commit block, if it was seen before the memory limit was reached 56 - pub commit: Option<Commit>, 57 - } 58 - 59 - /// Builder-style driver setup 60 - #[derive(Debug, Clone)] 61 - pub struct DriverBuilder { 62 - pub mem_limit_mb: usize, 63 - pub block_processor: fn(Bytes) -> Bytes, 64 - } 65 - 66 - impl Default for DriverBuilder { 67 - fn default() -> Self { 68 - Self { 69 - mem_limit_mb: 10, 70 - block_processor: noop, 71 - } 72 - } 73 - } 74 - 75 - impl DriverBuilder { 76 - /// Begin configuring the driver with defaults 77 - pub fn new() -> Self { 78 - Default::default() 79 - } 80 - 81 - /// Set the in-memory size limit, in MiB 82 - /// 83 - /// Default: 10 MiB 84 - pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 85 - self.mem_limit_mb = new_limit; 86 - self 87 - } 88 - 89 - /// Set the block processor 90 - /// 91 - /// Default: noop, raw blocks will be emitted 92 - pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> Self { 93 - self.block_processor = new_processor; 94 - self 95 - } 96 - 97 - /// Load an atproto repository CAR into memory. 98 - /// 99 - /// Returns a `MemCar` ready for walking. If the blocks exceed the memory 100 - /// limit, returns `Err(LoadError::MemoryLimitReached(partial))` containing 101 - /// the partial state, which can be resumed with disk storage. 102 - pub async fn load_car<R: AsyncRead + Unpin>( 103 - &self, 104 - reader: R, 105 - ) -> Result<MemCar, LoadError<R>> { 106 - load_car(reader, self.block_processor, self.mem_limit_mb).await 107 - } 108 - } 109 - 110 - async fn load_car<R: AsyncRead + Unpin>( 111 - reader: R, 112 - process: fn(Bytes) -> Bytes, 113 - mem_limit_mb: usize, 114 - ) -> Result<MemCar, LoadError<R>> { 115 - let mut block_count = 0; 116 - 117 - let max_size = mem_limit_mb * 2_usize.pow(20); 118 - let mut mem_blocks = HashMap::new(); 119 - 120 - let mut car = CarReader::new(reader).await?; 121 - 122 - let roots = car.header().roots(); 123 - assert_eq!(roots.len(), 1); 124 - 125 - let root = *roots.first().ok_or(LoadError::MissingRoot)?; 126 - log::debug!("root: {root:?}"); 127 - 128 - let mut commit = None; 129 - 130 - let mut mem_size = 0; 131 - while let Some((cid, data)) = car.next_block().await? { 132 - block_count += 1; 133 - // The root commit block is handled separately — never passed to the processor 134 - if cid == root { 135 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 136 - commit = Some(c); 137 - continue; 138 - } 139 - 140 - let maybe_processed = MaybeProcessedBlock::maybe(process, data); 141 - 142 - mem_size += maybe_processed.len(); 143 - mem_blocks.insert(cid.into(), maybe_processed); 144 - if mem_size >= max_size { 145 - log::debug!("blocks loaded before memory limit: {block_count}"); 146 - return Err(LoadError::MemoryLimitReached(PartialCar { 147 - car, 148 - root, 149 - process, 150 - max_size, 151 - blocks: mem_blocks, 152 - commit, 153 - })); 154 - } 155 - } 156 - 157 - log::debug!("blocks: {block_count}"); 158 - 159 - let commit = commit.ok_or(LoadError::MissingCommit)?; 160 - 161 - let root_node: MstNode = match mem_blocks 162 - .get(&commit.data) 163 - .ok_or(LoadError::MissingCommit)? 164 - { 165 - MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 166 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 167 - }; 168 - let mut walker = Walker::new(root_node); 169 - 170 - Ok(MemCar { 171 - commit, 172 - prev_key, 173 - blocks: mem_blocks, 174 - walker, 175 - process, 176 - trailing_key: None, 177 - }) 178 - } 179 - 180 - /// A fully loaded in-memory CAR file, ready for MST walking. 181 - #[derive(Debug)] 182 - pub struct MemCar { 183 - pub commit: Commit, 184 - /// For CAR slices: the key of the last record before this slice's leading edge. 185 - /// `None` if this slice (or full CAR) starts from the leftmost record in the tree. 186 - pub prev_key: Option<RepoPath>, 187 - pub blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 188 - walker: Walker, 189 - process: fn(Bytes) -> Bytes, 190 - /// `None` = no gap encountered yet; `Some(k)` = trailing edge determined. 191 - trailing_key: Option<Option<RepoPath>>, 192 - } 193 - 194 - impl MemCar { 195 - 196 - /// Seek forward to the first record at or after `target`. 197 - /// 198 - /// Uses the MST structure to skip entire subtrees efficiently. 199 - /// After this returns, the next `next` or `next_chunk` call will start at or after `target`. 200 - pub fn seek(&mut self, target: &str) -> Result<(), WalkError> { 201 - self.walker.seek(target, &self.blocks) 202 - } 203 - 204 - /// Walk forward past any gaps to determine the trailing edge key. 205 - /// 206 - /// The first record key encountered after a gap (whether the record's block 207 - /// is present or missing) is the trailing edge — the first key not covered 208 - /// by this slice. Sets `self.trailing_key` and returns it. 209 - fn find_trailing_edge(&mut self) -> Result<Option<RepoPath>, WalkError> { 210 - let trailing = loop { 211 - match self.walker.step(&self.blocks, self.process)? { 212 - Some(WalkItem::Record(r)) => break Some(r.key), 213 - Some(WalkItem::MissingRecord { key, .. }) => break Some(key), 214 - Some(WalkItem::MissingSubtree { .. }) => continue, 215 - None => break None, 216 - } 217 - }; 218 - self.trailing_key = Some(trailing.clone()); 219 - Ok(trailing) 220 - } 221 - 222 - /// Get the next record. 223 - /// 224 - /// Returns `Step::Value(output)` for each record in key order, then 225 - /// `Step::End(None)` at the end of a full CAR, or `Step::End(Some(key))` 226 - /// for CAR slices where `key` is the first key immediately after the slice. 227 - pub fn next(&mut self) -> Result<Step, WalkError> { 228 - if let Some(trailing) = &self.trailing_key { 229 - return Ok(Step::End(trailing.clone())); 230 - } 231 - match self.walker.step(&self.blocks, self.process)? { 232 - Some(WalkItem::Record(out)) => Ok(Step::Value(out)), 233 - Some(WalkItem::MissingRecord { key, .. }) => { 234 - self.trailing_key = Some(Some(key.clone())); 235 - Ok(Step::End(Some(key))) 236 - } 237 - Some(WalkItem::MissingSubtree { .. }) => { 238 - let trailing = self.find_trailing_edge()?; 239 - Ok(Step::End(trailing)) 240 - } 241 - None => { 242 - self.trailing_key = Some(None); 243 - Ok(Step::End(None)) 244 - } 245 - } 246 - } 247 - 248 - /// Iterate up to `n` records in key order. 249 - /// 250 - /// Returns `Step::Value(records)` while records remain, then `Step::End(next_key)` 251 - /// where `next_key` is the first key after the slice (for CAR slices), or `None`. 252 - pub fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, WalkError> { 253 - if let Some(trailing) = &self.trailing_key { 254 - return Ok(Step::End(trailing.clone())); 255 - } 256 - let mut out = Vec::with_capacity(n); 257 - for _ in 0..n { 258 - match self.walker.step(&self.blocks, self.process)? { 259 - Some(WalkItem::Record(record)) => out.push(record), 260 - Some(WalkItem::MissingRecord { key, .. }) => { 261 - self.trailing_key = Some(Some(key.clone())); 262 - return Ok(Step::Value(out)); // may be empty 263 - } 264 - Some(WalkItem::MissingSubtree { .. }) => { 265 - let trailing = self.find_trailing_edge()?; 266 - self.trailing_key = Some(trailing); 267 - return Ok(Step::Value(out)); // may be empty 268 - } 269 - None => break, 270 - } 271 - } 272 - if out.is_empty() { 273 - self.trailing_key = Some(None); 274 - Ok(Step::End(None)) 275 - } else { 276 - Ok(Step::Value(out)) 277 - } 278 - } 279 - } 280 - 281 - // --------------------------------------------------------------------------- 282 - // Disk path (kept for future wiring, not yet part of the primary API) 283 - // --------------------------------------------------------------------------- 284 - 285 - /// Errors that can happen while consuming blocks via the disk path 286 - #[derive(Debug, thiserror::Error)] 287 - pub enum DriveError { 288 - #[error("Error from iroh_car: {0}")] 289 - CarReader(#[from] iroh_car::Error), 290 - #[error("Failed to decode commit block: {0}")] 291 - BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 292 - #[error("The Commit block reference by the root was not found")] 293 - MissingCommit, 294 - #[error("Failed to walk the mst tree: {0}")] 295 - WalkError(#[from] WalkError), 296 - #[error("CAR file had no roots")] 297 - MissingRoot, 298 - #[error("Storage error")] 299 - StorageError(#[from] DiskError), 300 - #[error("Unexpected missing block: {0:?}")] 301 - MissingBlock(Cid), 302 - #[error("Tried to send on a closed channel")] 303 - ChannelSendError, 304 - #[error("Failed to join a task: {0}")] 305 - JoinError(#[from] tokio::task::JoinError), 306 - } 307 - 308 - impl From<MstError> for DriveError { 309 - fn from(me: MstError) -> DriveError { 310 - DriveError::WalkError(WalkError::MstError(me)) 311 - } 312 - } 313 - 314 - impl<R: AsyncRead + Unpin> PartialCar<R> { 315 - pub async fn finish_loading( 316 - mut self, 317 - mut store: DiskStore, 318 - ) -> Result<(Commit, Option<RepoPath>, DiskDriver), DriveError> { 319 - store = tokio::task::spawn(async move { 320 - let kvs = self 321 - .blocks 322 - .into_iter() 323 - .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 324 - 325 - store.put_many(kvs)?; 326 - Ok::<_, DriveError>(store) 327 - }) 328 - .await??; 329 - 330 - let (tx, mut rx) = mpsc::channel::<Vec<(ObjectLink, MaybeProcessedBlock)>>(1); 331 - 332 - let store_worker = tokio::task::spawn_blocking(move || { 333 - while let Some(chunk) = rx.blocking_recv() { 334 - let kvs = chunk 335 - .into_iter() 336 - .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 337 - store.put_many(kvs)?; 338 - } 339 - Ok::<_, DriveError>(store) 340 - }); 341 - 342 - log::debug!("dumping the rest of the stream..."); 343 - loop { 344 - let mut mem_size = 0; 345 - let mut chunk = vec![]; 346 - loop { 347 - let Some((cid, data)) = self.car.next_block().await? else { 348 - break; 349 - }; 350 - if cid == self.root { 351 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 352 - self.commit = Some(c); 353 - continue; 354 - } 355 - 356 - let link = cid.into(); 357 - let data = Bytes::from(data); 358 - 359 - let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 360 - mem_size += maybe_processed.len(); 361 - chunk.push((link, maybe_processed)); 362 - if mem_size >= (self.max_size / 2) { 363 - break; 364 - } 365 - } 366 - if chunk.is_empty() { 367 - break; 368 - } 369 - tx.send(chunk) 370 - .await 371 - .map_err(|_| DriveError::ChannelSendError)?; 372 - } 373 - drop(tx); 374 - log::debug!("done. waiting for worker to finish..."); 375 - 376 - store = store_worker.await??; 377 - 378 - log::debug!("worker finished."); 379 - 380 - let commit = self.commit.ok_or(DriveError::MissingCommit)?; 381 - 382 - let db_bytes = store 383 - .get(&commit.data.to_bytes()) 384 - .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 385 - .ok_or(DriveError::MissingCommit)?; 386 - 387 - let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 388 - MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 389 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 390 - }; 391 - let walker = Walker::new(node); 392 - 393 - Ok(( 394 - commit, 395 - None, 396 - DiskDriver { 397 - process: self.process, 398 - state: Some(BigState { store, walker }), 399 - }, 400 - )) 401 - } 402 - } 403 - 404 - struct BigState { 405 - store: DiskStore, 406 - walker: Walker, 407 - } 408 - 409 - /// MST walker that reads from disk instead of an in-memory hashmap 410 - pub struct DiskDriver { 411 - process: fn(Bytes) -> Bytes, 412 - state: Option<BigState>, 413 - } 414 - 415 - // for doctests only 416 - #[doc(hidden)] 417 - pub fn _get_fake_disk_driver() -> DiskDriver { 418 - DiskDriver { 419 - process: noop, 420 - state: None, 421 - } 422 - } 423 - 424 - impl DiskDriver { 425 - /// Walk the MST returning up to `n` key + record pairs 426 - /// 427 - /// ```no_run 428 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 429 - /// # #[tokio::main] 430 - /// # async fn main() -> Result<(), DriveError> { 431 - /// # let mut disk_driver = _get_fake_disk_driver(); 432 - /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? { 433 - /// for output in outputs { 434 - /// println!("{}: size={}", output.key, output.data.len()); 435 - /// } 436 - /// } 437 - /// # Ok(()) 438 - /// # } 439 - /// ``` 440 - pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> { 441 - let process = self.process; 442 - 443 - let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 444 - 445 - let (state, res) = 446 - tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 447 - let mut out = Vec::with_capacity(n); 448 - 449 - for _ in 0..n { 450 - match state.walker.disk_step(&state.store, process) { 451 - Err(e) => return (state, Err(e.into())), 452 - Ok(Some(WalkItem::Record(output))) => out.push(output), 453 - Ok(Some(WalkItem::MissingRecord { cid, .. })) 454 - | Ok(Some(WalkItem::MissingSubtree { cid })) => { 455 - return (state, Err(DriveError::MissingBlock(cid))); 456 - } 457 - Ok(None) => break, 458 - } 459 - } 460 - 461 - (state, Ok::<_, DriveError>(out)) 462 - }) 463 - .await?; 464 - 465 - self.state = Some(state); 466 - 467 - let out = res?; 468 - 469 - if out.is_empty() { 470 - Ok(Step::End(None)) 471 - } else { 472 - Ok(Step::Value(out)) 473 - } 474 - } 475 - 476 - fn read_tx_blocking( 477 - &mut self, 478 - n: usize, 479 - tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>, 480 - ) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> { 481 - let BigState { store, walker } = self.state.as_mut().expect("valid state"); 482 - 483 - loop { 484 - let mut out: BlockChunk = Vec::with_capacity(n); 485 - 486 - for _ in 0..n { 487 - match walker.disk_step(store, self.process) { 488 - Err(e) => return tx.blocking_send(Err(e.into())), 489 - Ok(Some(WalkItem::Record(output))) => out.push(output), 490 - Ok(Some(WalkItem::MissingRecord { cid, .. })) 491 - | Ok(Some(WalkItem::MissingSubtree { cid })) => { 492 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 493 - } 494 - Ok(None) => break, 495 - } 496 - } 497 - 498 - if out.is_empty() { 499 - break; 500 - } 501 - tx.blocking_send(Ok(Step::Value(out)))?; 502 - } 503 - 504 - Ok(()) 505 - } 506 - 507 - /// Spawn the disk reading task into a tokio blocking thread 508 - /// 509 - /// ```no_run 510 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 511 - /// # #[tokio::main] 512 - /// # async fn main() -> Result<(), DriveError> { 513 - /// # let mut disk_driver = _get_fake_disk_driver(); 514 - /// let (mut rx, join) = disk_driver.to_channel(512); 515 - /// while let Some(recvd) = rx.recv().await { 516 - /// let outputs = recvd?; 517 - /// let Step::Value(outputs) = outputs else { break; }; 518 - /// for output in outputs { 519 - /// println!("{}: size={}", output.key, output.data.len()); 520 - /// } 521 - /// 522 - /// } 523 - /// # Ok(()) 524 - /// # } 525 - /// ``` 526 - pub fn to_channel( 527 - mut self, 528 - n: usize, 529 - ) -> ( 530 - mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>, 531 - tokio::task::JoinHandle<Self>, 532 - ) { 533 - let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1); 534 - 535 - let chan_task = tokio::task::spawn_blocking(move || { 536 - if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 537 - log::debug!("big car reader exited early due to dropped receiver channel"); 538 - } 539 - self 540 - }); 541 - 542 - (rx, chan_task) 543 - } 544 - 545 - /// Reset the disk storage so it can be reused. 546 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 547 - let BigState { store, .. } = self.state.take().expect("valid state"); 548 - store.reset().await?; 549 - Ok(store) 550 - } 551 - }