···11+use futures::Stream;
22+use futures::TryStreamExt;
33+use std::error::Error;
44+55+use crate::disk_walk::{Step, Trip, Walker};
66+use crate::mst::Commit;
77+use crate::mst::Node;
88+99+use ipld_core::cid::Cid;
1010+use serde::{Deserialize, Serialize, de::DeserializeOwned};
1111+1212+/// Errors that can happen while consuming and emitting blocks and records
1313+#[derive(Debug, thiserror::Error)]
1414+pub enum DriveError {
1515+ #[error("Failed to initialize CarReader: {0}")]
1616+ CarReader(#[from] iroh_car::Error),
1717+ #[error("Car block stream error: {0}")]
1818+ CarBlockError(Box<dyn Error>),
1919+ #[error("Failed to decode commit block: {0}")]
2020+ BadCommit(Box<dyn Error>),
2121+ #[error("The Commit block reference by the root was not found")]
2222+ MissingCommit,
2323+ #[error("The MST block {0} could not be found")]
2424+ MissingBlock(Cid),
2525+ #[error("Failed to walk the mst tree: {0}")]
2626+ Tripped(#[from] Trip),
2727+}
2828+2929+#[derive(Debug, Clone, Serialize, Deserialize)]
3030+pub enum MaybeProcessedBlock<T: Clone + Serialize> {
3131+ /// A block that's *probably* a Node (but we can't know yet)
3232+ ///
3333+ /// It *can be* a record that suspiciously looks a lot like a node, so we
3434+ /// cannot eagerly turn it into a Node. We only know for sure what it is
3535+ /// when we actually walk down the MST
3636+ Raw(Vec<u8>),
3737+ /// A processed record from a block that was definitely not a Node
3838+ ///
3939+ /// If we _never_ needed this block, then we may have wasted a bit of effort
4040+ /// trying to process it. Oh well.
4141+ ///
4242+ /// Processing has to be fallible because the CAR can have totally-unused
4343+ /// blocks, which can just be garbage. since we're eagerly trying to process
4444+ /// record blocks without knowing for sure that they *are* records, we
4545+ /// discard any definitely-not-nodes that fail processing and keep their
4646+ /// error in the buffer for them. if we later try to retreive them as a
4747+ /// record, then we can surface the error.
4848+ ///
4949+ /// The error type is `String` because we don't really want to put
5050+ /// any constraints like `Serialize` on the error type, and `Error`
5151+ /// at least requires `Display`. It's a compromise.
5252+ ProcessedOk(T),
5353+ Unprocessable(String),
5454+}
5555+5656+pub trait BlockStore<MPB: Serialize + DeserializeOwned> {
5757+ fn put(&self, key: Cid, value: MPB); // unwraps for now
5858+ fn get(&self, key: Cid) -> Option<MPB>;
5959+}
6060+6161+type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
6262+6363+/// The core driver between the block stream and MST walker
6464+pub struct Vehicle<SE, S, T, BS, P, PE>
6565+where
6666+ SE: Error + 'static,
6767+ S: Stream<Item = CarBlock<SE>>,
6868+ T: Clone + Serialize + DeserializeOwned,
6969+ BS: BlockStore<MaybeProcessedBlock<T>>,
7070+ P: Fn(&[u8]) -> Result<T, PE>,
7171+ PE: Error,
7272+{
7373+ block_stream: S,
7474+ block_store: BS,
7575+ walker: Walker,
7676+ process: P,
7777+}
7878+7979+impl<SE, S, T, BS, P, PE> Vehicle<SE, S, T, BS, P, PE>
8080+where
8181+ SE: Error + 'static,
8282+ S: Stream<Item = CarBlock<SE>> + Unpin,
8383+ T: Clone + Serialize + DeserializeOwned,
8484+ BS: BlockStore<MaybeProcessedBlock<T>>,
8585+ P: Fn(&[u8]) -> Result<T, PE>,
8686+ PE: Error,
8787+{
8888+ /// Set up the stream
8989+ ///
9090+ /// This will eagerly consume blocks until the `Commit` object is found.
9191+ /// *Usually* the it's the first block, but there is no guarantee.
9292+ ///
9393+ /// ### Parameters
9494+ ///
9595+ /// `root`: CID of the commit object that is the root of the MST
9696+ ///
9797+ /// `block_stream`: Input stream of raw CAR blocks
9898+ ///
9999+ /// `process`: record-transforming callback:
100100+ ///
101101+ /// For tasks where records can be quickly processed into a *smaller*
102102+ /// useful representation, you can do that eagerly as blocks come in by
103103+ /// passing the processor as a callback here. This can reduce overall
104104+ /// memory usage.
105105+ pub async fn init(
106106+ root: Cid,
107107+ mut block_stream: S,
108108+ block_store: BS,
109109+ process: P,
110110+ ) -> Result<(Commit, Self), DriveError> {
111111+ let mut commit = None;
112112+113113+ while let Some((cid, data)) = block_stream
114114+ .try_next()
115115+ .await
116116+ .map_err(|e| DriveError::CarBlockError(e.into()))?
117117+ {
118118+ if cid == root {
119119+ let c: Commit = serde_ipld_dagcbor::from_slice(&data)
120120+ .map_err(|e| DriveError::BadCommit(e.into()))?;
121121+ commit = Some(c);
122122+ break;
123123+ } else {
124124+ block_store.put(
125125+ cid,
126126+ if Node::could_be(&data) {
127127+ MaybeProcessedBlock::Raw(data)
128128+ } else {
129129+ match process(&data) {
130130+ Ok(t) => MaybeProcessedBlock::ProcessedOk(t),
131131+ Err(e) => MaybeProcessedBlock::Unprocessable(e.to_string()),
132132+ }
133133+ },
134134+ );
135135+ }
136136+ }
137137+138138+ // we either broke out or read all the blocks without finding the commit...
139139+ let commit = commit.ok_or(DriveError::MissingCommit)?;
140140+141141+ let walker = Walker::new(commit.data);
142142+143143+ let me = Self {
144144+ block_stream,
145145+ block_store,
146146+ walker,
147147+ process,
148148+ };
149149+ Ok((commit, me))
150150+ }
151151+152152+ async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError> {
153153+ while let Some((cid, data)) = self
154154+ .block_stream
155155+ .try_next()
156156+ .await
157157+ .map_err(|e| DriveError::CarBlockError(e.into()))?
158158+ {
159159+ self.block_store.put(
160160+ cid,
161161+ if Node::could_be(&data) {
162162+ MaybeProcessedBlock::Raw(data)
163163+ } else {
164164+ match (self.process)(&data) {
165165+ Ok(t) => MaybeProcessedBlock::ProcessedOk(t),
166166+ Err(e) => MaybeProcessedBlock::Unprocessable(e.to_string()),
167167+ }
168168+ },
169169+ );
170170+ if cid == cid_needed {
171171+ return Ok(());
172172+ }
173173+ }
174174+175175+ // if we never found the block
176176+ Err(DriveError::MissingBlock(cid_needed))
177177+ }
178178+179179+ /// Manually step through the record outputs
180180+ pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> {
181181+ loop {
182182+ // walk as far as we can until we run out of blocks or find a record
183183+ let cid_needed = match self.walker.step(&mut self.block_store, &self.process)? {
184184+ Step::Rest(cid) => cid,
185185+ Step::Finish => return Ok(None),
186186+ Step::Step { rkey, data } => return Ok(Some((rkey, data))),
187187+ };
188188+189189+ // load blocks until we reach that cid
190190+ self.drive_until(cid_needed).await?;
191191+ }
192192+ }
193193+194194+ /// Convert to a futures::stream of record outputs
195195+ pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError>> {
196196+ futures::stream::try_unfold(self, |mut this| async move {
197197+ let maybe_record = this.next_record().await?;
198198+ Ok(maybe_record.map(|b| (b, this)))
199199+ })
200200+ }
201201+}
+50
src/disk_redb.rs
···11+use crate::disk_drive::BlockStore;
22+use ipld_core::cid::Cid;
33+use redb::{Database, Error, ReadableDatabase, TableDefinition};
44+use serde::{Serialize, de::DeserializeOwned};
55+use std::path::Path;
66+77+const TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blocks");
88+99+pub struct RedbStore {
1010+ db: Database,
1111+}
1212+1313+impl RedbStore {
1414+ pub fn new(path: impl AsRef<Path>) -> Result<Self, Error> {
1515+ let db = Database::create(path)?;
1616+ Ok(Self { db })
1717+ }
1818+}
1919+2020+// TODO: clean up on drop
2121+2222+impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for RedbStore {
2323+ fn put(&self, c: Cid, t: MPB) {
2424+ let key_bytes = c.to_bytes();
2525+ let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap();
2626+2727+ let mut tx = self.db.begin_write().unwrap();
2828+ tx.set_durability(redb::Durability::None).unwrap();
2929+ {
3030+ let mut table = tx.open_table(TABLE).unwrap();
3131+ table.insert(&*key_bytes, &*val_bytes).unwrap();
3232+ }
3333+ tx.commit().unwrap();
3434+ }
3535+ fn get(&self, c: Cid) -> Option<MPB> {
3636+ let key_bytes = c.to_bytes();
3737+ let tx = self.db.begin_read().unwrap();
3838+ let table = match tx.open_table(TABLE) {
3939+ Ok(t) => t,
4040+ Err(redb::TableError::TableDoesNotExist(_)) => return None,
4141+ e => e.unwrap(),
4242+ };
4343+ let maybe_val_bytes = table.get(&*key_bytes).unwrap()?;
4444+ let (t, n): (MPB, usize) =
4545+ bincode::serde::decode_from_slice(maybe_val_bytes.value(), bincode::config::standard())
4646+ .unwrap();
4747+ assert_eq!(maybe_val_bytes.value().len(), n);
4848+ Some(t)
4949+ }
5050+}
···22//!
33//! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples)
4455+pub mod disk_drive;
66+pub mod disk_redb;
77+pub mod disk_walk;
58pub mod drive;
69pub mod mst;
710pub mod walk;