···11+# repo-stream
22+33+an AsyncRead for atproto MSTs in CAR files
44+55+- tries to walk and emit the MST *while streaming in the CAR*
66+- drops intermediate mst blocks after reading to reduce total memory
77+- user-provided transform function on record blocks from IPLD
88+99+future work:
1010+- flush to disk if needed (sqlite? redb?) https://bsky.app/profile/divy.zone/post/3m2mf3jqx3k2w
1111+ - either just generally to handle huge CARs, or as a fallback when streaming fails
1212+1313+redb has an in-memory backend, so it would be possible to *always* use it for block caching. user can choose if they want to allow disk or just do memory, and then "spilling" from the cache to disk would be mostly free?
+109
src/drive.rs
···11+use ipld_core::ipld::Ipld;
22+use tokio::io::AsyncRead;
33+use iroh_car::CarReader;
44+use std::collections::HashMap;
55+use ipld_core::cid::Cid;
66+77+use crate::mst::Commit;
88+use crate::walk::{Walker, Step, Trip};
99+1010+#[derive(Debug, thiserror::Error)]
1111+pub enum DriveError {
1212+ #[error("Failed to initialize CarReader: {0}")]
1313+ CarReader(#[from] iroh_car::Error),
1414+ #[error("CAR file requires a root to be present")]
1515+ MissingRoot,
1616+ #[error("Failed to decode commit block: {0}")]
1717+ BadCommit(Box<dyn std::error::Error>),
1818+ #[error("Failed to decode record block: {0}")]
1919+ BadRecord(Box<dyn std::error::Error>),
2020+ #[error("The Commit block reference by the root was not found")]
2121+ MissingCommit,
2222+ #[error("Failed to walk the mst tree: {0}")]
2323+ Tripped(#[from] Trip),
2424+}
2525+2626+2727+pub async fn drive<R: AsyncRead + Unpin>(reader: R) -> Result<(), DriveError> {
2828+ let mut reader = CarReader::new(reader).await?;
2929+3030+ let root = reader
3131+ .header()
3232+ .roots()
3333+ .first()
3434+ .ok_or(DriveError::MissingRoot)?
3535+ .clone();
3636+ log::debug!("root: {root:?}");
3737+3838+ // one day,
3939+ // https://github.com/bluesky-social/proposals/tree/main/0006-sync-iteration#streaming-car-processing
4040+4141+ // block buffers
4242+ let mut blocks: HashMap::<Cid, Vec<u8>> = HashMap::new();
4343+4444+ // stage 1: try to parse out the commit block, buffering other blocks until
4545+ // we find it
4646+ let mut commit = None;
4747+ while let Some((cid, data)) = reader.next_block().await? {
4848+ if cid == root {
4949+ let c: Commit = serde_ipld_dagcbor::from_slice(&data)
5050+ .map_err(|e| DriveError::BadCommit(e.into()))?;
5151+ commit = Some(c);
5252+ break;
5353+ }
5454+ blocks.insert(cid, data);
5555+ };
5656+5757+ // we either broke out or read all the blocks without finding the commit...
5858+ let commit = commit.ok_or(DriveError::MissingCommit)?;
5959+6060+ log::debug!("got the commit: {commit:?}");
6161+6262+ // broke out! found it! yay! and with the commit we should know the tree
6363+ // root, so we can start walking as we go now.
6464+ let mut walker = Walker::new(commit.data);
6565+ let mut n = 0;
6666+ 'outer: loop {
6767+ // walk as far as we can, then stream in more blocks
6868+ let mut m = 0;
6969+ loop {
7070+ match walker.walk(&mut blocks)? {
7171+ Step::Rest => {
7272+ log::trace!("walker is resting, get another block");
7373+ break;
7474+ }
7575+ Step::Finish => {
7676+ log::trace!("walker finished");
7777+ break 'outer;
7878+ }
7979+ Step::Step { rkey, data } => {
8080+ let rkey = String::from_utf8(rkey);
8181+ let record: Ipld = serde_ipld_dagcbor::from_slice(&data)
8282+ .map_err(|e| DriveError::BadRecord(e.into()))?;
8383+ log::info!("found {rkey:?} => {record:?}");
8484+ }
8585+ }
8686+ m += 1;
8787+ if m > 1000 {
8888+ log::error!("ran out of inner loop time, breaking");
8989+ break 'outer;
9090+ };
9191+ }
9292+9393+ let Some((cid, data)) = reader.next_block().await? else {
9494+ log::warn!("no more data to stream in, but ig walker didn't finish?");
9595+ break;
9696+ };
9797+ blocks.insert(cid, data);
9898+9999+ n += 1;
100100+ if n > 1000 {
101101+ log::error!("ran out of outer loop time, breaking");
102102+ break 'outer;
103103+ };
104104+ }
105105+106106+ log::info!("done! bye!");
107107+108108+ Ok(())
109109+}
+3
src/lib.rs
···11+pub mod drive;
22+pub mod mst;
33+pub mod walk;
+90
src/mst.rs
···11+//! Low-level types for parsing raw atproto MST CARs
22+//!
33+//! The primary aim is to work through the **tree** structure. Non-node blocks
44+//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
55+66+use ipld_core::ipld::Ipld;
77+use ipld_core::cid::Cid;
88+use serde::Deserialize;
99+1010+1111+/// The top-level data object in a repository's tree is a signed commit.
1212+#[derive(Debug, Deserialize)]
1313+// #[serde(deny_unknown_fields)]
1414+pub struct Commit {
1515+ /// the account DID associated with the repo, in strictly normalized form
1616+ /// (eg, lowercase as appropriate)
1717+ pub did: String,
1818+ /// fixed value of 3 for this repo format version
1919+ pub version: u64,
2020+ /// pointer to the top of the repo contents tree structure (MST)
2121+ pub data: Cid,
2222+ /// revision of the repo, used as a logical clock.
2323+ ///
2424+ /// TID format. Must increase monotonically. Recommend using current
2525+ /// timestamp as TID; rev values in the "future" (beyond a fudge factor)
2626+ /// should be ignored and not processed
2727+ pub rev: String,
2828+ /// pointer (by hash) to a previous commit object for this repository.
2929+ ///
3030+ /// Could be used to create a chain of history, but largely unused (included
3131+ /// for v2 backwards compatibility). In version 3 repos, this field must
3232+ /// exist in the CBOR object, but is virtually always null. NOTE: previously
3333+ /// specified as nullable and optional, but this caused interoperability
3434+ /// issues.
3535+ pub prev: Option<Cid>,
3636+ /// cryptographic signature of this commit, as raw bytes
3737+ pub sig: ipld_core::ipld::Ipld, // TODO (vec<u8> fails with Mismatch { expect_major: 4, byte: 88 })
3838+}
3939+4040+/// MST node data schema
4141+#[derive(Debug, Deserialize)]
4242+#[serde(deny_unknown_fields)]
4343+pub struct Node {
4444+ /// link to sub-tree Node on a lower level and with all keys sorting before
4545+ /// keys at this node
4646+ #[serde(rename = "l")]
4747+ pub left: Option<Cid>,
4848+ /// ordered list of TreeEntry objects
4949+ ///
5050+ /// atproto MSTs have a fanout of 4, so there can be max 4 entries.
5151+ #[serde(rename = "e")]
5252+ pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]?
5353+}
5454+5555+impl Node {
5656+ /// Check if a node has any entries
5757+ ///
5858+ /// An empty repository with no records is represented as a single MST node
5959+ /// with an empty array of entries. This is the only situation in which a
6060+ /// tree may contain an empty leaf node which does not either contain keys
6161+ /// ("entries") or point to a sub-tree containing entries.
6262+ ///
6363+ /// TODO: to me this is slightly unclear with respect to `l` (ask someone).
6464+ /// ...is that what "The top of the tree must not be a an empty node which
6565+ /// only points to a sub-tree." is referring to?
6666+ pub fn is_empty(&self) -> bool {
6767+ self.left.is_none() && self.entries.is_empty()
6868+ }
6969+}
7070+7171+/// TreeEntry object
7272+#[derive(Debug, Deserialize)]
7373+#[serde(deny_unknown_fields)]
7474+pub struct Entry {
7575+ /// count of bytes shared with previous TreeEntry in this Node (if any)
7676+ #[serde(rename = "p")]
7777+ pub prefix_len: usize,
7878+ /// remainder of key for this TreeEntry, after "prefixlen" have been removed
7979+ #[serde(rename = "k")]
8080+ pub keysuffix: Ipld, // can we String this here?
8181+ /// link to the record data (CBOR) for this entry
8282+ #[serde(rename = "v")]
8383+ pub value: Cid,
8484+ /// link to a sub-tree Node at a lower level
8585+ ///
8686+ /// the lower level must have keys sorting after this TreeEntry's key (to
8787+ /// the "right"), but before the next TreeEntry's key in this Node (if any)
8888+ #[serde(rename = "t")]
8989+ pub tree: Option<Cid>,
9090+}
+124
src/walk.rs
···11+//! Depth-first MST traversal
22+33+use ipld_core::ipld::Ipld;
44+use crate::mst::Node;
55+use std::collections::HashMap;
66+use ipld_core::cid::Cid;
77+88+#[derive(Debug, thiserror::Error)]
99+pub enum Trip {
1010+ #[error("empty mst nodes are not allowed")]
1111+ NodeEmpty,
1212+ #[error("Failed to decode commit block: {0}")]
1313+ BadCommit(Box<dyn std::error::Error>),
1414+}
1515+1616+#[derive(Debug)]
1717+pub enum Step {
1818+ Rest,
1919+ Finish,
2020+ Step {
2121+ rkey: Vec<u8>,
2222+ data: Vec<u8>
2323+ },
2424+}
2525+2626+#[derive(Debug)]
2727+enum Need {
2828+ Node(Cid),
2929+ Record {
3030+ rkey: Vec<u8>,
3131+ cid: Cid,
3232+ },
3333+ AcutallyDone,
3434+}
3535+3636+fn needs_from_node(node: Node) -> Vec<Need> {
3737+ let mut out = vec![];
3838+ if let Some(left_cid) = node.left {
3939+ out.push(Need::Node(left_cid));
4040+ }
4141+ let mut prefix = vec![];
4242+ for (i, entry) in node.entries.into_iter().enumerate() {
4343+ let suffix = match entry.keysuffix {
4444+ Ipld::Bytes(data) => data,
4545+ _ => panic!("booo"),
4646+ };
4747+ let mut rkey = Vec::with_capacity(prefix.len() + suffix.len());
4848+ rkey.extend_from_slice(&prefix);
4949+ rkey.extend_from_slice(&suffix);
5050+ if i == 0 {
5151+ prefix.extend_from_slice(&suffix);
5252+ }
5353+ out.push(Need::Record { rkey, cid: entry.value });
5454+ if let Some(child_cid) = entry.tree {
5555+ out.push(Need::Node(child_cid.clone()));
5656+ }
5757+ }
5858+ // stack is right-to-left, for our left-to-right traversal
5959+ out.reverse();
6060+ out
6161+}
6262+6363+6464+#[derive(Debug)]
6565+pub struct Walker {
6666+ current: Need,
6767+ stack: Vec<Need>,
6868+}
6969+7070+impl Walker {
7171+ pub fn new(tree_root_cid: Cid) -> Self {
7272+ Self {
7373+ current: Need::Node(tree_root_cid),
7474+ stack: Vec::new(),
7575+ }
7676+ }
7777+7878+ pub fn walk(&mut self, blocks: &mut HashMap<Cid, Vec<u8>>) -> Result<Step, Trip> {
7979+ loop {
8080+ match &mut self.current {
8181+ Need::Node(cid) => {
8282+ log::trace!("need node {cid:?}");
8383+ let Some(block) = blocks.remove(&cid) else {
8484+ log::trace!("node not found, resting");
8585+ return Ok(Step::Rest);
8686+ };
8787+ let node = serde_ipld_dagcbor::from_slice::<Node>(&block)
8888+ .map_err(|e| Trip::BadCommit(e.into()))?;
8989+ let mut needs = needs_from_node(node);
9090+ self.stack.append(&mut needs);
9191+ if let Some(need) = self.stack.pop() {
9292+ log::trace!("found a need from the stack {need:?}");
9393+ self.current = need;
9494+ } else {
9595+ log::trace!("no more needs from stack, ig we are done?");
9696+ return Ok(Step::Finish);
9797+ }
9898+ }
9999+ Need::Record { rkey, cid } => {
100100+ log::trace!("need record {cid:?}");
101101+ let Some(data) = blocks.get(&cid) else {
102102+ log::trace!("record block not found, resting");
103103+ return Ok(Step::Rest);
104104+ };
105105+ let rkey = rkey.to_vec();
106106+ let data = data.to_vec();
107107+ if let Some(next) = self.stack.pop() {
108108+ log::trace!("updated current from stack");
109109+ self.current = next;
110110+ } else {
111111+ log::trace!("nothing left on the stack, making us done");
112112+ self.current = Need::AcutallyDone;
113113+ }
114114+ log::trace!("providing a block as a step");
115115+ return Ok(Step::Step { rkey, data });
116116+ }
117117+ Need::AcutallyDone => {
118118+ log::trace!("tried to walk but we're actually done.");
119119+ return Ok(Step::Finish);
120120+ }
121121+ }
122122+ }
123123+ }
124124+}