···3232 log::info!("got commit: {:?}", mem_car.commit);
33333434 while let Step::Value(records) = mem_car.next_chunk(256)? {
3535- for Output { key: _, cid: _, data: _ } in records {
3535+ for Output {
3636+ key: _,
3737+ cid: _,
3838+ data: _,
3939+ } in records
4040+ {
3641 // process records
3742 }
3843 }
-85
src/block.rs
···11-use crate::{Bytes, mst::MstNode};
22-33-#[derive(Debug, Clone)]
44-pub enum MaybeProcessedBlock {
55- /// A block that's *probably* a Node (but we can't know yet)
66- ///
77- /// It *can be* a record that suspiciously looks a lot like a node, so we
88- /// cannot eagerly turn it into a Node. We only know for sure what it is
99- /// when we actually walk down the MST
1010- Raw(Bytes),
1111- /// A processed record from a block that was definitely not a Node
1212- ///
1313- /// Processing has to be fallible because the CAR can have totally-unused
1414- /// blocks, which can just be garbage. since we're eagerly trying to process
1515- /// record blocks without knowing for sure that they *are* records, we
1616- /// discard any definitely-not-nodes that fail processing and keep their
1717- /// error in the buffer for them. if we later try to retreive them as a
1818- /// record, then we can surface the error.
1919- ///
2020- /// If we _never_ needed this block, then we may have wasted a bit of effort
2121- /// trying to process it. Oh well.
2222- ///
2323- /// There's an alternative here, which would be to kick unprocessable blocks
2424- /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
2525- /// surface the typed error later if needed by trying to reprocess.
2626- Processed(Bytes),
2727-}
2828-2929-impl MaybeProcessedBlock {
3030- pub fn to_node(&self) -> Option<MstNode> {
3131- let Self::Raw(bytes) = self else {
3232- return None;
3333- };
3434- serde_ipld_dagcbor::from_slice(bytes).ok()
3535- }
3636- pub fn unknown_depth(&self) -> bool {
3737- let Self::Raw(bytes) = self else {
3838- return false;
3939- };
4040- let Ok(node) = serde_ipld_dagcbor::from_slice::<MstNode>(bytes) else {
4141- return false;
4242- };
4343- node.depth.is_none()
4444- }
4545- pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self {
4646- if MstNode::could_be(&data) {
4747- MaybeProcessedBlock::Raw(data)
4848- } else {
4949- MaybeProcessedBlock::Processed(process(data))
5050- }
5151- }
5252- pub(crate) fn len(&self) -> usize {
5353- match self {
5454- MaybeProcessedBlock::Raw(b) => b.len(),
5555- MaybeProcessedBlock::Processed(b) => b.len(),
5656- }
5757- }
5858- pub(crate) fn into_bytes(self) -> Bytes {
5959- match self {
6060- MaybeProcessedBlock::Raw(mut b) => {
6161- b.push(0x00);
6262- b
6363- }
6464- MaybeProcessedBlock::Processed(mut b) => {
6565- b.push(0x01);
6666- b
6767- }
6868- }
6969- }
7070- pub(crate) fn from_bytes(mut b: Bytes) -> Self {
7171- // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc
7272- let suffix = b.pop().unwrap();
7373- if suffix == 0x00 {
7474- MaybeProcessedBlock::Raw(b)
7575- } else {
7676- MaybeProcessedBlock::Processed(b)
7777- }
7878- }
7979-}
8080-8181-/// Processor that just returns the raw blocks
8282-#[inline]
8383-pub fn noop(block: Bytes) -> Bytes {
8484- block
8585-}
+254-13
src/disk.rs
···11/*!
22-Disk storage for blocks on disk
33-44-Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed
55-to be the best behaved in terms of both on-disk space usage and memory usage.
22+Disk storage and disk-based MST walking.
6374```no_run
85# use repo_stream::{DiskBuilder, DiskError};
···1714```
1815*/
19162020-use crate::{Bytes, drive::DriveError};
1717+use crate::{
1818+ Bytes, Step,
1919+ mst::ThingKind,
2020+ walk::{MaybeProcessedBlock, MstError, Output, WalkError, WalkItem, Walker},
2121+};
2122use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
2323+use std::convert::Infallible;
2224use std::path::PathBuf;
2525+use thiserror::Error;
2626+use tokio::sync::mpsc;
2727+2828+// ---------------------------------------------------------------------------
2929+// Disk storage errors
3030+// ---------------------------------------------------------------------------
23312432#[derive(Debug, thiserror::Error)]
2533pub enum DiskError {
2634 /// A wrapped database error
2727- ///
2828- /// (The wrapped err should probably be obscured to remove public-facing
2929- /// sqlite bits)
3035 #[error(transparent)]
3136 DbError(#[from] FjallError),
3237 /// A tokio blocking task failed to join
3338 #[error("Failed to join a tokio blocking task: {0}")]
3439 JoinError(#[from] tokio::task::JoinError),
3540 /// The total size of stored blocks exceeded the allowed size
3636- ///
3737- /// If you need to process *really* big CARs, you can configure a higher
3838- /// limit.
3941 #[error("Maximum disk size reached")]
4042 MaxSizeExceeded,
4143}
42444545+// ---------------------------------------------------------------------------
4646+// Disk driver errors
4747+// ---------------------------------------------------------------------------
4848+4949+/// Errors that can happen while consuming blocks via the disk path
5050+#[derive(Debug, Error)]
5151+pub enum DriveError {
5252+ #[error("Error from iroh_car: {0}")]
5353+ CarReader(#[from] iroh_car::Error),
5454+ #[error("Failed to decode commit block: {0}")]
5555+ BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
5656+ #[error("The Commit block referenced by the root was not found")]
5757+ MissingCommit,
5858+ #[error("Failed to walk the MST: {0}")]
5959+ WalkError(#[from] WalkError),
6060+ #[error("CAR file had no roots")]
6161+ MissingRoot,
6262+ #[error("Storage error: {0}")]
6363+ StorageError(#[from] DiskError),
6464+ #[error("Unexpected missing block: {0:?}")]
6565+ MissingBlock(cid::Cid),
6666+ #[error("Tried to send on a closed channel")]
6767+ ChannelSendError,
6868+ #[error("Failed to join a task: {0}")]
6969+ JoinError(#[from] tokio::task::JoinError),
7070+}
7171+7272+impl From<MstError> for DriveError {
7373+ fn from(me: MstError) -> DriveError {
7474+ DriveError::WalkError(WalkError::MstError(me))
7575+ }
7676+}
7777+7878+// ---------------------------------------------------------------------------
7979+// Disk store
8080+// ---------------------------------------------------------------------------
8181+4382/// Builder-style disk store setup
4483#[derive(Debug, Clone)]
4584pub struct DiskBuilder {
···134173 pub(crate) fn put_many(
135174 &mut self,
136175 kv: impl Iterator<Item = (Vec<u8>, Bytes)>,
137137- ) -> Result<(), DriveError> {
176176+ ) -> Result<(), DiskError> {
138177 let mut batch = self.db.batch();
139178 for (k, v) in kv {
140179 self.stored += v.len();
141180 if self.stored > self.max_stored {
142142- return Err(DiskError::MaxSizeExceeded.into());
181181+ return Err(DiskError::MaxSizeExceeded);
143182 }
144183 batch.insert(&self.keyspace, k, v);
145184 }
···158197 Ok(tokio::task::spawn_blocking(move || keyspace.clear()).await??)
159198 }
160199}
200200+201201+// ---------------------------------------------------------------------------
202202+// disk_step on Walker (impl in this module to avoid walk.rs → disk.rs dep)
203203+// ---------------------------------------------------------------------------
204204+205205+impl Walker {
206206+ /// blocking!!!!!
207207+ pub(crate) fn disk_step(
208208+ &mut self,
209209+ blocks: &DiskStore,
210210+ process: impl Fn(Bytes) -> Bytes,
211211+ ) -> Result<Option<WalkItem>, WalkError> {
212212+ while let Some(thing) = self.next_todo() {
213213+ let Some(block_slice) = blocks.get(&thing.link.to_bytes())? else {
214214+ return Ok(Some(match thing.kind {
215215+ ThingKind::Record(key) => WalkItem::MissingRecord {
216216+ key,
217217+ cid: thing.link.into(),
218218+ },
219219+ ThingKind::ChildNode => WalkItem::MissingSubtree {
220220+ cid: thing.link.into(),
221221+ },
222222+ }));
223223+ };
224224+ let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec());
225225+ if let Some(out) = self.mpb_step(thing, &mpb, &process)? {
226226+ return Ok(Some(WalkItem::Record(out)));
227227+ }
228228+ }
229229+ Ok(None)
230230+ }
231231+}
232232+233233+// ---------------------------------------------------------------------------
234234+// Disk driver
235235+// ---------------------------------------------------------------------------
236236+237237+struct BigState {
238238+ store: DiskStore,
239239+ walker: Walker,
240240+}
241241+242242+/// MST walker that reads from disk instead of an in-memory hashmap
243243+pub struct DiskDriver {
244244+ process: fn(Bytes) -> Bytes,
245245+ state: Option<BigState>,
246246+}
247247+248248+// for doctests only
249249+#[doc(hidden)]
250250+pub fn _get_fake_disk_driver() -> DiskDriver {
251251+ DiskDriver {
252252+ process: crate::walk::noop,
253253+ state: None,
254254+ }
255255+}
256256+257257+impl DiskDriver {
258258+ /// Walk the MST returning up to `n` key + record pairs
259259+ ///
260260+ /// ```no_run
261261+ /// # use repo_stream::{disk::{DiskDriver, DriveError, _get_fake_disk_driver}, Step};
262262+ /// # #[tokio::main]
263263+ /// # async fn main() -> Result<(), DriveError> {
264264+ /// # let mut disk_driver = _get_fake_disk_driver();
265265+ /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? {
266266+ /// for output in outputs {
267267+ /// println!("{}: size={}", output.key, output.data.len());
268268+ /// }
269269+ /// }
270270+ /// # Ok(())
271271+ /// # }
272272+ /// ```
273273+ pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> {
274274+ let process = self.process;
275275+276276+ let mut state = self.state.take().expect("DiskDriver must have Some(state)");
277277+278278+ let (state, res) =
279279+ tokio::task::spawn_blocking(move || -> (BigState, Result<Vec<Output>, DriveError>) {
280280+ let mut out = Vec::with_capacity(n);
281281+282282+ for _ in 0..n {
283283+ match state.walker.disk_step(&state.store, process) {
284284+ Err(e) => return (state, Err(e.into())),
285285+ Ok(Some(WalkItem::Record(output))) => out.push(output),
286286+ Ok(Some(WalkItem::MissingRecord { cid, .. }))
287287+ | Ok(Some(WalkItem::MissingSubtree { cid })) => {
288288+ return (state, Err(DriveError::MissingBlock(cid)));
289289+ }
290290+ Ok(None) => break,
291291+ }
292292+ }
293293+294294+ (state, Ok::<_, DriveError>(out))
295295+ })
296296+ .await?;
297297+298298+ self.state = Some(state);
299299+300300+ let out = res?;
301301+302302+ if out.is_empty() {
303303+ Ok(Step::End(None))
304304+ } else {
305305+ Ok(Step::Value(out))
306306+ }
307307+ }
308308+309309+ fn read_tx_blocking(
310310+ &mut self,
311311+ n: usize,
312312+ tx: mpsc::Sender<Result<Step<Vec<Output>>, DriveError>>,
313313+ ) -> Result<(), mpsc::error::SendError<Result<Step<Vec<Output>>, DriveError>>> {
314314+ let BigState { store, walker } = self.state.as_mut().expect("valid state");
315315+316316+ loop {
317317+ let mut out: Vec<Output> = Vec::with_capacity(n);
318318+319319+ for _ in 0..n {
320320+ match walker.disk_step(store, self.process) {
321321+ Err(e) => return tx.blocking_send(Err(e.into())),
322322+ Ok(Some(WalkItem::Record(output))) => out.push(output),
323323+ Ok(Some(WalkItem::MissingRecord { cid, .. }))
324324+ | Ok(Some(WalkItem::MissingSubtree { cid })) => {
325325+ return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
326326+ }
327327+ Ok(None) => break,
328328+ }
329329+ }
330330+331331+ if out.is_empty() {
332332+ break;
333333+ }
334334+ tx.blocking_send(Ok(Step::Value(out)))?;
335335+ }
336336+337337+ Ok(())
338338+ }
339339+340340+ /// Spawn the disk reading task into a tokio blocking thread
341341+ ///
342342+ /// ```no_run
343343+ /// # use repo_stream::{disk::{DiskDriver, DriveError, _get_fake_disk_driver}, Step};
344344+ /// # #[tokio::main]
345345+ /// # async fn main() -> Result<(), DriveError> {
346346+ /// # let mut disk_driver = _get_fake_disk_driver();
347347+ /// let (mut rx, join) = disk_driver.to_channel(512);
348348+ /// while let Some(recvd) = rx.recv().await {
349349+ /// let outputs = recvd?;
350350+ /// let Step::Value(outputs) = outputs else { break; };
351351+ /// for output in outputs {
352352+ /// println!("{}: size={}", output.key, output.data.len());
353353+ /// }
354354+ ///
355355+ /// }
356356+ /// # Ok(())
357357+ /// # }
358358+ /// ```
359359+ pub fn to_channel(
360360+ mut self,
361361+ n: usize,
362362+ ) -> (
363363+ mpsc::Receiver<Result<Step<Vec<Output>>, DriveError>>,
364364+ tokio::task::JoinHandle<Self>,
365365+ ) {
366366+ let (tx, rx) = mpsc::channel::<Result<Step<Vec<Output>>, DriveError>>(1);
367367+368368+ let chan_task = tokio::task::spawn_blocking(move || {
369369+ if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
370370+ log::debug!("big car reader exited early due to dropped receiver channel");
371371+ }
372372+ self
373373+ });
374374+375375+ (rx, chan_task)
376376+ }
377377+378378+ /// Reset the disk storage so it can be reused.
379379+ pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
380380+ let BigState { store, .. } = self.state.take().expect("valid state");
381381+ store.reset().await?;
382382+ Ok(store)
383383+ }
384384+}
385385+386386+// ---------------------------------------------------------------------------
387387+// PartialCar::finish_loading lives in mem.rs but needs DiskDriver — it's
388388+// imported there from this module.
389389+// ---------------------------------------------------------------------------
390390+391391+/// Build a `DiskDriver` from a walker and store. Used by `PartialCar::finish_loading`.
392392+pub(crate) fn make_disk_driver(
393393+ store: DiskStore,
394394+ walker: Walker,
395395+ process: fn(Bytes) -> Bytes,
396396+) -> DiskDriver {
397397+ DiskDriver {
398398+ process,
399399+ state: Some(BigState { store, walker }),
400400+ }
401401+}
+1-3
src/drive.rs
···3838 /// The partial state is returned so the caller can decide what to do
3939 /// (e.g. resume with disk storage via `PartialCar::finish_loading`).
4040 #[error("partially loaded car")]
4141- MemoryLimitReached(PartialCar<R>),
4141+ MemoryLimitReached(Box<PartialCar<R>>),
4242}
43434444···166166 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
167167 };
168168 let mut walker = Walker::new(root_node);
169169-170170- let prev_key = walker.step_to_edge(&mem_blocks)?;
171169172170 Ok(MemCar {
173171 commit,
+5-9
src/lib.rs
···13131414Some MST validations are applied:
1515- Keys must appear in order
1616-- Keys must be at the correct MST tree depth
1616+- Keys must be at the correct MST tree layer
17171818`iroh_car` additionally applies a block size limit of `2MiB`.
1919···74747575*/
76767777-pub mod block;
7877pub mod disk;
7979-pub mod drive;
8080-pub mod link;
7878+pub mod mem;
8179pub mod mst;
8280pub mod walk;
83818484-pub use disk::{DiskBuilder, DiskError, DiskStore};
8585-pub use block::noop;
8686-pub use drive::{DriveError, DriverBuilder, LoadError, MemCar, PartialCar};
8787-pub use link::NodeThing;
8282+pub use disk::{DiskBuilder, DiskDriver, DiskError, DiskStore, DriveError};
8383+pub use mem::{DriverBuilder, LoadError, MemCar, PartialCar};
8884pub use mst::Commit;
8989-pub use walk::{Output, Step, WalkError, WalkItem};
8585+pub use walk::{Output, Step, WalkError, WalkItem, noop};
90869187pub type Bytes = Vec<u8>;
9288