Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

validate mst key depth and fix tests

phil 9de3622e f7be2ed6

+283 -58
+72
Cargo.lock
··· 152 152 checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" 153 153 154 154 [[package]] 155 + name = "block-buffer" 156 + version = "0.10.4" 157 + source = "registry+https://github.com/rust-lang/crates.io-index" 158 + checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" 159 + dependencies = [ 160 + "generic-array", 161 + ] 162 + 163 + [[package]] 155 164 name = "bumpalo" 156 165 version = "3.19.0" 157 166 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 287 296 ] 288 297 289 298 [[package]] 299 + name = "cpufeatures" 300 + version = "0.2.17" 301 + source = "registry+https://github.com/rust-lang/crates.io-index" 302 + checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" 303 + dependencies = [ 304 + "libc", 305 + ] 306 + 307 + [[package]] 290 308 name = "criterion" 291 309 version = "0.7.0" 292 310 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 352 370 checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" 353 371 354 372 [[package]] 373 + name = "crypto-common" 374 + version = "0.1.6" 375 + source = "registry+https://github.com/rust-lang/crates.io-index" 376 + checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" 377 + dependencies = [ 378 + "generic-array", 379 + "typenum", 380 + ] 381 + 382 + [[package]] 355 383 name = "data-encoding" 356 384 version = "2.9.0" 357 385 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 378 406 ] 379 407 380 408 [[package]] 409 + name = "digest" 410 + version = "0.10.7" 411 + source = "registry+https://github.com/rust-lang/crates.io-index" 412 + checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" 413 + dependencies = [ 414 + "block-buffer", 415 + "crypto-common", 416 + ] 417 + 418 + [[package]] 381 419 name = "either" 382 420 version = "1.15.0" 383 421 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 527 565 "pin-project-lite", 528 566 "pin-utils", 529 567 "slab", 568 + ] 569 + 570 + [[package]] 571 + name = "generic-array" 572 + version = "0.14.9" 573 + source = "registry+https://github.com/rust-lang/crates.io-index" 574 + checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" 575 + dependencies = [ 576 + "typenum", 577 + "version_check", 530 578 ] 531 579 532 580 [[package]] ··· 992 1040 "serde", 993 1041 "serde_bytes", 994 1042 "serde_ipld_dagcbor", 1043 + "sha2", 995 1044 "tempfile", 996 1045 "thiserror 2.0.17", 997 1046 "tokio", ··· 1123 1172 ] 1124 1173 1125 1174 [[package]] 1175 + name = "sha2" 1176 + version = "0.10.9" 1177 + source = "registry+https://github.com/rust-lang/crates.io-index" 1178 + checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" 1179 + dependencies = [ 1180 + "cfg-if", 1181 + "cpufeatures", 1182 + "digest", 1183 + ] 1184 + 1185 + [[package]] 1126 1186 name = "signal-hook-registry" 1127 1187 version = "1.4.6" 1128 1188 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1276 1336 ] 1277 1337 1278 1338 [[package]] 1339 + name = "typenum" 1340 + version = "1.19.0" 1341 + source = "registry+https://github.com/rust-lang/crates.io-index" 1342 + checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" 1343 + 1344 + [[package]] 1279 1345 name = "unicode-ident" 1280 1346 version = "1.0.19" 1281 1347 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1310 1376 version = "0.2.15" 1311 1377 source = "registry+https://github.com/rust-lang/crates.io-index" 1312 1378 checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1379 + 1380 + [[package]] 1381 + name = "version_check" 1382 + version = "0.9.5" 1383 + source = "registry+https://github.com/rust-lang/crates.io-index" 1384 + checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 1313 1385 1314 1386 [[package]] 1315 1387 name = "virtue"
+1
Cargo.toml
··· 18 18 serde = { version = "1.0.228", features = ["derive"] } 19 19 serde_bytes = "0.11.19" 20 20 serde_ipld_dagcbor = "0.6.4" 21 + sha2 = "0.10.9" 21 22 thiserror = "2.0.17" 22 23 tokio = { version = "1.47.1", features = ["rt", "sync"] } 23 24
+12 -1
examples/read-file/main.rs
··· 1 1 extern crate repo_stream; 2 2 use clap::Parser; 3 + use repo_stream::drive::Processable; 4 + use serde::{Deserialize, Serialize}; 3 5 use std::path::PathBuf; 4 6 5 7 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 10 12 file: PathBuf, 11 13 } 12 14 15 + #[derive(Clone, Serialize, Deserialize)] 16 + struct S(usize); 17 + 18 + impl Processable for S { 19 + fn get_size(&self) -> usize { 20 + 0 // no additional space taken, just its stack size (newtype is free) 21 + } 22 + } 23 + 13 24 #[tokio::main] 14 25 async fn main() -> Result<()> { 15 26 env_logger::init(); ··· 19 30 let reader = tokio::io::BufReader::new(reader); 20 31 21 32 let (commit, mut driver) = 22 - match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * 1024).await? { 33 + match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024 * 1024).await? { 23 34 repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver), 24 35 repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"), 25 36 };
-4
src/mst.rs
··· 83 83 /// with an empty array of entries. This is the only situation in which a 84 84 /// tree may contain an empty leaf node which does not either contain keys 85 85 /// ("entries") or point to a sub-tree containing entries. 86 - /// 87 - /// TODO: to me this is slightly unclear with respect to `l` (ask someone). 88 - /// ...is that what "The top of the tree must not be a an empty node which 89 - /// only points to a sub-tree." is referring to? 90 86 pub fn is_empty(&self) -> bool { 91 87 self.left.is_none() && self.entries.is_empty() 92 88 }
+169 -28
src/walk.rs
··· 4 4 use crate::drive::{MaybeProcessedBlock, Processable}; 5 5 use crate::mst::Node; 6 6 use ipld_core::cid::Cid; 7 + use sha2::{Digest, Sha256}; 7 8 use std::collections::HashMap; 8 9 use std::convert::Infallible; 9 10 ··· 17 18 #[error("Failed to decode commit block: {0}")] 18 19 BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 19 20 #[error("Action node error: {0}")] 20 - RkeyError(#[from] RkeyError), 21 + MstError(#[from] MstError), 21 22 #[error("Encountered an rkey out of order while walking the MST")] 22 23 RkeyOutOfOrder, 23 24 } ··· 34 35 } 35 36 36 37 /// Errors from invalid Rkeys 37 - #[derive(Debug, thiserror::Error)] 38 - pub enum RkeyError { 38 + #[derive(Debug, PartialEq, thiserror::Error)] 39 + pub enum MstError { 39 40 #[error("Failed to compute an rkey due to invalid prefix_len")] 40 41 EntryPrefixOutOfbounds, 41 42 #[error("RKey was not utf-8")] 42 43 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 44 + #[error("Nodes cannot be empty (except for an entirely empty MST)")] 45 + EmptyNode, 46 + #[error("Found an entry with rkey at the wrong depth")] 47 + WrongDepth, 48 + #[error("Lost track of our depth (possible bug?)")] 49 + LostDepth, 50 + #[error("MST depth underflow: depth-0 node with child trees")] 51 + DepthUnderflow, 43 52 } 44 53 45 54 /// Walker outputs ··· 55 64 56 65 #[derive(Debug, Clone, PartialEq)] 57 66 enum Need { 58 - Node(Cid), 67 + Node { depth: Depth, cid: Cid }, 59 68 Record { rkey: String, cid: Cid }, 60 69 } 61 70 62 - fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> { 71 + #[derive(Debug, Clone, Copy, PartialEq)] 72 + enum Depth { 73 + Root, 74 + Depth(u32), 75 + } 76 + 77 + impl Depth { 78 + fn from_key(key: &[u8]) -> Self { 79 + let mut zeros = 0; 80 + for byte in Sha256::digest(key) { 81 + let leading = byte.leading_zeros(); 82 + zeros += leading; 83 + if leading < 8 { 84 + break; 85 + } 86 + } 87 + Self::Depth(zeros / 2) // truncating divide (rounds down) 88 + } 89 + fn next_expected(&self) -> Result<Option<u32>, MstError> { 90 + match self { 91 + Self::Root => Ok(None), 92 + Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 93 + } 94 + } 95 + } 96 + 97 + fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 98 + // empty nodes are not allowed in the MST 99 + // ...except for a single one for empty MST, but we wouldn't be pushing that 100 + if node.is_empty() { 101 + return Err(MstError::EmptyNode); 102 + } 103 + 63 104 let mut entries = Vec::with_capacity(node.entries.len()); 64 - 65 105 let mut prefix = vec![]; 106 + let mut this_depth = parent_depth.next_expected()?; 107 + 66 108 for entry in &node.entries { 67 109 let mut rkey = vec![]; 68 110 let pre_checked = prefix 69 111 .get(..entry.prefix_len) 70 - .ok_or(RkeyError::EntryPrefixOutOfbounds)?; 112 + .ok_or(MstError::EntryPrefixOutOfbounds)?; 71 113 rkey.extend_from_slice(pre_checked); 72 114 rkey.extend_from_slice(&entry.keysuffix); 115 + 116 + let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 117 + return Err(MstError::WrongDepth); 118 + }; 119 + 120 + // this_depth is `none` if we are the deepest child (directly below root) 121 + // in that case we accept whatever highest depth is claimed 122 + let expected_depth = match this_depth { 123 + Some(d) => d, 124 + None => { 125 + this_depth = Some(key_depth); 126 + key_depth 127 + } 128 + }; 129 + 130 + // all keys we find should be this depth 131 + if key_depth != expected_depth { 132 + return Err(MstError::DepthUnderflow); 133 + } 134 + 73 135 prefix = rkey.clone(); 74 136 75 137 entries.push(Need::Record { ··· 77 139 cid: entry.value, 78 140 }); 79 141 if let Some(ref tree) = entry.tree { 80 - entries.push(Need::Node(*tree)); 142 + entries.push(Need::Node { 143 + depth: Depth::Depth(key_depth), 144 + cid: *tree, 145 + }); 81 146 } 82 147 } 83 148 84 149 entries.reverse(); 85 150 stack.append(&mut entries); 86 151 152 + let d = this_depth.ok_or(MstError::LostDepth)?; 153 + 87 154 if let Some(tree) = node.left { 88 - stack.push(Need::Node(tree)); 155 + stack.push(Need::Node { 156 + depth: Depth::Depth(d), 157 + cid: tree, 158 + }); 89 159 } 90 160 Ok(()) 91 161 } ··· 102 172 impl Walker { 103 173 pub fn new(tree_root_cid: Cid) -> Self { 104 174 Self { 105 - stack: vec![Need::Node(tree_root_cid)], 175 + stack: vec![Need::Node { 176 + depth: Depth::Root, 177 + cid: tree_root_cid, 178 + }], 106 179 prev: "".to_string(), 107 180 } 108 181 } ··· 114 187 process: impl Fn(Vec<u8>) -> T, 115 188 ) -> Result<Step<T>, Trip> { 116 189 loop { 117 - let Some(mut need) = self.stack.last() else { 190 + let Some(need) = self.stack.last_mut() else { 118 191 log::trace!("tried to walk but we're actually done."); 119 192 return Ok(Step::Finish); 120 193 }; 121 194 122 - match &mut need { 123 - Need::Node(cid) => { 195 + match need { 196 + &mut Need::Node { depth, cid } => { 124 197 log::trace!("need node {cid:?}"); 125 - let Some(block) = blocks.remove(cid) else { 198 + let Some(block) = blocks.remove(&cid) else { 126 199 log::trace!("node not found, resting"); 127 - return Ok(Step::Missing(*cid)); 200 + return Ok(Step::Missing(cid)); 128 201 }; 129 202 130 203 let MaybeProcessedBlock::Raw(data) = block else { ··· 137 210 self.stack.pop(); 138 211 139 212 // queue up work on the found node next 140 - push_from_node(&mut self.stack, &node)?; 213 + push_from_node(&mut self.stack, &node, depth)?; 141 214 } 142 215 Need::Record { rkey, cid } => { 143 216 log::trace!("need record {cid:?}"); ··· 176 249 process: impl Fn(Vec<u8>) -> T, 177 250 ) -> Result<Step<T>, DiskTrip> { 178 251 loop { 179 - let Some(mut need) = self.stack.last() else { 252 + let Some(need) = self.stack.last_mut() else { 180 253 log::trace!("tried to walk but we're actually done."); 181 254 return Ok(Step::Finish); 182 255 }; 183 256 184 - match &mut need { 185 - Need::Node(cid) => { 257 + match need { 258 + &mut Need::Node { depth, cid } => { 186 259 let cid_bytes = cid.to_bytes(); 187 260 log::trace!("need node {cid:?}"); 188 261 let Some(block_bytes) = reader.get(cid_bytes)? else { 189 262 log::trace!("node not found, resting"); 190 - return Ok(Step::Missing(*cid)); 263 + return Ok(Step::Missing(cid)); 191 264 }; 192 265 193 266 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; ··· 202 275 self.stack.pop(); 203 276 204 277 // queue up work on the found node next 205 - push_from_node(&mut self.stack, &node).map_err(Trip::RkeyError)?; 278 + push_from_node(&mut self.stack, &node, depth).map_err(Trip::MstError)?; 206 279 } 207 280 Need::Record { rkey, cid } => { 208 281 log::trace!("need record {cid:?}"); ··· 289 362 // } 290 363 291 364 #[test] 292 - fn test_next_from_node_empty() { 293 - let node = Node { 365 + fn test_depth_spec_0() { 366 + let d = Depth::from_key(b"2653ae71"); 367 + assert_eq!(d, Depth::Depth(0)) 368 + } 369 + 370 + #[test] 371 + fn test_depth_spec_1() { 372 + let d = Depth::from_key(b"blue"); 373 + assert_eq!(d, Depth::Depth(1)) 374 + } 375 + 376 + #[test] 377 + fn test_depth_spec_4() { 378 + let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 379 + assert_eq!(d, Depth::Depth(4)) 380 + } 381 + 382 + #[test] 383 + fn test_depth_spec_8() { 384 + let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 385 + assert_eq!(d, Depth::Depth(8)) 386 + } 387 + 388 + #[test] 389 + fn test_depth_ietf_draft_0() { 390 + let d = Depth::from_key(b"key1"); 391 + assert_eq!(d, Depth::Depth(0)) 392 + } 393 + 394 + #[test] 395 + fn test_depth_ietf_draft_1() { 396 + let d = Depth::from_key(b"key7"); 397 + assert_eq!(d, Depth::Depth(1)) 398 + } 399 + 400 + #[test] 401 + fn test_depth_ietf_draft_4() { 402 + let d = Depth::from_key(b"key515"); 403 + assert_eq!(d, Depth::Depth(4)) 404 + } 405 + 406 + #[test] 407 + fn test_depth_interop() { 408 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 409 + for (k, expected) in [ 410 + ("", 0), 411 + ("asdf", 0), 412 + ("blue", 1), 413 + ("2653ae71", 0), 414 + ("88bfafc7", 2), 415 + ("2a92d355", 4), 416 + ("884976f5", 6), 417 + ("app.bsky.feed.post/454397e440ec", 4), 418 + ("app.bsky.feed.post/9adeb165882c", 8), 419 + ] { 420 + let d = Depth::from_key(k.as_bytes()); 421 + assert_eq!(d, Depth::Depth(expected), "key: {}", k); 422 + } 423 + } 424 + 425 + #[test] 426 + fn test_push_empty_fails() { 427 + let empty_node = Node { 294 428 left: None, 295 429 entries: vec![], 296 430 }; 297 431 let mut stack = vec![]; 298 - push_from_node(&mut stack, &node).unwrap(); 299 - assert_eq!(stack.last(), None); 432 + let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 433 + assert_eq!(err, Err(MstError::EmptyNode)); 300 434 } 301 435 302 436 #[test] 303 - fn test_needs_from_node_just_left() { 437 + fn test_push_one_node() { 304 438 let node = Node { 305 439 left: Some(cid1()), 306 440 entries: vec![], 307 441 }; 308 442 let mut stack = vec![]; 309 - push_from_node(&mut stack, &node).unwrap(); 310 - assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref()); 443 + push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 444 + assert_eq!( 445 + stack.last(), 446 + Some(Need::Node { 447 + depth: Depth::Depth(3), 448 + cid: cid1() 449 + }) 450 + .as_ref() 451 + ); 311 452 } 312 453 313 454 // #[test]
+29 -25
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::drive::Processable; 3 + use serde::{Deserialize, Serialize}; 5 4 6 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 9 8 10 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 11 - let reader = CarReader::new(bytes).await.unwrap(); 9 + #[derive(Clone, Serialize, Deserialize)] 10 + struct S(usize); 12 11 13 - let root = reader 14 - .header() 15 - .roots() 16 - .first() 17 - .ok_or("missing root") 18 - .unwrap() 19 - .clone(); 12 + impl Processable for S { 13 + fn get_size(&self) -> usize { 14 + 0 // no additional space taken, just its stack size (newtype is free) 15 + } 16 + } 20 17 21 - let stream = std::pin::pin!(reader.stream()); 18 + async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 19 + let mb = 2_usize.pow(20); 22 20 23 - let (_commit, v) = 24 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 25 - .await 26 - .unwrap(); 27 - let mut record_stream = std::pin::pin!(v.stream()); 21 + let mut driver = match repo_stream::drive::load_car(bytes, |block| S(block.len()), 10 * mb) 22 + .await 23 + .unwrap() 24 + { 25 + repo_stream::drive::Vehicle::Lil(_commit, mem_driver) => mem_driver, 26 + repo_stream::drive::Vehicle::Big(_) => panic!("too big"), 27 + }; 28 28 29 29 let mut records = 0; 30 30 let mut sum = 0; 31 31 let mut found_bsky_profile = false; 32 32 let mut prev_rkey = "".to_string(); 33 - while let Some((rkey, size)) = record_stream.try_next().await.unwrap() { 34 - records += 1; 35 - sum += size; 36 - if rkey == "app.bsky.actor.profile/self" { 37 - found_bsky_profile = true; 33 + 34 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 35 + for (rkey, S(size)) in pairs { 36 + records += 1; 37 + sum += size; 38 + if rkey == "app.bsky.actor.profile/self" { 39 + found_bsky_profile = true; 40 + } 41 + assert!(rkey > prev_rkey, "rkeys are streamed in order"); 42 + prev_rkey = rkey; 38 43 } 39 - assert!(rkey > prev_rkey, "rkeys are streamed in order"); 40 - prev_rkey = rkey; 41 44 } 45 + 42 46 assert_eq!(records, expected_records); 43 47 assert_eq!(sum, expected_sum); 44 48 assert!(found_bsky_profile);