Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

"optimize" link object sizes

and then get dragged by the benches

phil 1a6e06fb 26632b97

+197 -88
+10 -6
examples/read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{Driver, DriverBuilder, Step}; 7 + use repo_stream::{Driver, DriverBuilder, Output, Step}; 8 8 use std::path::PathBuf; 9 9 10 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 34 34 35 35 log::info!("got commit: {commit:?}"); 36 36 37 - let mut n = 0; 38 - while let Step::Value(pairs) = driver.next_chunk(256).await? { 39 - n += pairs.len(); 40 - // log::info!("got {rkey:?}"); 37 + while let Step::Value(records) = driver.next_chunk(256).await? { 38 + for Output { rkey, cid, data } in records { 39 + let size = usize::from_ne_bytes(data.try_into().unwrap()); 40 + print!("0x"); 41 + for byte in cid.to_bytes() { 42 + print!("{byte:>02x}"); 43 + } 44 + println!(": {rkey} => record of len {}", size); 45 + } 41 46 } 42 - log::info!("bye! total records={n}"); 43 47 44 48 Ok(()) 45 49 }
+1 -1
src/disk.rs
··· 148 148 } 149 149 150 150 #[inline] 151 - pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 151 + pub(crate) fn get(&self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 152 152 self.keyspace.get(key) 153 153 } 154 154
+18 -11
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::link::ObjectLink; 3 4 use crate::{ 4 5 Bytes, HashMap, Rkey, Step, 5 6 disk::{DiskError, DiskStore}, 6 7 mst::MstNode, 7 - walk::Output, 8 + walk::{MstError, Output}, 8 9 }; 9 10 use cid::Cid; 10 11 use iroh_car::CarReader; ··· 33 34 ChannelSendError, // SendError takes <T> which we don't need 34 35 #[error("Failed to join a task: {0}")] 35 36 JoinError(#[from] tokio::task::JoinError), 37 + } 38 + 39 + impl From<MstError> for DriveError { 40 + fn from(me: MstError) -> DriveError { 41 + DriveError::WalkError(WalkError::MstError(me)) 42 + } 36 43 } 37 44 38 45 /// An in-order chunk of Rkey + CID + (processed) Block ··· 209 216 210 217 // stash (maybe processed) blocks in memory as long as we have room 211 218 mem_size += maybe_processed.len(); 212 - mem_blocks.insert(cid, maybe_processed); 219 + mem_blocks.insert(cid.into(), maybe_processed); 213 220 if mem_size >= max_size { 214 221 return Ok(Driver::Disk(NeedDisk { 215 222 car, ··· 227 234 228 235 // the commit always must point to a Node; empty node => empty MST special case 229 236 let root_node: MstNode = match mem_blocks 230 - .get(&commit.data) 237 + .get(&commit.data_link()?) 231 238 .ok_or(DriveError::MissingCommit)? 232 239 { 233 240 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, ··· 262 269 /// so the sync/async boundaries become a little easier to work around. 263 270 #[derive(Debug)] 264 271 pub struct MemDriver { 265 - blocks: HashMap<Cid, MaybeProcessedBlock>, 272 + blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 266 273 walker: Walker, 267 274 process: fn(Bytes) -> Bytes, 268 275 } ··· 273 280 let mut out = Vec::with_capacity(n); 274 281 for _ in 0..n { 275 282 // walk as far as we can until we run out of blocks or find a record 276 - let Step::Value(output) = self.walker.step(&mut self.blocks, self.process)? 
else { 283 + let Step::Value(output) = self.walker.step(&self.blocks, self.process)? else { 277 284 break; 278 285 }; 279 286 out.push(output); ··· 292 299 root: Cid, 293 300 process: fn(Bytes) -> Bytes, 294 301 max_size: usize, 295 - mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 302 + mem_blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 296 303 pub commit: Option<Commit>, 297 304 } 298 305 ··· 314 321 }) 315 322 .await??; 316 323 317 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 324 + let (tx, mut rx) = mpsc::channel::<Vec<(ObjectLink, MaybeProcessedBlock)>>(1); 318 325 319 326 let store_worker = tokio::task::spawn_blocking(move || { 320 327 while let Some(chunk) = rx.blocking_recv() { ··· 342 349 continue; 343 350 } 344 351 352 + let link = cid.into(); 345 353 let data = Bytes::from(data); 346 354 347 355 // remaining possible types: node, record, other. optimistically process 348 356 // TODO: get the actual in-memory size to compute disk spill 349 357 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 350 358 mem_size += maybe_processed.len(); 351 - chunk.push((cid, maybe_processed)); 359 + chunk.push((link, maybe_processed)); 352 360 if mem_size >= (self.max_size / 2) { 353 361 // soooooo if we're setting the db cache to max_size and then letting 354 362 // multiple chunks in the queue that are >= max_size, then at any time ··· 372 380 373 381 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 374 382 375 - // the commit always must point to a Node; empty node => empty MST special case 376 383 let db_bytes = store 377 - .get(&commit.data.to_bytes()) 384 + .get(&commit.data_link()?.to_bytes()) 378 385 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 
379 386 .ok_or(DriveError::MissingCommit)?; 380 387 ··· 446 453 447 454 for _ in 0..n { 448 455 // walk as far as we can until we run out of blocks or find a record 449 - let step = match state.walker.disk_step(&mut state.store, process) { 456 + let step = match state.walker.disk_step(&state.store, process) { 450 457 Ok(s) => s, 451 458 Err(e) => { 452 459 return (state, Err(e.into()));
+2
src/lib.rs
··· 82 82 83 83 pub mod disk; 84 84 pub mod drive; 85 + pub mod link; 85 86 86 87 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 88 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 89 + pub use link::NodeThing; 88 90 pub use mst::Commit; 89 91 pub use walk::{Output, Step}; 90 92
+97
src/link.rs
··· 1 + use cid::Cid; 2 + use thiserror::Error; 3 + 4 + #[derive(Debug, Error, PartialEq)] 5 + #[error("The CID is not a valid strict atproto SHA256 CID")] 6 + pub struct NotStrictError; 7 + 8 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 9 + pub struct CidStrict32([u8; 32]); 10 + 11 + pub const ATPROTO_HASHLINK_PREFIX: [u8; 4] = [0x01, 0x71, 0x12, 0x20]; 12 + 13 + impl TryFrom<&[u8]> for CidStrict32 { 14 + type Error = NotStrictError; 15 + fn try_from(raw: &[u8]) -> Result<CidStrict32, Self::Error> { 16 + let (pre, sha) = raw.split_first_chunk::<4>().ok_or(NotStrictError)?; 17 + if pre != &ATPROTO_HASHLINK_PREFIX { 18 + return Err(NotStrictError); 19 + } 20 + let inner = sha.try_into().map_err(|_| NotStrictError)?; 21 + Ok(CidStrict32(inner)) 22 + } 23 + } 24 + 25 + impl TryFrom<Cid> for CidStrict32 { 26 + type Error = NotStrictError; 27 + fn try_from(cid: Cid) -> Result<CidStrict32, Self::Error> { 28 + cid.to_bytes().as_slice().try_into() 29 + } 30 + } 31 + 32 + impl From<CidStrict32> for Cid { 33 + fn from(CidStrict32(sha): CidStrict32) -> Cid { 34 + let mut bytes = Vec::from(ATPROTO_HASHLINK_PREFIX); 35 + bytes.extend_from_slice(&sha); 36 + bytes.try_into().unwrap() // this prefix + sha is always a valid Cid 37 + } 38 + } 39 + 40 + #[derive(Debug, Clone, PartialEq, Eq, Hash)] 41 + pub enum ObjectLink { 42 + Should(CidStrict32), 43 + Allowed(Box<Cid>), 44 + } 45 + 46 + impl ObjectLink { 47 + pub fn to_bytes(&self) -> Vec<u8> { 48 + match self { 49 + ObjectLink::Should(CidStrict32(sha)) => { 50 + let mut bytes = vec![0xFF]; // prefix that's never valid for CID 51 + bytes.extend_from_slice(sha); 52 + bytes 53 + } 54 + ObjectLink::Allowed(cid) => cid.to_bytes(), 55 + } 56 + } 57 + } 58 + 59 + impl From<&CidStrict32> for ObjectLink { 60 + fn from(strict: &CidStrict32) -> ObjectLink { 61 + ObjectLink::Should(*strict) 62 + } 63 + } 64 + 65 + impl From<Cid> for ObjectLink { 66 + fn from(cid: Cid) -> ObjectLink { 67 + if let Ok(strict) = cid.try_into() { 68 + 
ObjectLink::Should(strict) 69 + } else { 70 + ObjectLink::Allowed(cid.into()) 71 + } 72 + } 73 + } 74 + 75 + impl From<ObjectLink> for Cid { 76 + fn from(link: ObjectLink) -> Cid { 77 + match link { 78 + ObjectLink::Should(strict) => strict.into(), 79 + ObjectLink::Allowed(boxed) => *boxed, 80 + } 81 + } 82 + } 83 + 84 + #[derive(Debug, Clone)] 85 + pub enum NodeThing { 86 + ChildNode(CidStrict32), 87 + Record(crate::Rkey, ObjectLink), 88 + } 89 + 90 + impl NodeThing { 91 + pub fn link(&self) -> ObjectLink { 92 + match self { 93 + Self::ChildNode(strict) => strict.into(), 94 + Self::Record(_, link) => link.clone(), 95 + } 96 + } 97 + }
+29 -29
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 - use crate::Rkey; 6 + use crate::{ 7 + link::{NodeThing, ObjectLink}, 8 + walk::MstError, 9 + }; 7 10 use cid::Cid; 8 11 use serde::Deserialize; 12 + use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 9 13 use sha2::{Digest, Sha256}; 14 + use std::fmt; 15 + 16 + pub type Depth = u32; 10 17 11 18 /// The top-level data object in a repository's tree is a signed commit. 12 19 #[derive(Debug, Deserialize)] ··· 38 45 pub sig: serde_bytes::ByteBuf, 39 46 } 40 47 41 - use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 42 - use std::fmt; 43 - 44 - pub type Depth = u32; 48 + impl Commit { 49 + pub fn data_link(&self) -> Result<ObjectLink, MstError> { 50 + let strict = self.data.try_into().map_err(MstError::MissingRootNode)?; 51 + Ok(ObjectLink::Should(strict)) 52 + } 53 + } 45 54 46 55 #[inline(always)] 47 56 pub fn atproto_mst_depth(key: &str) -> Depth { ··· 55 64 pub things: Vec<NodeThing>, 56 65 } 57 66 58 - #[derive(Debug, Clone)] 59 - pub struct NodeThing { 60 - pub cid: Cid, 61 - pub kind: ThingKind, 62 - } 63 - 64 - #[derive(Debug, Clone)] 65 - pub enum ThingKind { 66 - Tree, 67 - Value { rkey: Rkey }, 68 - } 69 - 70 67 impl<'de> Deserialize<'de> for MstNode { 71 68 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 72 69 where ··· 98 95 } 99 96 found_left = true; 100 97 if let Some(cid) = map.next_value()? 
{ 101 - left = Some(NodeThing { 102 - cid, 103 - kind: ThingKind::Tree, 104 - }); 98 + let Ok(strict) = Cid::try_into(cid) else { 99 + return Err(de::Error::invalid_value( 100 + Unexpected::Bytes(&cid.to_bytes()), 101 + &"a strict atproto sha256 CID link", 102 + )); 103 + }; 104 + left = Some(NodeThing::ChildNode(strict)); 105 105 } 106 106 } 107 107 "e" => { ··· 142 142 )); 143 143 } 144 144 145 - things.push(NodeThing { 146 - cid: entry.value, 147 - kind: ThingKind::Value { rkey: rkey_s }, 148 - }); 145 + things.push(NodeThing::Record(rkey_s, entry.value.into())); 149 146 150 147 if let Some(cid) = entry.tree { 151 - things.push(NodeThing { 152 - cid, 153 - kind: ThingKind::Tree, 154 - }); 148 + let Ok(strict) = cid.try_into() else { 149 + return Err(de::Error::invalid_value( 150 + Unexpected::Bytes(&cid.to_bytes()), 151 + &"a strict atproto sha256 CID link", 152 + )); 153 + }; 154 + things.push(NodeThing::ChildNode(strict)); 155 155 } 156 156 157 157 prefix = rkey;
+40 -41
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 - use crate::{Bytes, HashMap, Rkey, noop, disk::DiskStore, drive::MaybeProcessedBlock}; 3 + use crate::link::{NodeThing, ObjectLink}; 4 + use crate::mst::{Depth, MstNode}; 5 + use crate::{Bytes, HashMap, Rkey, disk::DiskStore, drive::MaybeProcessedBlock, noop}; 5 6 use cid::Cid; 6 7 use std::convert::Infallible; 7 8 ··· 23 24 /// Errors from invalid Rkeys 24 25 #[derive(Debug, PartialEq, thiserror::Error)] 25 26 pub enum MstError { 27 + #[error("Bad MST root node: {0}")] 28 + MissingRootNode(crate::link::NotStrictError), 26 29 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 27 30 EmptyNode, 28 31 #[error("Expected node to be at depth {expected}, but it was at {depth}")] ··· 68 71 69 72 fn mpb_step( 70 73 &mut self, 71 - kind: ThingKind, 72 - cid: Cid, 74 + thing: NodeThing, 73 75 mpb: &MaybeProcessedBlock, 74 76 process: impl Fn(Bytes) -> Bytes, 75 77 ) -> Result<Option<Output>, WalkError> { 76 - match kind { 77 - ThingKind::Value { rkey } => { 78 + match thing { 79 + NodeThing::Record(rkey, link) => { 78 80 let data = match mpb { 79 81 MaybeProcessedBlock::Raw(data) => process(data.clone()), 80 82 MaybeProcessedBlock::Processed(t) => t.clone(), ··· 89 91 self.prev_rkey = rkey.clone(); 90 92 91 93 log::trace!("val @ {rkey}"); 92 - Ok(Some(Output { rkey, cid, data })) 94 + Ok(Some(Output { 95 + rkey, 96 + cid: link.into(), 97 + data, 98 + })) 93 99 } 94 - ThingKind::Tree => { 100 + NodeThing::ChildNode(_) => { 95 101 let MaybeProcessedBlock::Raw(data) = mpb else { 96 102 return Err(WalkError::BadCommitFingerprint); 97 103 }; ··· 138 144 /// Advance through nodes until we find a record or can't go further 139 145 pub fn step( 140 146 &mut self, 141 - blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 147 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 142 148 process: impl Fn(Bytes) -> Bytes, 143 149 ) -> Result<Step, WalkError> { 144 - while 
let Some(NodeThing { cid, kind }) = self.next_todo() { 145 - let Some(mpb) = blocks.get(&cid) else { 146 - return Err(WalkError::MissingBlock(NodeThing { cid, kind })); 150 + while let Some(thing) = self.next_todo() { 151 + let Some(mpb) = blocks.get(&thing.link()) else { 152 + return Err(WalkError::MissingBlock(thing)); 147 153 }; 148 - if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 154 + if let Some(out) = self.mpb_step(thing, mpb, &process)? { 149 155 return Ok(Step::Value(out)); 150 156 } 151 157 } 152 158 Ok(Step::End(None)) 153 159 } 154 160 155 - #[allow(dead_code)] 156 - pub fn edge( 161 + pub fn step_to_edge( 157 162 &mut self, 158 - blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 163 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 159 164 ) -> Result<Option<Rkey>, WalkError> { 160 165 let mut ant = self.clone(); 161 - let mut rkey_prev: Option<Rkey> = None; 162 - let dummy_mpb = MaybeProcessedBlock::Raw(vec![].into()); 163 - 164 - while let Some(NodeThing { cid, kind }) = ant.next_todo() { 165 - match kind { 166 - ThingKind::Tree => { 167 - let mpb = blocks 168 - .get(&cid) 169 - .ok_or_else(|| WalkError::MissingBlock(NodeThing { cid, kind: ThingKind::Tree }))?; 170 - assert!(ant.mpb_step(kind, cid, mpb, noop)?.is_none()); 171 - } 172 - ThingKind::Value { ref rkey } => { 173 - if blocks.get(&cid).is_some() { 174 - return Ok(rkey_prev); 166 + let mut rkey_prev = None; 167 + loop { 168 + match ant.step(blocks, noop) { 169 + Err(WalkError::MissingBlock(NodeThing::Record(rkey, _))) => { 170 + // missing record is our game here! 
keep going,, 171 + // (after checking rkey order bc the error path that got us here skips it) 172 + if let Some(prev) = rkey_prev 173 + && rkey <= prev 174 + { 175 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { rkey, prev })); 175 176 } 176 - rkey_prev = Some(rkey.clone()); 177 - assert!(ant.mpb_step(kind, cid, &dummy_mpb, noop)?.is_some()); 178 - 179 177 *self = ant; 180 178 ant = self.clone(); 179 + rkey_prev = Some(rkey); 181 180 } 182 - }; 183 - 181 + Err(anyother) => return Err(anyother), 182 + Ok(_) => return Ok(rkey_prev), // oop real record, mutant went too far 183 + } 184 184 } 185 - Ok(None) 186 185 } 187 186 188 187 /// blocking!!!!!! 189 188 pub fn disk_step( 190 189 &mut self, 191 - blocks: &mut DiskStore, 190 + blocks: &DiskStore, 192 191 process: impl Fn(Bytes) -> Bytes, 193 192 ) -> Result<Step, WalkError> { 194 - while let Some(NodeThing { cid, kind }) = self.next_todo() { 195 - let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 196 - return Err(WalkError::MissingBlock(NodeThing { cid, kind })); 193 + while let Some(thing) = self.next_todo() { 194 + let Some(block_slice) = blocks.get(&thing.link().to_bytes())? else { 195 + return Err(WalkError::MissingBlock(thing)); 197 196 }; 198 197 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 199 - if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 198 + if let Some(out) = self.mpb_step(thing, &mpb, &process)? { 200 199 return Ok(Step::Value(out)); 201 200 } 202 201 }