Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

big rework wip

phil de2d74d9 455306cb

+566 -682
+14 -22
examples/disk-read-file/main.rs
··· 9 9 static GLOBAL: MiMalloc = MiMalloc; 10 10 11 11 use clap::Parser; 12 - use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step}; 12 + use repo_stream::{DiskBuilder, DriverBuilder, LoadError, Step}; 13 13 use std::path::PathBuf; 14 14 use std::time::Instant; 15 15 ··· 37 37 38 38 // in this example we only bother handling CARs that are too big for memory 39 39 // `noop` helper means: do no block processing, store the raw blocks 40 - let driver = match DriverBuilder::new() 40 + let partial = match DriverBuilder::new() 41 41 .with_mem_limit_mb(32) // how much memory can be used before disk spill 42 42 .load_car(reader) 43 - .await? 43 + .await 44 44 { 45 - Driver::Memory(_, _, _) => panic!("try this on a bigger car"), 46 - Driver::Disk(big_stuff) => { 47 - // we reach here if the repo was too big and needs to be spilled to 48 - // disk to continue 49 - 50 - // set up a disk store we can spill to 51 - let disk_store = DiskBuilder::new().open(tmpfile).await?; 52 - 53 - // do the spilling, get back a (similar) driver 54 - let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?; 45 + Ok(_mem_car) => panic!("try this on a bigger car"), 46 + Err(LoadError::MemoryLimitReached(partial)) => partial, 47 + Err(e) => return Err(e.into()), 48 + }; 55 49 56 - // at this point you might want to fetch the account's signing key 57 - // via the DID from the commit, and then verify the signature. 
58 - log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 50 + // set up a disk store we can spill to 51 + let disk_store = DiskBuilder::new().open(tmpfile).await?; 59 52 60 - // log::info!("now is good time to check mem usage..."); 61 - // tokio::time::sleep(std::time::Duration::from_secs(15)).await; 53 + // do the spilling, get back a disk driver 54 + let (commit, _, driver) = partial.finish_loading(disk_store).await?; 62 55 63 - // pop the driver back out to get some code indentation relief 64 - driver 65 - } 66 - }; 56 + // at this point you might want to fetch the account's signing key 57 + // via the DID from the commit, and then verify the signature. 58 + log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit); 67 59 68 60 // collect some random stats about the blocks 69 61 let mut n = 0;
-123
examples/mst-node-survival/main.rs
··· 1 - extern crate repo_stream; 2 - use repo_stream::link::ThingKind; 3 - use clap::Parser; 4 - use repo_stream::{Driver, DriverBuilder}; 5 - use std::path::PathBuf; 6 - 7 - type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 8 - 9 - #[derive(Debug, Parser)] 10 - struct Args { 11 - #[arg()] 12 - a: PathBuf, 13 - // #[arg()] 14 - // b: PathBuf, 15 - } 16 - 17 - #[tokio::main] 18 - async fn main() -> Result<()> { 19 - env_logger::init(); 20 - 21 - let Args { a } = Args::parse(); 22 - let reader_a = tokio::fs::File::open(a.clone()).await?; 23 - let reader_a = tokio::io::BufReader::new(reader_a); 24 - 25 - // let reader_b = tokio::fs::File::open(b.clone()).await?; 26 - // let reader_b = tokio::io::BufReader::new(reader_b); 27 - 28 - let builder = DriverBuilder::new() 29 - .with_mem_limit_mb(1000) 30 - .with_block_processor(|_| vec![]); 31 - 32 - log::info!("loading {a:?}..."); 33 - let driver_a = match builder.clone().load_car(reader_a).await? { 34 - Driver::Memory(_, _, mem_driver) => mem_driver, 35 - Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 36 - }; 37 - 38 - // log::info!("loading {b:?}..."); 39 - // let driver_b = match builder.load_car(reader_b).await? 
{ 40 - // Driver::Memory(_, _, mem_driver) => mem_driver, 41 - // Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 42 - // }; 43 - 44 - let mut total_referenced_nodes = 0; 45 - let mut total_l0_nodes = 0; 46 - let mut total_referenced_records = 0; 47 - let mut total_low_records = 0; 48 - // let mut records_a = 0; 49 - // let mut layer0_a = 0; 50 - 51 - for (_link, mpb) in driver_a.blocks { 52 - // // skips (probably nodes) 53 - // if !mpb.unknown_depth() { 54 - // continue; 55 - // } 56 - 57 - // // records 58 - // if mpb.to_node().is_some() { 59 - // continue; 60 - // } 61 - 62 - 63 - // nodes 64 - let Some(node) = mpb.to_node() else { 65 - continue; 66 - }; 67 - let Some(depth) = node.depth else { 68 - continue; 69 - }; 70 - for thing in node.things { 71 - match thing.kind { 72 - ThingKind::Record(_) => { 73 - total_referenced_records += 1; 74 - if depth <= 1 { 75 - total_low_records += 1; 76 - } 77 - } 78 - ThingKind::ChildNode => { 79 - total_referenced_nodes += 1; 80 - if depth == 1 { 81 - total_l0_nodes += 1; 82 - } 83 - } 84 - } 85 - } 86 - 87 - // // levels 88 - // let Some(node) = mpb.to_node() else { 89 - // continue; 90 - // }; 91 - // if node.depth != Some(1) { 92 - // continue; 93 - // } 94 - 95 - // total_a += 1; 96 - 97 - // if driver_b.blocks.contains_key(&link) { 98 - // surviving_a += 1; 99 - // } 100 - } 101 - 102 - eprintln!("referenced nodes: {total_referenced_nodes}"); 103 - eprintln!("referenced records: {total_referenced_records}"); 104 - let total_links = total_referenced_nodes + total_referenced_records; 105 - eprintln!("total links: {}", total_links); 106 - 107 - eprintln!("layer 0+1 records: {total_low_records}"); 108 - eprintln!("low recs of records: {:.1}", 109 - 100. * f64::try_from(total_low_records).unwrap() / f64::try_from(total_referenced_records).unwrap() 110 - ); 111 - 112 - eprintln!("layer 0 nodes: {total_l0_nodes}"); 113 - eprintln!("low nodes of nodes: {:.1}", 114 - 100. 
* f64::try_from(total_l0_nodes).unwrap() / f64::try_from(total_referenced_nodes).unwrap() 115 - ); 116 - 117 - let low_links = total_l0_nodes + total_low_records; 118 - eprintln!("low links of all links: {:.1}", 119 - 100. * f64::try_from(low_links).unwrap() / f64::try_from(total_links).unwrap() 120 - ); 121 - 122 - Ok(()) 123 - }
-32
examples/print-tree/main.rs
··· 1 - /*! 2 - Read a CAR slice in memory and show some info about it. 3 - */ 4 - 5 - extern crate repo_stream; 6 - use repo_stream::{Driver, DriverBuilder}; 7 - 8 - type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 - 10 - #[tokio::main] 11 - async fn main() -> Result<()> { 12 - env_logger::init(); 13 - let reader = tokio::io::BufReader::new(tokio::io::stdin()); 14 - 15 - let (commit, driver) = match DriverBuilder::new() 16 - .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 17 - .load_car(reader) 18 - .await? 19 - { 20 - Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 21 - Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 22 - }; 23 - 24 - println!( 25 - "\nthis slice is from {}, repo rev {}\n\n", 26 - commit.did, commit.rev 27 - ); 28 - 29 - driver.viz(commit.data)?; 30 - 31 - Ok(()) 32 - }
+7 -16
examples/read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{Driver, DriverBuilder, Output, Step}; 7 + use repo_stream::{DriverBuilder, Output, Step}; 8 8 use std::path::PathBuf; 9 9 10 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 23 23 let reader = tokio::fs::File::open(file).await?; 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 - let (commit, mut driver) = match DriverBuilder::new() 26 + let mut mem_car = DriverBuilder::new() 27 27 .with_mem_limit_mb(1000) 28 28 .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 29 29 .load_car(reader) 30 - .await? 31 - { 32 - Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 33 - Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 34 - }; 30 + .await?; 35 31 36 - log::info!("got commit: {commit:?}"); 32 + log::info!("got commit: {:?}", mem_car.commit); 37 33 38 - while let Step::Value(records) = driver.next_chunk(256).await? { 39 - for Output { rkey, cid, data } in records { 40 - // let size = usize::from_ne_bytes(data.try_into().unwrap()); 41 - // print!("0x"); 42 - // for byte in cid.to_bytes() { 43 - // print!("{byte:>02x}"); 44 - // } 45 - // println!(": {rkey} => record of len {}", size); 34 + while let Step::Value(records) = mem_car.next_chunk(256)? { 35 + for Output { rkey: _, cid: _, data: _ } in records { 36 + // process records 46 37 } 47 38 } 48 39
+9 -8
examples/read-slice/main.rs
··· 3 3 */ 4 4 5 5 extern crate repo_stream; 6 - use repo_stream::{Driver, DriverBuilder, Output, Step}; 6 + use repo_stream::{DriverBuilder, LoadError, Output, Step}; 7 7 8 8 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 9 ··· 12 12 env_logger::init(); 13 13 let reader = tokio::io::BufReader::new(tokio::io::stdin()); 14 14 15 - let (commit, prev_rkey, mut driver) = match DriverBuilder::new() 15 + let mut mem_car = match DriverBuilder::new() 16 16 .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 17 17 .load_car(reader) 18 - .await? 18 + .await 19 19 { 20 - Driver::Memory(commit, prev, mem_driver) => (commit, prev, mem_driver), 21 - Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 20 + Ok(mc) => mc, 21 + Err(LoadError::MemoryLimitReached(_)) => panic!("this example doesn't handle big CARs"), 22 + Err(e) => return Err(e.into()), 22 23 }; 23 24 24 25 println!( 25 26 "\nthis slice is from {}, repo rev {}", 26 - commit.did, commit.rev 27 + mem_car.commit.did, mem_car.commit.rev 27 28 ); 28 - if let Some(rkey) = prev_rkey { 29 + if let Some(rkey) = &mem_car.prev_rkey { 29 30 println!(" -> key immediately before CAR slice: {rkey}"); 30 31 } else { 31 32 println!( ··· 35 36 36 37 println!("included records:"); 37 38 let end = loop { 38 - match driver.next_chunk(256).await? { 39 + match mem_car.next_chunk(256)? { 39 40 Step::Value(chunk) => { 40 41 for Output { cid, rkey, .. } in chunk { 41 42 print!(" SHA256 ");
+103
hacking.md
··· 1 + things are changing a lot right now, but hopefully this file won't get too out of date. 2 + 3 + the latest published repo-stream release works well but it turns out there's still a lot left to do 4 + 5 + 6 + ### memory limit 7 + 8 + the last release had a kind of singular concept of a "memory limit", after which it would refuse to process in-memory and give you the partial state to finish dealing with using the disk driver. 9 + 10 + the idea for sending the state back to the user was that disk resources might be something they'd want to constrain, while allowing high concurrency of in-memory processing (eg., for large network backfills) 11 + 12 + one problem is that the high cost of disk spilling leads to high memory limits. queuing partial work, loaded to the max mem limit, can end up occupying a lot of memory! we probably actually need to pull the concept of a "disk worker" back to something like a "high-resource worker": 13 + 14 + - lower mem limit for normal in-memory processing 15 + - queue lower-limit partial state for an available high-resource worker 16 + - high resource worker might continue trying to process in-memory to a higher limit before disk spilling 17 + 18 + this should keep things a little more under control without giving up higher-memory in-memory performance -- generally everything should behave more predictably hopefully. 19 + 20 + 21 + ### disk spilling 22 + 23 + the switch to fjall was a major performance boost compared to sqlite, but it might not be the ultimate best fit for this 24 + 25 + - its WAL can't be turned off. we don't need a WAL. WAL writes go at least into the OS page cache, and while they might never hit disk they still use resources. if the page cache needs to evict, (high overall memory utilization, what we are designing for!), then this could suddenly increase IO, slowing down the high-resource worker, increasing contention over disk bandwidth, and using more disk space. 
26 + - its memory isn't super well under control. the amount it actually uses is currently higher than what the user configures with repo-stream (an internal repo-stream problem mostly, not fully fjall's problem) -- some of its impressive performance is probably due to this. 27 + - it launches background workers for compaction etc (extra resource usage, maybe some unfair perf vs sqlite comes from here too) 28 + - it opens a lot of files. if we keep fjall, we should make a global database instance and have individual workers create and drop keyspaces in it, instead of opening many fjall dbs and making the user's app hit ulimit. 29 + 30 + i'm interested in seeing whether using fjall's LSM-Tree storage engine directly might help address all of these points. 31 + 32 + other storage engines have been tested (redb, microsoft's new neat one, candystore, heed, cask) and so far fjall and sqlite have kept the best balance of controllable resource usage and performance. but i'm still interested in new ones to try. 33 + 34 + new ones to try: 35 + - https://github.com/arthurprs/canopydb: B+ tree so not holding my breath but let's see 36 + 37 + sekoia has some nice ideas for a custom storage engine for repo-stream: that's what we'll ultimately switch to most likely! 38 + 39 + 40 + ### partial CAR files 41 + 42 + this is the big one currently: repo-stream originally assumed it was working with full CAR exports (every MST link present), but that's not the case for CAR slices (from `com.atproto.sync.getRecord`) or firehose commits (`com.atproto.sync.subscribeRepos` contains sparse trees), and it won't be the case in the future for the sync1.1 collection-subset repo export. 43 + 44 + my original attempt focused too closely on CAR slices for `getRecord`, making annoying assumptions that limited it. instead we really just need richer APIs. 
the `getRecord` case and a collection-subset case could both be served by a range iterator, where getRecord would just tighten the bounds to one exact key. more below. 45 + 46 + 47 + ### (de)serialization 48 + 49 + there is a custom MST node deserializer right now which tries to parse the node directly into local data structures. it might have been a very small perf win, but annoyingly it means we lost *serialization* functionality. 50 + 51 + we could (maybe should?) implement a custom serializer. or we could just go back to the original `derive` impl so we get it back for free. 52 + 53 + (i had been thinking that the custom derive would eventually lead to a custom CBOR binary parser specialized for MST nodes -- i really don't see why not since the subset we need to handle is very small. but anyway.) 54 + 55 + it turns out there are use-cases for emitting not just records but MST nodes as well from repo-stream: for example, to build a converter to STAR formats. 56 + 57 + so we need to at least have a proper `Node` type we can emit, and ideally that thing derives `serde::Serialize`. 58 + 59 + 60 + ### iroh-car 61 + 62 + iroh-car is good but annoyingly async. since storage engines in rust are mostly sync, it makes a bit of friction. wrapping its async calls in a blocking executor might be ok but kind of annoying. also most projects will probably wind up using it in an overall async context. 63 + 64 + i want to fork iroh-car and refactor it to a sans-io core, with sync/async wrapping interfaces. 
65 + 66 + 67 + ### richer apis 68 + 69 + the apis kind of go out in a few dimensions 70 + 71 + - output MST nodes or not 72 + - output record contents or just keys and CIDs 73 + - chunked APIs or individual 74 + - failure on missing blocks or Optional output values 75 + 76 + feels like there should be 77 + 78 + low-level: 79 + 80 + - iterate all blocks forward, optional everything 81 + - seek: skip to some part of the tree 82 + 83 + for now leaving reverse iteration out for reconsideration if a use-case arises 84 + 85 + higher-level: 86 + 87 + - function to iterate all records, expecting them to all be there (output: (key, cid, contents)) 88 + - function to iterate over a range of bounds 89 + - function to get a specific key 90 + - function to iterate over a prefix (with validation of proven correct start/stop bounds?) 91 + 92 + 93 + ### MST validity 94 + 95 + - maximum number of entries should be 200 (see previous work with Sekoia) 96 + - maximum number of entries in a two-level subtree should be 800 or whatever (get real number, again prev work) 97 + 98 + we should also try to make a standards push to get those limits explicitly stated in the spec, to avoid hurting interop. 99 + 100 + 101 + ### processor function 102 + 103 + TODO: describe
+17 -10
readme.md
··· 11 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 12 13 13 ```rust no_run 14 - use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step}; 14 + use repo_stream::{DriverBuilder, LoadError, DiskBuilder, Output, Step}; 15 15 16 16 #[tokio::main] 17 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 27 27 .with_block_processor( // block processing: just extract the raw record size 28 28 |rec| rec.len().to_ne_bytes().to_vec()) 29 29 .load_car(reader) 30 - .await? 30 + .await 31 31 { 32 - 33 32 // if all blocks fit within memory 34 - Driver::Memory(_commit, _prev_rkey, mut driver) => { 35 - while let Step::Value(chunk) = driver.next_chunk(256).await? { 33 + Ok(mut mem_car) => { 34 + while let Step::Value(chunk) = mem_car.next_chunk(256)? { 36 35 for Output { rkey: _, cid: _, data } in chunk { 37 - let size = usize::from_ne_bytes(data.try_into().unwrap()); 36 + let size = usize::from_ne_bytes(<[u8; 8]>::try_from(data).unwrap()); 38 37 total_size += size; 39 38 } 40 39 } 41 40 }, 42 41 43 42 // if the CAR was too big for in-memory processing 44 - Driver::Disk(paused) => { 43 + Err(LoadError::MemoryLimitReached(partial)) => { 45 44 // set up a disk store we can spill to 46 45 let store = DiskBuilder::new().open("some/path.db".into()).await?; 47 - // do the spilling, get back a (similar) driver 48 - let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 46 + // do the spilling, get back a disk driver 47 + let (_commit, _prev_rkey, mut driver) = partial.finish_loading(store).await?; 49 48 50 49 while let Step::Value(chunk) = driver.next_chunk(256).await? 
{ 51 50 for Output { rkey: _, cid: _, data } in chunk { 52 - let size = usize::from_ne_bytes(data.try_into().unwrap()); 51 + let size = usize::from_ne_bytes(<[u8; 8]>::try_from(data).unwrap()); 53 52 total_size += size; 54 53 } 55 54 } 56 55 } 56 + 57 + Err(e) => return Err(e.into()), 57 58 }; 58 59 println!("sum of size of all records: {total_size}"); 59 60 Ok(()) ··· 71 72 - [ ] since the disk k/v get/set interface is now so similar to HashMap (blocking, no transactions,), it's probably possible to make a single `Driver` and move the thread stuff from the disk one to generic helper functions. (might create async footguns though) 72 73 - [ ] fork iroh-car into a sync version so we can drop tokio as a hard requirement, and offer async via wrapper helper things 73 74 - [ ] feature-flag the sha2 crate for hmac-sha256? if someone wanted fewer deps?? then maybe make `hashbrown` also optional vs builtin hashmap? 75 + 76 + 77 + ## contributing 78 + 79 + see ['./hacking.md'](./hacking.md) 80 + 74 81 75 82 ----- 76 83
+85
src/block.rs
··· 1 + use crate::{Bytes, mst::MstNode}; 2 + 3 + #[derive(Debug, Clone)] 4 + pub enum MaybeProcessedBlock { 5 + /// A block that's *probably* a Node (but we can't know yet) 6 + /// 7 + /// It *can be* a record that suspiciously looks a lot like a node, so we 8 + /// cannot eagerly turn it into a Node. We only know for sure what it is 9 + /// when we actually walk down the MST 10 + Raw(Bytes), 11 + /// A processed record from a block that was definitely not a Node 12 + /// 13 + /// Processing has to be fallible because the CAR can have totally-unused 14 + /// blocks, which can just be garbage. since we're eagerly trying to process 15 + /// record blocks without knowing for sure that they *are* records, we 16 + /// discard any definitely-not-nodes that fail processing and keep their 17 + /// error in the buffer for them. if we later try to retreive them as a 18 + /// record, then we can surface the error. 19 + /// 20 + /// If we _never_ needed this block, then we may have wasted a bit of effort 21 + /// trying to process it. Oh well. 22 + /// 23 + /// There's an alternative here, which would be to kick unprocessable blocks 24 + /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 25 + /// surface the typed error later if needed by trying to reprocess. 
26 + Processed(Bytes), 27 + } 28 + 29 + impl MaybeProcessedBlock { 30 + pub fn to_node(&self) -> Option<MstNode> { 31 + let Self::Raw(bytes) = self else { 32 + return None; 33 + }; 34 + serde_ipld_dagcbor::from_slice(bytes).ok() 35 + } 36 + pub fn unknown_depth(&self) -> bool { 37 + let Self::Raw(bytes) = self else { 38 + return false; 39 + }; 40 + let Ok(node) = serde_ipld_dagcbor::from_slice::<MstNode>(bytes) else { 41 + return false; 42 + }; 43 + node.depth.is_none() 44 + } 45 + pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 46 + if MstNode::could_be(&data) { 47 + MaybeProcessedBlock::Raw(data) 48 + } else { 49 + MaybeProcessedBlock::Processed(process(data)) 50 + } 51 + } 52 + pub(crate) fn len(&self) -> usize { 53 + match self { 54 + MaybeProcessedBlock::Raw(b) => b.len(), 55 + MaybeProcessedBlock::Processed(b) => b.len(), 56 + } 57 + } 58 + pub(crate) fn into_bytes(self) -> Bytes { 59 + match self { 60 + MaybeProcessedBlock::Raw(mut b) => { 61 + b.push(0x00); 62 + b 63 + } 64 + MaybeProcessedBlock::Processed(mut b) => { 65 + b.push(0x01); 66 + b 67 + } 68 + } 69 + } 70 + pub(crate) fn from_bytes(mut b: Bytes) -> Self { 71 + // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 72 + let suffix = b.pop().unwrap(); 73 + if suffix == 0x00 { 74 + MaybeProcessedBlock::Raw(b) 75 + } else { 76 + MaybeProcessedBlock::Processed(b) 77 + } 78 + } 79 + } 80 + 81 + /// Processor that just returns the raw blocks 82 + #[inline] 83 + pub fn noop(block: Bytes) -> Bytes { 84 + block 85 + }
+163 -271
src/drive.rs
··· 3 3 use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 4 use crate::{ 5 5 Bytes, HashMap, Rkey, Step, 6 + block::{MaybeProcessedBlock, noop}, 6 7 disk::{DiskError, DiskStore}, 7 8 mst::MstNode, 8 9 walk::{MstError, Output}, ··· 14 15 15 16 use crate::mst::Commit; 16 17 use crate::walk::{WalkError, Walker}; 18 + use thiserror::Error; 17 19 18 - /// Errors that can happen while consuming and emitting blocks and records 19 - #[derive(Debug, thiserror::Error)] 20 - pub enum DriveError { 21 - #[error("Error from iroh_car: {0}")] 20 + /// An in-order chunk of Rkey + CID + (processed) Block 21 + pub type BlockChunk = Vec<Output>; 22 + 23 + /// Errors that can occur while loading a CAR into memory 24 + #[derive(Debug, Error)] 25 + pub enum LoadError<R: AsyncRead + Unpin> { 26 + #[error("failed reading CAR: {0}")] 22 27 CarReader(#[from] iroh_car::Error), 23 - #[error("Failed to decode commit block: {0}")] 28 + #[error("failed to decode cbor: {0}")] 24 29 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 25 - #[error("The Commit block reference by the root was not found")] 30 + #[error("missing commit")] 26 31 MissingCommit, 27 - #[error("Failed to walk the mst tree: {0}")] 28 - WalkError(#[from] WalkError), 29 - #[error("CAR file had no roots")] 32 + #[error("missing mst root node")] 30 33 MissingRoot, 31 - #[error("Storage error")] 32 - StorageError(#[from] DiskError), 33 - #[error("Tried to send on a closed channel")] 34 - ChannelSendError, // SendError takes <T> which we don't need 35 - #[error("Failed to join a task: {0}")] 36 - JoinError(#[from] tokio::task::JoinError), 37 - } 38 - 39 - impl From<MstError> for DriveError { 40 - fn from(me: MstError) -> DriveError { 41 - DriveError::WalkError(WalkError::MstError(me)) 42 - } 43 - } 44 - 45 - /// An in-order chunk of Rkey + CID + (processed) Block 46 - pub type BlockChunk = Vec<Output>; 47 - 48 - #[derive(Debug, Clone)] 49 - pub enum MaybeProcessedBlock { 50 - /// A block that's *probably* a Node 
(but we can't know yet) 34 + #[error("failed to walk mst: {0}")] 35 + WalkError(#[from] WalkError), 36 + /// The memory limit was reached before all blocks were loaded. 51 37 /// 52 - /// It *can be* a record that suspiciously looks a lot like a node, so we 53 - /// cannot eagerly turn it into a Node. We only know for sure what it is 54 - /// when we actually walk down the MST 55 - Raw(Bytes), 56 - /// A processed record from a block that was definitely not a Node 57 - /// 58 - /// Processing has to be fallible because the CAR can have totally-unused 59 - /// blocks, which can just be garbage. since we're eagerly trying to process 60 - /// record blocks without knowing for sure that they *are* records, we 61 - /// discard any definitely-not-nodes that fail processing and keep their 62 - /// error in the buffer for them. if we later try to retreive them as a 63 - /// record, then we can surface the error. 64 - /// 65 - /// If we _never_ needed this block, then we may have wasted a bit of effort 66 - /// trying to process it. Oh well. 67 - /// 68 - /// There's an alternative here, which would be to kick unprocessable blocks 69 - /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 70 - /// surface the typed error later if needed by trying to reprocess. 71 - Processed(Bytes), 38 + /// The partial state is returned so the caller can decide what to do 39 + /// (e.g. resume with disk storage via `PartialCar::finish_loading`). 
40 + #[error("partially loaded car")] 41 + MemoryLimitReached(PartialCar<R>), 72 42 } 73 43 74 - impl MaybeProcessedBlock { 75 - pub fn to_node(&self) -> Option<MstNode> { 76 - let Self::Raw(bytes) = self else { 77 - return None; 78 - }; 79 - serde_ipld_dagcbor::from_slice(bytes).ok() 80 - } 81 - pub fn unknown_depth(&self) -> bool { 82 - let Self::Raw(bytes) = self else { 83 - return false; 84 - }; 85 - let Ok(node) = serde_ipld_dagcbor::from_slice::<MstNode>(bytes) else { 86 - return false; 87 - }; 88 - node.depth.is_none() 89 - } 90 - pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 91 - if MstNode::could_be(&data) { 92 - MaybeProcessedBlock::Raw(data) 93 - } else { 94 - MaybeProcessedBlock::Processed(process(data)) 95 - } 96 - } 97 - pub(crate) fn len(&self) -> usize { 98 - match self { 99 - MaybeProcessedBlock::Raw(b) => b.len(), 100 - MaybeProcessedBlock::Processed(b) => b.len(), 101 - } 102 - } 103 - pub(crate) fn into_bytes(self) -> Bytes { 104 - match self { 105 - MaybeProcessedBlock::Raw(mut b) => { 106 - b.push(0x00); 107 - b 108 - } 109 - MaybeProcessedBlock::Processed(mut b) => { 110 - b.push(0x01); 111 - b 112 - } 113 - } 114 - } 115 - pub(crate) fn from_bytes(mut b: Bytes) -> Self { 116 - // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 117 - let suffix = b.pop().unwrap(); 118 - if suffix == 0x00 { 119 - MaybeProcessedBlock::Raw(b) 120 - } else { 121 - MaybeProcessedBlock::Processed(b) 122 - } 123 - } 124 - } 125 44 126 - /// Read a CAR file, buffering blocks in memory or to disk 127 - pub enum Driver<R: AsyncRead + Unpin> { 128 - /// All blocks fit within the memory limit 129 - /// 130 - /// You probably want to check the commit's signature. You can go ahead and 131 - /// walk the MST right away. 132 - Memory(Commit, Option<Rkey>, MemDriver), 133 - /// Blocks exceed the memory limit 134 - /// 135 - /// You'll need to provide a disk storage to continue. 
The commit will be 136 - /// returned and can be validated only once all blocks are loaded. 137 - Disk(NeedDisk<R>), 138 - } 139 - 140 - /// Processor that just returns the raw blocks 141 - #[inline] 142 - pub fn noop(block: Bytes) -> Bytes { 143 - block 45 + /// A partially memory-loaded CAR file that hit the memory limit mid-stream. 46 + /// 47 + /// Can be resumed with disk storage via `finish_loading`, or discarded. 48 + #[derive(Debug)] 49 + pub struct PartialCar<R: AsyncRead + Unpin> { 50 + pub(crate) car: CarReader<R>, 51 + pub(crate) root: Cid, 52 + pub(crate) process: fn(Bytes) -> Bytes, 53 + pub(crate) max_size: usize, 54 + pub(crate) blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 55 + /// The commit block, if it was seen before the memory limit was reached 56 + pub commit: Option<Commit>, 144 57 } 145 58 146 59 /// Builder-style driver setup ··· 153 66 impl Default for DriverBuilder { 154 67 fn default() -> Self { 155 68 Self { 156 - mem_limit_mb: 16, 69 + mem_limit_mb: 10, 157 70 block_processor: noop, 158 71 } 159 72 } ··· 164 77 pub fn new() -> Self { 165 78 Default::default() 166 79 } 80 + 167 81 /// Set the in-memory size limit, in MiB 168 82 /// 169 - /// Default: 16 MiB 83 + /// Default: 10 MiB 170 84 pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 171 85 self.mem_limit_mb = new_limit; 172 86 self ··· 175 89 /// Set the block processor 176 90 /// 177 91 /// Default: noop, raw blocks will be emitted 178 - pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> DriverBuilder { 92 + pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> Self { 179 93 self.block_processor = new_processor; 180 94 self 181 95 } 182 96 183 - /// Begin processing an atproto MST from a CAR file 184 - pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 185 - Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 186 - } 187 - } 188 - 189 - impl<R: 
AsyncRead + Unpin> Driver<R> { 190 - 191 - /// Begin processing an atproto MST from a CAR file 192 - /// 193 - /// Blocks will be loaded, processed, and buffered in memory. If the entire 194 - /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 195 - /// will be returned along with a `Commit` ready for validation. 97 + /// Load an atproto repository CAR into memory. 196 98 /// 197 - /// If the `mem_limit_mb` limit is reached before loading all blocks, the 198 - /// partial state will be returned as `Driver::Disk(needed)`, which can be 199 - /// resumed by providing a `SqliteStorage` for on-disk block storage. 200 - pub async fn load_car( 99 + /// Returns a `MemCar` ready for walking. If the blocks exceed the memory 100 + /// limit, returns `Err(LoadError::MemoryLimitReached(partial))` containing 101 + /// the partial state, which can be resumed with disk storage. 102 + pub async fn load_car<R: AsyncRead + Unpin>( 103 + &self, 201 104 reader: R, 202 - process: fn(Bytes) -> Bytes, 203 - mem_limit_mb: usize, 204 - ) -> Result<Driver<R>, DriveError> { 205 - let mut block_count = 0; 105 + ) -> Result<MemCar, LoadError<R>> { 106 + load_car(reader, self.block_processor, self.mem_limit_mb).await 107 + } 108 + } 206 109 207 - let max_size = mem_limit_mb * 2_usize.pow(20); 208 - let mut mem_blocks = HashMap::new(); 110 + async fn load_car<R: AsyncRead + Unpin>( 111 + reader: R, 112 + process: fn(Bytes) -> Bytes, 113 + mem_limit_mb: usize, 114 + ) -> Result<MemCar, LoadError<R>> { 115 + let mut block_count = 0; 209 116 210 - let mut car = CarReader::new(reader).await?; 117 + let max_size = mem_limit_mb * 2_usize.pow(20); 118 + let mut mem_blocks = HashMap::new(); 211 119 212 - let roots = car.header().roots(); 213 - assert_eq!(roots.len(), 1); 120 + let mut car = CarReader::new(reader).await?; 214 121 215 - let root = *roots.first().ok_or(DriveError::MissingRoot)?; 216 - log::debug!("root: {root:?}"); 122 + let roots = car.header().roots(); 123 + 
assert_eq!(roots.len(), 1); 217 124 218 - let mut commit = None; 125 + let root = *roots.first().ok_or(LoadError::MissingRoot)?; 126 + log::debug!("root: {root:?}"); 219 127 220 - // try to load all the blocks into memory 221 - let mut mem_size = 0; 222 - while let Some((cid, data)) = car.next_block().await? { 223 - block_count += 1; 224 - // the root commit is a Special Third Kind of block that we need to make 225 - // sure not to optimistically send to the processing function 226 - if cid == root { 227 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 228 - commit = Some(c); 229 - continue; 230 - } 128 + let mut commit = None; 231 129 232 - // remaining possible types: node, record, other. optimistically process 233 - let maybe_processed = MaybeProcessedBlock::maybe(process, data); 130 + let mut mem_size = 0; 131 + while let Some((cid, data)) = car.next_block().await? { 132 + block_count += 1; 133 + // The root commit block is handled separately — never passed to the processor 134 + if cid == root { 135 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 136 + commit = Some(c); 137 + continue; 138 + } 234 139 235 - // stash (maybe processed) blocks in memory as long as we have room 236 - mem_size += maybe_processed.len(); 237 - mem_blocks.insert(cid.into(), maybe_processed); 238 - if mem_size >= max_size { 239 - log::debug!("blocks loaded before disk needed: {block_count}"); 140 + let maybe_processed = MaybeProcessedBlock::maybe(process, data); 240 141 241 - return Ok(Driver::Disk(NeedDisk { 242 - car, 243 - root, 244 - process, 245 - max_size, 246 - mem_blocks, 247 - commit, 248 - })); 249 - } 142 + mem_size += maybe_processed.len(); 143 + mem_blocks.insert(cid.into(), maybe_processed); 144 + if mem_size >= max_size { 145 + log::debug!("blocks loaded before memory limit: {block_count}"); 146 + return Err(LoadError::MemoryLimitReached(PartialCar { 147 + car, 148 + root, 149 + process, 150 + max_size, 151 + blocks: mem_blocks, 152 + commit, 153 + 
})); 250 154 } 155 + } 251 156 252 - log::debug!("blocks: {block_count}"); 157 + log::debug!("blocks: {block_count}"); 253 158 254 - // all blocks loaded and we fit in memory! hopefully we found the commit... 255 - let commit = commit.ok_or(DriveError::MissingCommit)?; 159 + let commit = commit.ok_or(LoadError::MissingCommit)?; 256 160 257 - // the commit always must point to a Node; empty node => empty MST special case 258 - let root_node: MstNode = match mem_blocks 259 - .get(&commit.data) 260 - .ok_or(DriveError::MissingCommit)? 261 - { 262 - MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 263 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 264 - }; 265 - let mut walker = Walker::new(root_node); 161 + let root_node: MstNode = match mem_blocks 162 + .get(&commit.data) 163 + .ok_or(LoadError::MissingCommit)? 164 + { 165 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 166 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 167 + }; 168 + let mut walker = Walker::new(root_node); 266 169 267 - // eprintln!("going to edge..."); 268 - let edge = walker.step_to_edge(&mem_blocks)?; 269 - // eprintln!("got edge? {edge:?}"); 170 + let prev_rkey = walker.step_to_edge(&mem_blocks)?; 270 171 271 - Ok(Driver::Memory( 272 - commit, 273 - edge, 274 - MemDriver { 275 - blocks: mem_blocks, 276 - walker, 277 - process, 278 - next_missing: None, 279 - }, 280 - )) 281 - } 172 + Ok(MemCar { 173 + commit, 174 + prev_rkey, 175 + blocks: mem_blocks, 176 + walker, 177 + process, 178 + next_missing: None, 179 + }) 282 180 } 283 181 284 - /// The core driver between the block stream and MST walker 285 - /// 286 - /// In the future, PDSs will export CARs in a stream-friendly order that will 287 - /// enable processing them with tiny memory overhead. But that future is not 288 - /// here yet. 
289 - /// 290 - /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 291 - /// optimistic stream features: we load all block first, then walk the MST. 292 - /// 293 - /// This makes things much simpler: we only need to worry about spilling to disk 294 - /// in one place, and we always have a reasonable expecatation about how much 295 - /// work the init function will do. We can drop the CAR reader before walking, 296 - /// so the sync/async boundaries become a little easier to work around. 182 + /// A fully loaded in-memory CAR file, ready for MST walking. 297 183 #[derive(Debug)] 298 - pub struct MemDriver { 184 + pub struct MemCar { 185 + pub commit: Commit, 186 + /// For CAR slices: the rkey of the last record before this slice's leading edge. 187 + /// `None` if this slice (or full CAR) starts from the leftmost record in the tree. 188 + pub prev_rkey: Option<Rkey>, 299 189 pub blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 300 190 walker: Walker, 301 - process: fn(Bytes) -> Bytes, // TODO: impl Fn(bytes) -> Bytes? 191 + process: fn(Bytes) -> Bytes, 302 192 next_missing: Option<NodeThing>, 303 193 } 304 194 305 - impl MemDriver { 306 - pub fn viz(&self, tree: ObjectLink) -> Result<(), WalkError> { 307 - self.walker.viz(&self.blocks, tree) 195 + impl MemCar { 196 + 197 + /// Seek forward to the first record at or after `target`. 198 + /// 199 + /// Uses the MST structure to skip entire subtrees efficiently. 200 + /// After this returns, the next `next_chunk` call will start at or after `target`. 
201 + pub fn seek(&mut self, target: &str) -> Result<(), WalkError> { 202 + self.walker.seek(target, &self.blocks) 203 + } 204 + 205 + /// Get the next record 206 + pub fn next(&mut self) -> Result<Option<Output>, WalkError> { 207 + todo!() 308 208 } 309 - /// Step through the record outputs, in rkey order 310 - pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 209 + 210 + /// Iterate up to `n` records in rkey order. 211 + /// 212 + /// Returns `Step::Value(records)` while records remain, then `Step::End(next_rkey)` 213 + /// where `next_rkey` is the first rkey after the slice (for CAR slices), or `None`. 214 + pub fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, WalkError> { 311 215 if let Some(ref mut missing) = self.next_missing { 312 216 while let Step::Value(sparse_out) = 313 217 self.walker.step_sparse(&self.blocks, self.process)? ··· 319 223 }; 320 224 } 321 225 } 322 - // TODO: l asdflkja slfkja lkdfj lakjd f 323 - // TODO: make the walker finish walking to verify no more present blocks (oops sparse tree) 324 - // HACK: just get the last rkey if it's there -- i think we might actually need to walk for it though 325 - // ...and walk to verify rkey order of the rest of the nodes anyway? 326 226 return Ok(match &missing.kind { 327 227 ThingKind::ChildNode => Step::End(None), 328 228 ThingKind::Record(rkey) => Step::End(Some(rkey.clone())), 329 229 }); 330 230 } 331 231 let mut out = Vec::with_capacity(n); 332 - // let mut err; 333 232 for _ in 0..n { 334 233 match self.walker.step(&self.blocks, self.process) { 335 234 Ok(Step::Value(record)) => out.push(record), 336 235 Ok(Step::End(None)) => break, 337 - Ok(Step::End(_)) => todo!("actually this should be unreachable?"), 236 + Ok(Step::End(_)) => unreachable!(), 338 237 Err(WalkError::MissingBlock(missing)) => { 339 238 self.next_missing = Some(*missing); 340 - return Ok(Step::Value(out)); // nb: might be empty! 
239 + return Ok(Step::Value(out)); // may be empty 341 240 } 342 - Err(other) => return Err(other.into()), 241 + Err(other) => return Err(other), 343 242 } 344 243 } 345 244 if out.is_empty() { ··· 350 249 } 351 250 } 352 251 353 - /// A partially memory-loaded car file that needs disk spillover to continue 354 - pub struct NeedDisk<R: AsyncRead + Unpin> { 355 - car: CarReader<R>, 356 - root: Cid, 357 - process: fn(Bytes) -> Bytes, 358 - max_size: usize, 359 - mem_blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 360 - pub commit: Option<Commit>, 252 + // --------------------------------------------------------------------------- 253 + // Disk path (kept for future wiring, not yet part of the primary API) 254 + // --------------------------------------------------------------------------- 255 + 256 + /// Errors that can happen while consuming blocks via the disk path 257 + #[derive(Debug, thiserror::Error)] 258 + pub enum DriveError { 259 + #[error("Error from iroh_car: {0}")] 260 + CarReader(#[from] iroh_car::Error), 261 + #[error("Failed to decode commit block: {0}")] 262 + BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 263 + #[error("The Commit block reference by the root was not found")] 264 + MissingCommit, 265 + #[error("Failed to walk the mst tree: {0}")] 266 + WalkError(#[from] WalkError), 267 + #[error("CAR file had no roots")] 268 + MissingRoot, 269 + #[error("Storage error")] 270 + StorageError(#[from] DiskError), 271 + #[error("Tried to send on a closed channel")] 272 + ChannelSendError, 273 + #[error("Failed to join a task: {0}")] 274 + JoinError(#[from] tokio::task::JoinError), 361 275 } 362 276 363 - impl<R: AsyncRead + Unpin> NeedDisk<R> { 277 + impl From<MstError> for DriveError { 278 + fn from(me: MstError) -> DriveError { 279 + DriveError::WalkError(WalkError::MstError(me)) 280 + } 281 + } 282 + 283 + impl<R: AsyncRead + Unpin> PartialCar<R> { 364 284 pub async fn finish_loading( 365 285 mut self, 366 286 mut store: DiskStore, 367 
287 ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> { 368 - // move store in and back out so we can manage lifetimes 369 - // dump mem blocks into the store 370 288 store = tokio::task::spawn(async move { 371 289 let kvs = self 372 - .mem_blocks 290 + .blocks 373 291 .into_iter() 374 292 .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 375 293 ··· 388 306 store.put_many(kvs)?; 389 307 } 390 308 Ok::<_, DriveError>(store) 391 - }); // await later 309 + }); 392 310 393 - // dump the rest to disk (in chunks) 394 311 log::debug!("dumping the rest of the stream..."); 395 312 loop { 396 313 let mut mem_size = 0; ··· 399 316 let Some((cid, data)) = self.car.next_block().await? else { 400 317 break; 401 318 }; 402 - // we still gotta keep checking for the root since we might not have it 403 319 if cid == self.root { 404 320 let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 405 321 self.commit = Some(c); ··· 409 325 let link = cid.into(); 410 326 let data = Bytes::from(data); 411 327 412 - // remaining possible types: node, record, other. optimistically process 413 - // TODO: get the actual in-memory size to compute disk spill 414 328 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 415 329 mem_size += maybe_processed.len(); 416 330 chunk.push((link, maybe_processed)); 417 331 if mem_size >= (self.max_size / 2) { 418 - // soooooo if we're setting the db cache to max_size and then letting 419 - // multiple chunks in the queue that are >= max_size, then at any time 420 - // we might be using some multiple of max_size? 
421 332 break; 422 333 } 423 334 } ··· 498 409 pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> { 499 410 let process = self.process; 500 411 501 - // state should only *ever* be None transiently while inside here 502 412 let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 503 413 504 - // the big pain here is that we don't want to leave self.state in an 505 - // invalid state (None), so all the error paths have to make sure it 506 - // comes out again. 507 414 let (state, res) = 508 415 tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 509 416 let mut out = Vec::with_capacity(n); 510 417 511 418 for _ in 0..n { 512 - // walk as far as we can until we run out of blocks or find a record 513 419 let step = match state.walker.disk_step(&state.store, process) { 514 420 Ok(s) => s, 515 421 Err(e) => { ··· 524 430 525 431 (state, Ok::<_, DriveError>(out)) 526 432 }) 527 - .await?; // on tokio JoinError, we'll be left with invalid state :( 433 + .await?; 528 434 529 - // *must* restore state before dealing with the actual result 530 435 self.state = Some(state); 531 436 532 437 let out = res?; ··· 549 454 let mut out: BlockChunk = Vec::with_capacity(n); 550 455 551 456 for _ in 0..n { 552 - // walk as far as we can until we run out of blocks or find a record 553 - 554 457 let step = match walker.disk_step(store, self.process) { 555 458 Ok(s) => s, 556 459 Err(e) => return tx.blocking_send(Err(e.into())), ··· 573 476 574 477 /// Spawn the disk reading task into a tokio blocking thread 575 478 /// 576 - /// The idea is to avoid so much sending back and forth to the blocking 577 - /// thread, letting a blocking task do all the disk reading work and sending 578 - /// records and rkeys back through an `mpsc` channel instead. 579 - /// 580 - /// This might also allow the disk work to continue while processing the 581 - /// records. 
It's still not yet clear if this method actually has much 582 - /// benefit over just using `.next_chunk(n)`. 583 - /// 584 479 /// ```no_run 585 480 /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 586 481 /// # #[tokio::main] ··· 607 502 ) { 608 503 let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1); 609 504 610 - // sketch: this worker is going to be allowed to execute without a join handle 611 505 let chan_task = tokio::task::spawn_blocking(move || { 612 506 if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 613 507 log::debug!("big car reader exited early due to dropped receiver channel"); ··· 619 513 } 620 514 621 515 /// Reset the disk storage so it can be reused. 622 - /// 623 - /// The store is returned, so it can be reused for another `DiskDriver`. 624 516 pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 625 517 let BigState { store, .. } = self.state.take().expect("valid state"); 626 518 store.reset().await?;
+48 -50
src/lib.rs
··· 1 1 /*! 2 2 A robust CAR file -> MST walker for atproto 3 3 4 - Small CARs have their blocks buffered in memory. If a configurable memory limit 5 - is reached while reading blocks, CAR reading is suspended, and can be continued 6 - by providing disk storage to buffer the CAR blocks instead. 4 + Blocks are buffered in memory up to a configurable limit (default 10 MiB). 5 + If the limit is reached, `load_car` returns `Err(LoadError::MemoryLimitReached(partial))` 6 + containing the partial state, which can later be resumed with disk storage. 7 7 8 - A `process` function can be provided for tasks where records are transformed 9 - into a smaller representation, to save memory (and disk) during block reading. 8 + A `block_processor` function can be provided for tasks where records are 9 + transformed into a smaller representation to save memory. 10 10 11 - Once blocks are loaded, the MST is walked and emitted as chunks of pairs of 12 - `(rkey, processed_block)` pairs, in order (depth first, left-to-right). 11 + Once blocks are loaded, the MST is walked and emitted as chunks of 12 + `(rkey, cid, processed_block)` records in left-to-right order. 13 13 14 - Some MST validations are applied 14 + Some MST validations are applied: 15 15 - Keys must appear in order 16 16 - Keys must be at the correct MST tree depth 17 17 18 18 `iroh_car` additionally applies a block size limit of `2MiB`. 
19 19 20 20 ``` 21 - use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step}; 21 + use repo_stream::{DriverBuilder, Step}; 22 22 23 23 # #[tokio::main] 24 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 25 # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 26 let mut total_size = 0; 27 27 28 - match DriverBuilder::new() 28 + let mut mem_car = DriverBuilder::new() 29 29 .with_mem_limit_mb(10) 30 - .with_block_processor( 31 - |rec| rec.len().to_ne_bytes().to_vec() 32 - ) // block processing: just extract the raw record size 30 + .with_block_processor(|rec| rec.len().to_ne_bytes().to_vec()) 33 31 .load_car(reader) 34 - .await? 35 - { 32 + .await?; 36 33 37 - // if all blocks fit within memory 38 - Driver::Memory(_commit, _prev_rkey, mut driver) => { 39 - while let Step::Value(chunk) = driver.next_chunk(256).await? { 40 - for output in chunk { 41 - let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 - 43 - total_size += size; 44 - } 45 - } 46 - }, 34 + while let Step::Value(chunk) = mem_car.next_chunk(256)? { 35 + for output in chunk { 36 + let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 37 + total_size += size; 38 + } 39 + } 40 + println!("sum of size of all records: {total_size}"); 41 + # Ok(()) 42 + # } 43 + ``` 47 44 48 - // if the CAR was too big for in-memory processing 49 - Driver::Disk(paused) => { 50 - // set up a disk store we can spill to 51 - let store = DiskBuilder::new().open("some/path.db".into()).await?; 52 - // do the spilling, get back a (similar) driver 53 - let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 45 + If the CAR is too large for memory, handle the `MemoryLimitReached` error: 54 46 55 - while let Step::Value(chunk) = driver.next_chunk(256).await? 
{ 56 - for output in chunk { 57 - let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 47 + ```no_run 48 + use repo_stream::{DriverBuilder, LoadError, Step}; 58 49 59 - total_size += size; 60 - } 50 + # #[tokio::main] 51 + # async fn main() -> Result<(), Box<dyn std::error::Error>> { 52 + # let reader = tokio::io::stdin(); 53 + match DriverBuilder::new() 54 + .with_mem_limit_mb(10) 55 + .load_car(reader) 56 + .await 57 + { 58 + Ok(mut mem_car) => { 59 + while let Step::Value(chunk) = mem_car.next_chunk(256)? { 60 + // process records 61 61 } 62 62 } 63 - }; 64 - println!("sum of size of all records: {total_size}"); 63 + Err(LoadError::MemoryLimitReached(partial)) => { 64 + // resume with disk storage (see DiskBuilder) 65 + eprintln!("CAR too large for memory"); 66 + } 67 + Err(e) => return Err(e.into()), 68 + } 65 69 # Ok(()) 66 70 # } 67 71 ``` 68 72 69 - Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going 70 - ahead and eagerly using disk I/O. This means you have to write a bit more code 71 - to handle both cases, but it allows you to have finer control over resource 72 - usage. For example, you can drive a number of parallel memory CAR workers, and 73 - separately have a different number of disk workers picking up suspended disk 74 - tasks from a queue. 75 - 76 73 Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 
77 74 78 75 */ 79 76 80 - pub mod mst; 81 - mod walk; 82 - 77 + pub mod block; 83 78 pub mod disk; 84 79 pub mod drive; 85 80 pub mod link; 81 + pub mod mst; 82 + pub mod walk; 86 83 87 84 pub use disk::{DiskBuilder, DiskError, DiskStore}; 88 - pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 85 + pub use block::noop; 86 + pub use drive::{DriveError, DriverBuilder, LoadError, MemCar, PartialCar}; 89 87 pub use link::NodeThing; 90 88 pub use mst::Commit; 91 - pub use walk::{Output, Step}; 89 + pub use walk::{Output, Step, WalkError}; 92 90 93 91 pub type Bytes = Vec<u8>; 94 92
+101 -124
src/walk.rs
··· 2 2 3 3 use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 4 use crate::mst::{Depth, MstNode}; 5 - use crate::{Bytes, HashMap, Rkey, disk::DiskStore, drive::MaybeProcessedBlock, noop}; 5 + use crate::{Bytes, HashMap, Rkey, disk::DiskStore, block::MaybeProcessedBlock, noop}; 6 6 use cid::Cid; 7 7 use std::convert::Infallible; 8 8 ··· 88 88 } 89 89 } 90 90 91 - pub fn viz( 92 - &self, 93 - blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 94 - root_link: ObjectLink, 95 - ) -> Result<(), WalkError> { 96 - let root_block = blocks.get(&root_link).ok_or(WalkError::MissingBlock( 97 - NodeThing { 98 - link: root_link.clone(), 99 - kind: ThingKind::ChildNode, 100 - } 101 - .into(), 102 - ))?; 103 - 104 - let root_node: MstNode = match root_block { 105 - MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint), 106 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 107 - }; 108 - 109 - let mut positions = HashMap::new(); 110 - let mut w = Walker::new(root_node.clone()); 111 - 112 - let mut pos_idx = 0; 113 - while let Step::Value(Output { rkey, .. }) = w.step_sparse(blocks, noop)? 
{ 114 - positions.insert(rkey, pos_idx); 115 - pos_idx += 1; 116 - } 117 - 118 - Self::vnext( 119 - root_node.depth.unwrap(), 120 - vec![root_link], 121 - blocks, 122 - &positions, 123 - )?; 124 - 125 - Ok(()) 126 - } 127 - 128 - pub fn vnext( 129 - level: u32, 130 - links: Vec<ObjectLink>, 131 - blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 132 - positions: &HashMap<Rkey, usize>, 133 - ) -> Result<Vec<usize>, WalkError> { 134 - let mut offsets = Vec::new(); 135 - let mut level_keys = Vec::new(); 136 - let mut child_links = Vec::new(); 137 - 138 - for link in links { 139 - println!( 140 - "\n{level}~{}..", 141 - link.to_bytes() 142 - .iter() 143 - .take(5) 144 - .map(|c| format!("{c:02x}")) 145 - .collect::<Vec<_>>() 146 - .join("") 147 - ); 148 - 149 - let Some(mpb) = blocks.get(&link) else { 150 - // TODO: drop an 'x' for missing node 151 - continue; 152 - }; 153 - let node: MstNode = match mpb { 154 - MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint), 155 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 156 - }; 157 - 158 - let mut last_key = "".to_string(); 159 - let mut last_was_record = true; 160 - for thing in node.things { 161 - let mut node_keys = Vec::new(); 162 - 163 - let has = blocks.contains_key(&thing.link); 164 - 165 - match thing.kind { 166 - ThingKind::ChildNode => { 167 - if has { 168 - child_links.push(thing.link); 169 - last_was_record = false; 170 - } 171 - } 172 - ThingKind::Record(key) => { 173 - let us = positions[&key]; 174 - 175 - if !last_was_record && last_key.is_empty() { 176 - let them = positions[&last_key]; 177 - for i in 0..(them - 1) { 178 - if i < (us + 1) { 179 - print!(" "); 180 - } else { 181 - print!("~~"); 182 - } 183 - } 184 - println!("~"); 185 - } 186 - 187 - for _ in 0..us { 188 - print!(" "); 189 - } 190 - if has { 191 - print!("O"); 192 - } else { 193 - print!("x"); 194 - } 195 - println!(" {key}"); 196 - node_keys.push(key.clone()); 197 - last_key = key; 
198 - last_was_record = true; 199 - } 200 - } 201 - level_keys.push(node_keys); 202 - } 203 - 204 - offsets.push(1); 205 - } 206 - 207 - if !child_links.is_empty() { 208 - Self::vnext(level - 1, child_links, blocks, positions)?; // TODO use offsets 209 - } 210 - 211 - Ok(offsets) 212 - } 213 - 214 91 fn mpb_step( 215 92 &mut self, 216 93 thing: NodeThing, ··· 369 246 Ok(z) => { 370 247 log::info!("apparently we are too far at {z:?}"); 371 248 return Ok(rkey_prev); // oop real record, mutant went too far 249 + } 250 + } 251 + } 252 + } 253 + 254 + /// Skip forward to the first record at or after `target`, without emitting anything. 255 + /// 256 + /// Uses the tree structure to skip entire subtrees that are provably before `target`, 257 + /// only loading child nodes on the path to `target`. O(depth × branching_factor). 258 + /// 259 + /// After this returns `Ok(())`, the next call to `step` will yield the first record 260 + /// at or after `target`, or `Step::End` if no such record exists. 261 + pub fn seek( 262 + &mut self, 263 + target: &str, 264 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 265 + ) -> Result<(), WalkError> { 266 + // Classify what to do next without holding a borrow through the action 267 + enum SeekStep { 268 + Done, 269 + EmptyLevel, 270 + SkipRecord(Rkey), 271 + SkipSubtree, 272 + Descend, 273 + } 274 + 275 + loop { 276 + let next = match self.todo.last() { 277 + None => return Ok(()), 278 + Some(level) => { 279 + let n = level.len(); 280 + if n == 0 { 281 + SeekStep::EmptyLevel 282 + } else { 283 + match &level[n - 1].kind { 284 + ThingKind::Record(k) if k.as_str() >= target => SeekStep::Done, 285 + ThingKind::Record(k) => SeekStep::SkipRecord(k.clone()), 286 + ThingKind::ChildNode => { 287 + // The right-bounding record for this child node is at n-2. 288 + // All keys in this subtree are < right_bound, so we can skip 289 + // the whole subtree if right_bound <= target. 
290 + let can_skip = n >= 2 291 + && matches!( 292 + &level[n - 2].kind, 293 + ThingKind::Record(k) if k.as_str() <= target 294 + ); 295 + if can_skip { 296 + SeekStep::SkipSubtree 297 + } else { 298 + SeekStep::Descend 299 + } 300 + } 301 + } 302 + } 303 + } 304 + }; // borrow of self.todo released here 305 + 306 + match next { 307 + SeekStep::Done => return Ok(()), 308 + SeekStep::EmptyLevel => { 309 + self.todo.pop(); 310 + } 311 + SeekStep::SkipRecord(key) => { 312 + self.todo.last_mut().unwrap().pop(); 313 + self.prev_rkey = key; 314 + } 315 + SeekStep::SkipSubtree => { 316 + self.todo.last_mut().unwrap().pop(); 317 + } 318 + SeekStep::Descend => { 319 + let child = self.todo.last_mut().unwrap().pop().unwrap(); 320 + // Note: self.todo borrow released before push below 321 + 322 + let Some(mpb) = blocks.get(&child.link) else { 323 + return Err(WalkError::MissingBlock(child.into())); 324 + }; 325 + let MaybeProcessedBlock::Raw(data) = mpb else { 326 + return Err(WalkError::BadCommitFingerprint); 327 + }; 328 + let node: MstNode = 329 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 330 + if node.is_empty() { 331 + return Err(WalkError::MstError(MstError::EmptyNode)); 332 + } 333 + // Depth validation mirrors mpb_step: todo still has the (possibly empty) 334 + // parent level, so todo.len()-1 is the parent's depth delta from root. 335 + let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 336 + let next_depth = current_depth 337 + .checked_sub(1) 338 + .ok_or(MstError::DepthUnderflow)?; 339 + if let Some(d) = node.depth 340 + && d != next_depth 341 + { 342 + return Err(WalkError::MstError(MstError::WrongDepth { 343 + depth: d, 344 + expected: next_depth, 345 + })); 346 + } 347 + self.links += node.things.len(); 348 + self.todo.push(node.things); 372 349 } 373 350 } 374 351 }
+11 -13
tests/car-slices.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{Driver, Output, Step}; 2 + use repo_stream::{DriverBuilder, LoadError, Output, Step}; 3 3 4 4 const RECORD_SLICE: &'static [u8] = include_bytes!("../car-samples/slice-one.car"); 5 5 const RECORD_NODE_FIRST_KEY: &'static [u8] = ··· 16 16 expect_rkey: Option<&str>, 17 17 expect_proceeding: Option<&str>, 18 18 ) { 19 - let (mut driver, before) = match Driver::load_car( 20 - bytes, 21 - |block| block.len().to_ne_bytes().to_vec(), 22 - 10, /* MiB */ 23 - ) 24 - .await 25 - .unwrap() 19 + let mut mem_car = match DriverBuilder::new() 20 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 21 + .load_car(bytes) 22 + .await 26 23 { 27 - Driver::Memory(_commit, before, mem_driver) => (mem_driver, before), 28 - Driver::Disk(_) => panic!("too big"), 24 + Ok(mc) => mc, 25 + Err(LoadError::MemoryLimitReached(_)) => panic!("too big"), 26 + Err(e) => panic!("{e}"), 29 27 }; 30 28 31 - assert_eq!(before.as_deref(), expect_preceeding); 29 + assert_eq!(mem_car.prev_rkey.as_deref(), expect_preceeding); 32 30 33 31 let mut found_records = 0; 34 32 let mut sum = 0; 35 33 let mut found_expected_rkey = false; 36 34 let mut prev_rkey = "".to_string(); 37 35 38 - while let Ok(step) = driver.next_chunk(256).await { 39 - match step { 36 + loop { 37 + match mem_car.next_chunk(256).unwrap() { 40 38 Step::Value(records) => { 41 39 for Output { rkey, cid: _, data } in records { 42 40 found_records += 1;
+8 -13
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{Driver, Output, Step}; 2 + use repo_stream::{DriverBuilder, Output, Step}; 3 3 4 4 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 12 12 expected_sum: usize, 13 13 expect_profile: bool, 14 14 ) { 15 - let mut driver = match Driver::load_car( 16 - bytes, 17 - |block| block.len().to_ne_bytes().to_vec(), 18 - 10, /* MiB */ 19 - ) 20 - .await 21 - .unwrap() 22 - { 23 - Driver::Memory(_commit, _, mem_driver) => mem_driver, 24 - Driver::Disk(_) => panic!("too big"), 25 - }; 15 + let mut mem_car = DriverBuilder::new() 16 + .with_mem_limit_mb(10) 17 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 18 + .load_car(bytes) 19 + .await 20 + .expect("should fit in memory"); 26 21 27 22 let mut records = 0; 28 23 let mut sum = 0; 29 24 let mut found_bsky_profile = false; 30 25 let mut prev_rkey = "".to_string(); 31 26 32 - while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 27 + while let Step::Value(pairs) = mem_car.next_chunk(256).unwrap() { 33 28 for Output { rkey, cid: _, data } in pairs { 34 29 records += 1; 35 30