Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

errors for block store

phil bf750a87 b132bf6b

+69 -41
+24 -5
src/disk_drive.rs
··· 3 3 use std::collections::VecDeque; 4 4 use std::error::Error; 5 5 6 - use crate::disk_walk::{Trip, Walker}; 6 + use crate::disk_walk::{Trip, Walker, RkeyError}; 7 7 use crate::mst::Commit; 8 8 9 9 use ipld_core::cid::Cid; ··· 41 41 Tripped(#[from] Trip), 42 42 } 43 43 44 + #[derive(Debug, thiserror::Error)] 45 + pub enum BlockStoreError { 46 + #[error("Error from the storage backend: {0}")] 47 + StorageBackend(Box<dyn Error + Send>), 48 + 49 + #[error(transparent)] 50 + RkeyError(#[from] RkeyError), 51 + 52 + // this should probably not be up here 53 + #[error("Failed to join tokio task: {0}")] 54 + JoinError(tokio::task::JoinError), 55 + 56 + #[error("Could not find block: {0}")] 57 + MissingBlock(Cid), 58 + } 59 + 44 60 /// Storage backend for caching large-repo blocks 45 61 /// 46 62 /// Since 47 63 pub trait BlockStore<MPB: Serialize + DeserializeOwned> { 48 - fn put_batch(&self, blocks: Vec<(Cid, MPB)>) -> impl Future<Output = ()> + Send; // unwraps for now 64 + fn put_batch(&self, blocks: Vec<(Cid, MPB)>) -> impl Future<Output = Result<(), BlockStoreError>>; // unwraps for now 49 65 fn walk_batch( 50 66 &self, 51 67 walker: Walker, 52 68 n: usize, 53 - ) -> impl Future<Output = Result<(Walker, Vec<(String, MPB)>), String>>; // boo string error for now because 69 + ) -> impl Future<Output = Result<(Walker, Vec<(String, MPB)>), BlockStoreError>>; // boo string error for now because 54 70 } 55 71 56 72 type CarBlock<E> = Result<(Cid, Vec<u8>), E>; ··· 127 143 to_insert.push((cid, data)); 128 144 } 129 145 } 130 - block_store.put_batch(to_insert).await; 146 + block_store 147 + .put_batch(to_insert) 148 + .await 149 + .map_err(|e| DriveError::Boooooo(format!("boooOOOOO! {e}")))?; // TODO 131 150 } 132 151 133 152 log::warn!("init: got commit?"); ··· 155 174 .block_store 156 175 .walk_batch(walker, n) 157 176 .await 158 - .map_err(DriveError::Boooooo)?; 177 + .map_err(|e| DriveError::Boooooo(format!("booo! {e}")))?; // TODO 159 178 self.walker = walker; 160 179 161 180 let processed = batch
+26 -19
src/disk_redb.rs
··· 1 - use crate::disk_drive::BlockStore; 1 + use crate::disk_drive::{BlockStore, BlockStoreError}; 2 2 use crate::disk_walk::{Need, Walker}; 3 3 use ipld_core::cid::Cid; 4 4 use redb::{Database, Durability, Error, ReadableDatabase, TableDefinition}; ··· 23 23 } 24 24 } 25 25 26 + // TODO: ship off to a blocking thread 26 27 impl Drop for RedbStore { 27 28 fn drop(&mut self) { 28 29 let mut tx = self.db.begin_write().unwrap(); ··· 32 33 } 33 34 } 34 35 36 + impl<E: Into<Error>> From<E> for BlockStoreError { 37 + fn from(e: E) -> BlockStoreError { 38 + let e = Into::<Error>::into(e); 39 + BlockStoreError::StorageBackend(Box::new(e)) 40 + } 41 + } 42 + 35 43 impl BlockStore<Vec<u8>> for RedbStore { 36 - async fn put_batch(&self, blocks: Vec<(Cid, Vec<u8>)>) { 44 + async fn put_batch(&self, blocks: Vec<(Cid, Vec<u8>)>) -> Result<(), BlockStoreError> { 37 45 let db = self.db.clone(); 38 - tokio::task::spawn_blocking(move || { 39 - let mut tx = db.begin_write().unwrap(); 40 - tx.set_durability(Durability::None).unwrap(); 46 + tokio::task::spawn_blocking(move || -> Result<(), BlockStoreError> { 47 + let mut tx = db.begin_write()?; 48 + tx.set_durability(Durability::None)?; 41 49 42 50 { 43 - let mut table = tx.open_table(TABLE).unwrap(); 51 + let mut table = tx.open_table(TABLE)?; 44 52 for (cid, t) in blocks { 45 53 let key_bytes = cid.to_bytes(); 46 - table.insert(&*key_bytes, &*t).unwrap(); 54 + table.insert(&*key_bytes, &*t)?; 47 55 } 48 56 } 49 57 50 - tx.commit().unwrap(); 58 + Ok(tx.commit()?) 51 59 }) 52 60 .await 53 - .unwrap(); 61 + .map_err(BlockStoreError::JoinError)? 54 62 } 55 63 56 64 async fn walk_batch( 57 65 &self, 58 66 mut walker: Walker, 59 67 n: usize, 60 - ) -> Result<(Walker, Vec<(String, Vec<u8>)>), String> { 68 + ) -> Result<(Walker, Vec<(String, Vec<u8>)>), BlockStoreError> { 61 69 let db = self.db.clone(); 62 - tokio::task::spawn_blocking(move || { 63 - let tx = db.begin_read().unwrap(); 64 - let table = tx.open_table(TABLE).unwrap(); 70 + tokio::task::spawn_blocking(move || -> Result<_, BlockStoreError> { 71 + let tx = db.begin_read()?; 72 + let table = tx.open_table(TABLE)?; 65 73 66 74 let mut out = Vec::with_capacity(n); 67 75 loop { 68 - let Some(need) = walker.next_needed() else { 76 + let Some(need) = walker.next_needed()? else { 69 77 break; 70 78 }; 71 79 let cid = need.cid(); 72 - let Some(res) = table.get(&*cid.to_bytes()).unwrap() else { 73 - return Err(format!("missing block: {cid:?}")); 80 + let Some(res) = table.get(&*cid.to_bytes())? else { 81 + return Err(BlockStoreError::MissingBlock(cid)); 74 82 }; 75 83 let block = res.value(); 76 84 77 85 match need { 78 86 Need::Node(_) => walker 79 - .handle_node(block) 80 - .map_err(|e| format!("failed to handle mst node: {e}"))?, 87 + .handle_node(block)?, 81 88 Need::Record { rkey, .. } => { 82 89 out.push((rkey, block.to_vec())); 83 90 if out.len() >= n { ··· 89 96 Ok((walker, out)) 90 97 }) 91 98 .await 92 - .unwrap() // tokio join 99 + .map_err(BlockStoreError::JoinError)? 93 100 } 94 101 }
+19 -17
src/disk_walk.rs
··· 17 17 RkeyError(#[from] RkeyError), 18 18 #[error("Process failed: {0}")] 19 19 ProcessFailed(String), 20 - #[error("Encountered an rkey out of order while walking the MST")] 21 - RkeyOutOfOrder, 22 20 } 23 21 24 22 /// Errors from invalid Rkeys ··· 28 26 EntryPrefixOutOfbounds, 29 27 #[error("RKey was not utf-8")] 30 28 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 29 + #[error("Encountered an rkey out of order while walking the MST")] 30 + RkeyOutOfOrder, 31 + #[error("Failed to decode commit block: {0}")] 32 + BlockDecodeError(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 31 33 } 32 34 33 35 /// Walker outputs ··· 110 112 } 111 113 } 112 114 113 - pub fn next_needed(&mut self) -> Option<Need> { 114 - self.stack.pop() 115 - // TODO: 116 - // let need = self.stack.pop()?; 117 - // if let Need::Record { ref rkey, .. } = need { 118 - // // rkeys *must* be in order or else the tree is invalid (or 119 - // // we have a bug) 120 - // if *rkey <= self.prev { 121 - // return Err(Trip::RkeyOutOfOrder); 122 - // } 123 - // self.prev = rkey.clone(); 124 - // } 125 - // Some(need) 115 + pub fn next_needed(&mut self) -> Result<Option<Need>, RkeyError> { 116 + let Some(need) = self.stack.pop() else { 117 + return Ok(None); 118 + }; 119 + if let Need::Record { ref rkey, .. } = need { 120 + // rkeys *must* be in order or else the tree is invalid (or 121 + // we have a bug) 122 + if *rkey <= self.prev { 123 + return Err(RkeyError::RkeyOutOfOrder); 124 + } 125 + self.prev = rkey.clone(); 126 + } 127 + Ok(Some(need)) 126 128 } 127 129 128 130 /// hacky: this must be called after next_needed if it was a node 129 - pub fn handle_node(&mut self, block: &[u8]) -> Result<(), Trip> { 130 - let node = serde_ipld_dagcbor::from_slice::<Node>(block).map_err(Trip::BadCommit)?; 131 + pub fn handle_node(&mut self, block: &[u8]) -> Result<(), RkeyError> { 132 + let node = serde_ipld_dagcbor::from_slice::<Node>(block)?; 131 133 push_from_node(&mut self.stack, &node)?; 132 134 Ok(()) 133 135 }