Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

generic process error

yay

this was not that bad

phil 02732591 6dbdc163

+35 -25
+2 -1
examples/read-file/main.rs
··· 1 1 extern crate repo_stream; 2 + use std::convert::Infallible; 2 3 use clap::Parser; 3 4 use futures::TryStreamExt; 4 5 use iroh_car::CarReader; ··· 38 39 let (commit, v) = repo_stream::drive::Vehicle::init( 39 40 root, 40 41 stream, 41 - |block| Ok(block.len()), 42 + |block| Ok::<_, Infallible>(block.len()), 42 43 ).await?; 43 44 let mut record_stream = std::pin::pin!(v.stream()); 44 45
+16 -14
src/drive.rs
··· 7 7 use crate::walk::{Step, Trip, Walker}; 8 8 9 9 #[derive(Debug, thiserror::Error)] 10 - pub enum DriveError { 10 + pub enum DriveError<E: Error> { 11 11 #[error("Failed to initialize CarReader: {0}")] 12 12 CarReader(#[from] iroh_car::Error), 13 13 #[error("CAR file requires a root to be present")] ··· 23 23 #[error("The MST block {0} could not be found")] 24 24 MissingBlock(Cid), 25 25 #[error("Failed to walk the mst tree: {0}")] 26 - Tripped(#[from] Trip), 26 + Tripped(#[from] Trip<E>), 27 27 #[error("Not finished walking, but no more blocks are available to continue")] 28 28 Dnf, 29 29 } ··· 34 34 pub struct Rkey(pub String); 35 35 36 36 #[derive(Debug)] 37 - pub enum MaybeProcessedBlock<T> { 37 + pub enum MaybeProcessedBlock<T, E> { 38 38 /// A block that's *probably* a Node (but we can't know yet) 39 39 /// 40 40 /// It *can be* a record that suspiciously looks a lot like a node, so we ··· 59 59 /// There's an alternative here, which would be to kick unprocessable blocks 60 60 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 61 61 /// surface the typed error later if needed by trying to reprocess. 62 - Processed(Result<T, Box<dyn Error>>), 62 + Processed(Result<T, E>), 63 63 } 64 64 65 65 // TODO: generic error not box dyn nonsense. 
66 - pub type ProcRes<T> = Result<T, Box<dyn Error>>; 66 + pub type ProcRes<T, E> = Result<T, E>; 67 67 68 - pub struct Vehicle<SE, S, T, P> 68 + pub struct Vehicle<SE, S, T, P, PE> 69 69 where 70 70 S: Stream<Item = CarBlock<SE>>, 71 - P: Fn(&[u8]) -> ProcRes<T>, 71 + P: Fn(&[u8]) -> ProcRes<T, PE>, 72 + PE: Error, 72 73 { 73 74 block_stream: S, 74 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 75 + blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>, 75 76 walker: Walker, 76 77 process: P, 77 78 } 78 79 79 - impl<SE, S, T: Clone, P> Vehicle<SE, S, T, P> 80 + impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE> 80 81 where 81 82 SE: Error + 'static, 82 83 S: Stream<Item = CarBlock<SE>> + Unpin, 83 - P: Fn(&[u8]) -> ProcRes<T>, 84 + P: Fn(&[u8]) -> ProcRes<T, PE>, 85 + PE: Error, 84 86 { 85 87 pub async fn init( 86 88 root: Cid, 87 89 mut block_stream: S, 88 90 process: P, 89 - ) -> Result<(Commit, Self), DriveError> { 91 + ) -> Result<(Commit, Self), DriveError<PE>> { 90 92 let mut blocks = HashMap::new(); 91 93 92 94 let mut commit = None; ··· 124 126 Ok((commit, me)) 125 127 } 126 128 127 - async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError> { 129 + async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> { 128 130 while let Some((cid, data)) = self 129 131 .block_stream 130 132 .try_next() ··· 145 147 return Err(DriveError::MissingBlock(cid_needed)); 146 148 } 147 149 148 - pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError> { 150 + pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError<PE>> { 149 151 loop { 150 152 // walk as far as we can until we run out of blocks or find a record 151 153 let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? 
{ ··· 159 161 } 160 162 } 161 163 162 - pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError>> { 164 + pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError<PE>>> { 163 165 futures::stream::try_unfold(self, |mut this| async move { 164 166 let maybe_record = this.next_record().await?; 165 167 Ok(maybe_record.map(|b| (b, this)))
+17 -10
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 + use std::error::Error; 3 4 use crate::mst::Node; 4 5 use crate::drive::{MaybeProcessedBlock, ProcRes}; 5 6 use ipld_core::cid::Cid; ··· 7 8 use std::fmt; 8 9 9 10 #[derive(Debug, thiserror::Error)] 10 - pub enum Trip { 11 + pub enum Trip<E: Error> { 11 12 #[error("empty mst nodes are not allowed")] 12 13 NodeEmpty, 13 14 #[error("Failed to decode commit block: {0}")] 14 15 BadCommit(Box<dyn std::error::Error>), 15 16 #[error("Failed to process record: {0}")] 16 - RecordFailedProcessing(Box<dyn std::error::Error>), 17 - #[error("Failed to compute an rkey due to invalid prefix_len")] 18 - EntryPrefixOutOfbounds, 17 + RecordFailedProcessing(Box<dyn Error>), 18 + #[error("Action node error: {0}")] 19 + ActionNode(#[from] ActionNodeError), 19 20 #[error("RKey was not utf-8")] 20 21 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 21 22 #[error("Process failed: {0}")] 22 - ProcessFailed(Box<dyn std::error::Error>), 23 + ProcessFailed(E), 24 + } 25 + 26 + #[derive(Debug, thiserror::Error)] 27 + pub enum ActionNodeError { 28 + #[error("Failed to compute an rkey due to invalid prefix_len")] 29 + EntryPrefixOutOfbounds, 23 30 } 24 31 25 32 #[derive(Debug)] ··· 148 155 let mut rkey = vec![]; 149 156 let pre_checked = prefix 150 157 .get(..entry.prefix_len) 151 - .ok_or(Trip::EntryPrefixOutOfbounds) 158 + .ok_or(ActionNodeError::EntryPrefixOutOfbounds) 152 159 .unwrap(); // TODO has to be try_from 153 160 rkey.extend_from_slice(pre_checked); 154 161 rkey.extend_from_slice(&entry.keysuffix); ··· 177 184 } 178 185 } 179 186 180 - pub fn walk<T: Clone>( 187 + pub fn walk<T: Clone, E: Error>( 181 188 &mut self, 182 - blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 183 - process: impl Fn(&[u8]) -> ProcRes<T>, 184 - ) -> Result<Step<T>, Trip> { 189 + blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>, 190 + process: impl Fn(&[u8]) -> ProcRes<T, E>, 191 + ) -> Result<Step<T>, Trip<E>> { 185 192 loop { 186 193 let 
Some(current_node) = self.stack.last_mut() else { 187 194 log::trace!("tried to walk but we're actually done.");