Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

simpler depth handling

authored by

phil and committed by tangled.org 843447f4 f11372c9

+64 -44
+1
Cargo.toml
··· 27 27 tempfile = "3.23.0" 28 28 tokio = { version = "1.47.1", features = ["full"] } 29 29 mimalloc = "0.1.48" 30 + hmac-sha256 = "1.1.12" 30 31 31 32 [profile.profiling] 32 33 inherits = "release"
+4
benches/huge-car.rs
··· 4 4 5 5 use criterion::{Criterion, criterion_group, criterion_main}; 6 6 7 + // use mimalloc::MiMalloc; 8 + // #[global_allocator] 9 + // static GLOBAL: MiMalloc = MiMalloc; 10 + 7 11 pub fn criterion_benchmark(c: &mut Criterion) { 8 12 let rt = tokio::runtime::Builder::new_multi_thread() 9 13 .enable_all()
+4
benches/non-huge-cars.rs
··· 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 6 + // use mimalloc::MiMalloc; 7 + // #[global_allocator] 8 + // static GLOBAL: MiMalloc = MiMalloc; 9 + 6 10 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 7 11 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 8 12 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
+22 -6
readme.md
··· 58 58 ``` 59 59 60 60 more recent todo 61 - 62 - - [ ] get an *emtpy* car for the test suite 61 + - [ ] repo car slices 62 + - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling) 63 + - [x] get an *emtpy* car for the test suite 63 64 - [x] implement a max size on disk limit 64 65 65 66 ··· 70 71 71 72 current car processing times (records processed into their length usize, phil's dev machine): 72 73 73 - - 128MiB CAR file: `347ms` 74 - - 5.0MiB: `6.1ms` 75 - - 279KiB: `139us` 76 - - 3.4KiB: `4.9us` 74 + - 128MiB CAR file: `350ms` 75 + - 5.0MiB: `6.8ms` 76 + - 279KiB: `170us` 77 + - 3.4KiB: `5.2us` 78 + - empty: `710ns` 79 + 80 + it's a little faster with `mimalloc` 81 + 82 + ```rust 83 + use mimalloc::MiMalloc; 84 + #[global_allocator] 85 + static GLOBAL: MiMalloc = MiMalloc; 86 + ``` 87 + 88 + - 128MiB CAR file: `310ms` (-13%) 89 + - 5.0MiB: `6.1ms` (-10%) 90 + - 279KiB: `160us` (-5%) 91 + - 3.4KiB: `5.7us` (-9%) 92 + - empty: `660ns` (-7%) 77 93 78 94 79 95 running the huge-car benchmark
+5 -11
src/drive.rs
··· 254 254 let commit = commit.ok_or(DriveError::MissingCommit)?; 255 255 256 256 // the commit always must point to a Node; empty node => empty MST special case 257 - let node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 257 + let root_node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 258 258 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 259 259 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 260 260 }; 261 - if node.is_empty() { 261 + let Some(walker) = Walker::new(root_node) else { 262 262 // TODO: actually we still want the commit in this case 263 263 return Ok(None); 264 - } 265 - let depth = node.depth.unwrap(); 266 - 267 - let walker = Walker::new(commit.data, depth); 264 + }; 268 265 269 266 Ok(Some(Driver::Memory( 270 267 commit, ··· 412 409 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 413 410 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 414 411 }; 415 - if node.is_empty() { 412 + let Some(walker) = Walker::new(node) else { 416 413 return Ok((commit, None)); 417 - } 418 - let depth = node.depth.unwrap(); 419 - 420 - let walker = Walker::new(commit.data, depth); 414 + }; 421 415 422 416 Ok(( 423 417 commit,
+28 -27
src/walk.rs
··· 55 55 #[derive(Debug)] 56 56 pub struct Walker { 57 57 prev_rkey: String, 58 - todo: Vec<(Depth, NodeThing)>, 58 + root_depth: Depth, 59 + todo: Vec<Vec<NodeThing>>, 59 60 } 60 61 61 62 impl Walker { 62 63 pub fn new( 63 - root_cid: Cid, 64 - depth: Depth, 65 - ) -> Self { 66 - Self { 64 + root_node: MstNode, 65 + ) -> Option<Self> { 66 + Some(Self { 67 67 prev_rkey: "".to_string(), 68 - todo: vec![( 69 - depth + 1, // we're kind of inventing a fake root one above the real root 70 - // ... maybe we should just pass in the real root here??? 71 - NodeThing { 72 - cid: root_cid, 73 - kind: ThingKind::Tree, 74 - }, 75 - )], 68 + root_depth: root_node.depth?, 69 + todo: vec![root_node.things], 70 + }) 71 + } 72 + 73 + fn next_todo(&mut self) -> Option<NodeThing> { 74 + while let Some(last) = self.todo.last_mut() { 75 + let Some(thing) = last.pop() else { 76 + self.todo.pop(); 77 + continue; 78 + }; 79 + return Some(thing); 76 80 } 81 + None 77 82 } 78 83 79 84 fn mpb_step( 80 85 &mut self, 81 - depth: Depth, 82 86 kind: ThingKind, 83 87 cid: Cid, 84 88 mpb: &MaybeProcessedBlock, ··· 99 103 } 100 104 self.prev_rkey = rkey.clone(); 101 105 106 + log::trace!("val @ {rkey}"); 102 107 Ok(Some(Output { 103 108 rkey, 104 109 cid, ··· 117 122 return Err(WalkError::MstError(MstError::EmptyNode)); 118 123 } 119 124 120 - let next_depth = depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 125 + let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 126 + let next_depth = current_depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 121 127 if let Some(d) = node.depth { 122 128 if d != next_depth { 123 129 return Err(WalkError::MstError(MstError::WrongDepth { ··· 127 133 } 128 134 } 129 135 130 - for thing in node.things { 131 - self.todo.push((next_depth, thing)); 132 - } 133 - 136 + log::trace!("node into depth {next_depth}"); 137 + self.todo.push(node.things); 134 138 Ok(None) 135 139 } 136 140 } ··· 143 147 process: impl Fn(Bytes) -> Bytes, 144 148 ) -> 
Result<Option<Output>, WalkError> { 145 149 146 - while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 150 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 147 151 let Some(mpb) = blocks.get(&cid) else { 148 152 return Err(WalkError::MissingBlock(cid)); 149 153 }; 150 - if let Some(out) = self.mpb_step(depth, kind, cid, mpb, &process)? { 154 + 155 + if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 151 156 return Ok(Some(out)); 152 157 } 153 158 } 154 - 155 - log::trace!("tried to walk but we're actually done."); 156 159 Ok(None) 157 160 } 158 161 ··· 162 165 blocks: &mut DiskStore, 163 166 process: impl Fn(Bytes) -> Bytes, 164 167 ) -> Result<Option<Output>, WalkError> { 165 - 166 - while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 168 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 167 169 let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 168 170 return Err(WalkError::MissingBlock(cid)); 169 171 }; 170 172 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 171 - if let Some(out) = self.mpb_step(depth, kind, cid, &mpb, &process)? { 173 + if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 172 174 return Ok(Some(out)); 173 175 } 174 176 } 175 - log::trace!("tried to walk but we're actually done."); 176 177 Ok(None) 177 178 } 178 179 }