Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

fmt

authored by

phil and committed by tangled.org 2b285f7a 0d115654

+77 -92
+4 -9
benches/huge-car.rs
··· 32 32 let reader = tokio::fs::File::open(filename).await.unwrap(); 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 - let mut driver = 36 - match Driver::load_car(reader, ser, 1024) 37 - .await 38 - .unwrap() 39 - .unwrap() 40 - { 41 - Driver::Memory(_, mem_driver) => mem_driver, 42 - Driver::Disk(_) => panic!("not doing disk for benchmark"), 43 - }; 35 + let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap().unwrap() { 36 + Driver::Memory(_, mem_driver) => mem_driver, 37 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 + }; 44 39 45 40 let mut n = 0; 46 41 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+22 -25
benches/leading.rs
··· 1 - use criterion::{Criterion, BatchSize, criterion_group, criterion_main}; 2 - use sha2::{Sha256, Digest}; 1 + use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; 3 2 use hmac_sha256::Hash; 3 + use sha2::{Digest, Sha256}; 4 4 5 5 pub fn compute(bytes: [u8; 32]) -> u32 { 6 6 let mut zeros = 0; ··· 16 16 } 17 17 18 18 pub fn compute2(bytes: [u8; 32]) -> u32 { 19 - u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()) 20 - .leading_zeros() / 2 19 + u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()).leading_zeros() / 2 21 20 } 22 21 23 22 fn from_key_old(key: &[u8]) -> u32 { ··· 30 29 31 30 pub fn criterion_benchmark(c: &mut Criterion) { 32 31 for (name, case) in [ 33 - ("no zeros", [0xFF; 32]), 34 - ("two zeros", [0x3F; 32]), 35 - ("some zeros", [0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 36 - ("many zeros", [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 32 + ("no zeros", [0xFF; 32]), 33 + ("two zeros", [0x3F; 32]), 34 + ( 35 + "some zeros", 36 + [ 37 + 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 38 + 1, 1, 1, 1, 39 + ], 40 + ), 41 + ( 42 + "many zeros", 43 + [ 44 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 45 + 1, 1, 1, 1, 46 + ], 47 + ), 37 48 ] { 38 49 let mut g = c.benchmark_group(name); 39 50 g.bench_function("old", |b| { 40 - b.iter_batched( 41 - || case.clone(), 42 - |c| compute(c), 43 - BatchSize::SmallInput, 44 - ) 51 + b.iter_batched(|| case.clone(), |c| compute(c), BatchSize::SmallInput) 45 52 }); 46 53 g.bench_function("new", |b| { 47 - b.iter_batched( 48 - || case.clone(), 49 - |c| compute2(c), 50 - BatchSize::SmallInput, 51 - ) 54 + b.iter_batched(|| case.clone(), |c| compute2(c), BatchSize::SmallInput) 52 55 }); 53 56 } 54 57 55 - for case in [ 56 - "a", 57 - "aa", 58 - "aaa", 59 - "aaaa", 60 - ] { 58 + for case in ["a", "aa", "aaa", "aaaa"] { 61 59 let mut g = c.benchmark_group(case); 62 60 g.bench_function("old", |b| b.iter(|| from_key_old(case.as_bytes()))); 63 61 g.bench_function("new", |b| b.iter(|| from_key_new(case.as_bytes()))); 64 62 } 65 63 } 66 - 67 64 68 65 criterion_group!(benches, criterion_benchmark); 69 66 criterion_main!(benches);
+1 -4
benches/non-huge-cars.rs
··· 39 39 } 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 - let mut driver = match Driver::load_car(bytes, ser, 32) 43 - .await 44 - .unwrap() 45 - { 42 + let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 46 43 None => return 0, 47 44 Some(Driver::Memory(_, mem_driver)) => mem_driver, 48 45 Some(Driver::Disk(_)) => panic!("not benching big cars here"),
+1 -2
src/disk.rs
··· 17 17 ``` 18 18 */ 19 19 20 - use crate::Bytes; 21 - use crate::drive::DriveError; 20 + use crate::{Bytes, drive::DriveError}; 22 21 use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 23 22 use std::path::PathBuf; 24 23
+13 -7
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::walk::Output; 4 - use crate::Bytes; 5 - use crate::HashMap; 6 - use crate::disk::{DiskError, DiskStore}; 7 - use crate::mst::MstNode; 3 + use crate::{ 4 + Bytes, HashMap, 5 + disk::{DiskError, DiskStore}, 6 + mst::MstNode, 7 + walk::Output, 8 + }; 8 9 use cid::Cid; 9 10 use iroh_car::CarReader; 10 11 use std::convert::Infallible; ··· 254 255 let commit = commit.ok_or(DriveError::MissingCommit)?; 255 256 256 257 // the commit always must point to a Node; empty node => empty MST special case 257 - let root_node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 258 + let root_node: MstNode = match mem_blocks 259 + .get(&commit.data) 260 + .ok_or(DriveError::MissingCommit)? 261 + { 258 262 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 259 263 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 260 264 }; ··· 300 304 let mut out = Vec::with_capacity(n); 301 305 for _ in 0..n { 302 306 // walk as far as we can until we run out of blocks or find a record 303 - let Some(Output { rkey, cid: _, data }) = self.walker.step(&mut self.blocks, self.process)? else { 307 + let Some(Output { rkey, cid: _, data }) = 308 + self.walker.step(&mut self.blocks, self.process)? 309 + else { 304 310 break; 305 311 }; 306 312 out.push((rkey, data));
+19 -14
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 - use sha2::{Digest, Sha256}; 7 6 use cid::Cid; 8 7 use serde::Deserialize; 8 + use sha2::{Digest, Sha256}; 9 9 10 10 /// The top-level data object in a repository's tree is a signed commit. 11 11 #[derive(Debug, Deserialize)] ··· 37 37 pub sig: serde_bytes::ByteBuf, 38 38 } 39 39 40 - use serde::de::{self, Deserializer, Visitor, MapAccess, Unexpected}; 40 + use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 41 41 use std::fmt; 42 42 43 43 pub type Depth = u32; ··· 97 97 } 98 98 found_left = true; 99 99 if let Some(cid) = map.next_value()? { 100 - left = Some(NodeThing { cid, kind: ThingKind::Tree }); 100 + left = Some(NodeThing { 101 + cid, 102 + kind: ThingKind::Tree, 103 + }); 101 104 } 102 105 } 103 106 "e" => { ··· 110 113 111 114 for entry in map.next_value::<Vec<Entry>>()? { 112 115 let mut rkey: Vec<u8> = vec![]; 113 - let pre_checked = prefix 114 - .get(..entry.prefix_len) 115 - .ok_or_else(|| de::Error::invalid_value( 116 - Unexpected::Bytes(&prefix), 117 - &"a prefix at least as long as the prefix_len", 118 - ))?; 116 + let pre_checked = 117 + prefix.get(..entry.prefix_len).ok_or_else(|| { 118 + de::Error::invalid_value( 119 + Unexpected::Bytes(&prefix), 120 + &"a prefix at least as long as the prefix_len", 121 + ) 122 + })?; 119 123 120 124 rkey.extend_from_slice(pre_checked); 121 125 rkey.extend_from_slice(&entry.keysuffix); 122 126 123 - let rkey_s = String::from_utf8(rkey.clone()) 124 - .map_err(|_| de::Error::invalid_value( 127 + let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| { 128 + de::Error::invalid_value( 125 129 Unexpected::Bytes(&rkey), 126 130 &"a valid utf-8 rkey", 127 - ))?; 131 + ) 132 + })?; 128 133 129 134 let key_depth = atproto_mst_depth(&rkey_s); 130 135 if depth.is_none() { ··· 150 155 151 156 prefix = rkey; 152 157 } 153 - }, 154 - f => return Err(de::Error::unknown_field(f, NODE_FIELDS)) 158 + } 159 + f => return Err(de::Error::unknown_field(f, NODE_FIELDS)), 155 160 } 156 161 } 157 162 if !found_left {
+16 -26
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::mst::NodeThing; 4 - use crate::mst::ThingKind; 5 - use crate::mst::MstNode; 6 - use crate::mst::Depth; 7 - use crate::Bytes; 8 - use crate::HashMap; 9 - use crate::disk::DiskStore; 10 - use crate::drive::MaybeProcessedBlock; 3 + use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 + use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 11 5 use cid::Cid; 12 6 use std::convert::Infallible; 13 7 ··· 60 54 } 61 55 62 56 impl Walker { 63 - pub fn new( 64 - root_node: MstNode, 65 - ) -> Option<Self> { 57 + pub fn new(root_node: MstNode) -> Option<Self> { 66 58 Some(Self { 67 59 prev_rkey: "".to_string(), 68 60 root_depth: root_node.depth?, ··· 93 85 self.prev_rkey = rkey.clone(); 94 86 95 87 log::trace!("val @ {rkey}"); 96 - Ok(Some(Output { 97 - rkey, 98 - cid, 99 - data, 100 - })) 88 + Ok(Some(Output { rkey, cid, data })) 101 89 } 102 90 ThingKind::Tree => { 103 91 let MaybeProcessedBlock::Raw(data) = mpb else { 104 92 return Err(WalkError::BadCommitFingerprint); 105 93 }; 106 94 107 - let node: MstNode = serde_ipld_dagcbor::from_slice(&data) 108 - .map_err(WalkError::BadCommit)?; 95 + let node: MstNode = 96 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 109 97 110 98 if node.is_empty() { 111 99 return Err(WalkError::MstError(MstError::EmptyNode)); 112 100 } 113 101 114 102 let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 115 - let next_depth = current_depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 116 - if let Some(d) = node.depth { 117 - if d != next_depth { 118 - return Err(WalkError::MstError(MstError::WrongDepth { 119 - depth: d, 120 - expected: next_depth, 121 - })); 122 - } 103 + let next_depth = current_depth 104 + .checked_sub(1) 105 + .ok_or(MstError::DepthUnderflow)?; 106 + if let Some(d) = node.depth 107 + && d != next_depth 108 + { 109 + return Err(WalkError::MstError(MstError::WrongDepth { 110 + depth: d, 111 + expected: next_depth, 112 + })); 123 113 } 124 114 125 115 log::trace!("node into depth {next_depth}");
+1 -5
tests/mst-depth.rs
··· 205 205 #[test] 206 206 fn test_ietf_example_keys() { 207 207 // https://atproto.com/specs/repository#mst-structure 208 - for (key, expected) in [ 209 - ("key1", 0), 210 - ("key7", 1), 211 - ("key515", 4), 212 - ] { 208 + for (key, expected) in [("key1", 0), ("key7", 1), ("key515", 4)] { 213 209 let computed = atproto_mst_depth(key); 214 210 assert_eq!(computed, expected); 215 211 }