Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

well it kinda works again

authored by

phil and committed by tangled.org f11372c9 cc77d06f

+545 -340
+7
Cargo.lock
··· 705 705 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 706 706 707 707 [[package]] 708 + name = "hmac-sha256" 709 + version = "1.1.12" 710 + source = "registry+https://github.com/rust-lang/crates.io-index" 711 + checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425" 712 + 713 + [[package]] 708 714 name = "interval-heap" 709 715 version = "0.0.5" 710 716 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1149 1155 "env_logger", 1150 1156 "fjall", 1151 1157 "hashbrown 0.16.1", 1158 + "hmac-sha256", 1152 1159 "iroh-car", 1153 1160 "log", 1154 1161 "mimalloc",
+5 -1
Cargo.toml
··· 15 15 serde = { version = "1.0.228", features = ["derive"] } 16 16 serde_bytes = "0.11.19" 17 17 serde_ipld_dagcbor = "0.6.4" 18 - sha2 = "0.10.9" 18 + sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower 19 19 thiserror = "2.0.17" 20 20 tokio = { version = "1.47.1", features = ["rt", "sync"] } 21 21 ··· 42 42 [[bench]] 43 43 name = "huge-car" 44 44 harness = false 45 + 46 + # [[bench]] 47 + # name = "leading" 48 + # harness = false
+1
benches/huge-car.rs
··· 26 26 match Driver::load_car(reader, |block| block.len().to_le_bytes().to_vec(), 1024) 27 27 .await 28 28 .unwrap() 29 + .unwrap() 29 30 { 30 31 Driver::Memory(_, mem_driver) => mem_driver, 31 32 Driver::Disk(_) => panic!("not doing disk for benchmark"),
+69
benches/leading.rs
··· 1 + use criterion::{Criterion, BatchSize, criterion_group, criterion_main}; 2 + use sha2::{Sha256, Digest}; 3 + use hmac_sha256::Hash; 4 + 5 + pub fn compute(bytes: [u8; 32]) -> u32 { 6 + let mut zeros = 0; 7 + for byte in bytes { 8 + if byte == 0 { 9 + zeros += 8 10 + } else { 11 + zeros += byte.leading_zeros(); 12 + break; 13 + } 14 + } 15 + zeros / 2 16 + } 17 + 18 + pub fn compute2(bytes: [u8; 32]) -> u32 { 19 + u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()) 20 + .leading_zeros() / 2 21 + } 22 + 23 + fn from_key_old(key: &[u8]) -> u32 { 24 + compute2(Sha256::digest(key).into()) 25 + } 26 + 27 + fn from_key_new(key: &[u8]) -> u32 { 28 + compute2(Hash::hash(key).into()) 29 + } 30 + 31 + pub fn criterion_benchmark(c: &mut Criterion) { 32 + for (name, case) in [ 33 + ("no zeros", [0xFF; 32]), 34 + ("two zeros", [0x3F; 32]), 35 + ("some zeros", [0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 36 + ("many zeros", [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 37 + ] { 38 + let mut g = c.benchmark_group(name); 39 + g.bench_function("old", |b| { 40 + b.iter_batched( 41 + || case.clone(), 42 + |c| compute(c), 43 + BatchSize::SmallInput, 44 + ) 45 + }); 46 + g.bench_function("new", |b| { 47 + b.iter_batched( 48 + || case.clone(), 49 + |c| compute2(c), 50 + BatchSize::SmallInput, 51 + ) 52 + }); 53 + } 54 + 55 + for case in [ 56 + "a", 57 + "aa", 58 + "aaa", 59 + "aaaa", 60 + ] { 61 + let mut g = c.benchmark_group(case); 62 + g.bench_function("old", |b| b.iter(|| from_key_old(case.as_bytes()))); 63 + g.bench_function("new", |b| b.iter(|| from_key_new(case.as_bytes()))); 64 + } 65 + } 66 + 67 + 68 + criterion_group!(benches, criterion_benchmark); 69 + criterion_main!(benches);
+3 -2
benches/non-huge-cars.rs
··· 33 33 .await 34 34 .unwrap() 35 35 { 36 - Driver::Memory(_, mem_driver) => mem_driver, 37 - Driver::Disk(_) => panic!("not benching big cars here"), 36 + None => return 0, 37 + Some(Driver::Memory(_, mem_driver)) => mem_driver, 38 + Some(Driver::Disk(_)) => panic!("not benching big cars here"), 38 39 }; 39 40 40 41 let mut n = 0;
+7 -2
examples/disk-read-file/main.rs
··· 43 43 .load_car(reader) 44 44 .await? 45 45 { 46 - Driver::Memory(_, _) => panic!("try this on a bigger car"), 47 - Driver::Disk(big_stuff) => { 46 + None => panic!("empty mst! try a bigger car"), 47 + Some(Driver::Memory(_, _)) => panic!("try this on a bigger car"), 48 + Some(Driver::Disk(big_stuff)) => { 48 49 // we reach here if the repo was too big and needs to be spilled to 49 50 // disk to continue 50 51 ··· 61 62 // pop the driver back out to get some code indentation relief 62 63 driver 63 64 } 65 + }; 66 + 67 + let Some(driver) = driver else { 68 + panic!("big car but somehow empty MST: is the archive stuffed with garbage?"); 64 69 }; 65 70 66 71 // collect some random stats about the blocks
+56 -32
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::walk::Output; 3 4 use crate::Bytes; 4 5 use crate::HashMap; 5 6 use crate::disk::{DiskError, DiskStore}; 6 - use crate::mst::Node; 7 + use crate::mst::{Node, MstNode}; 7 8 use cid::Cid; 8 9 use iroh_car::CarReader; 9 10 use std::convert::Infallible; 10 11 use tokio::{io::AsyncRead, sync::mpsc}; 11 12 12 13 use crate::mst::Commit; 13 - use crate::walk::{Step, WalkError, Walker}; 14 + use crate::walk::{WalkError, Walker}; 14 15 15 16 /// Errors that can happen while consuming and emitting blocks and records 16 17 #[derive(Debug, thiserror::Error)] ··· 157 158 } 158 159 } 159 160 /// Begin processing an atproto MST from a CAR file 160 - pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 161 + pub async fn load_car<R: AsyncRead + Unpin>( 162 + &self, 163 + reader: R, 164 + ) -> Result<Option<Driver<R>>, DriveError> { 161 165 Driver::load_car(reader, noop, self.mem_limit_mb).await 162 166 } 163 167 } ··· 180 184 self 181 185 } 182 186 /// Begin processing an atproto MST from a CAR file 183 - pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 187 + pub async fn load_car<R: AsyncRead + Unpin>( 188 + &self, 189 + reader: R, 190 + ) -> Result<Option<Driver<R>>, DriveError> { 184 191 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 185 192 } 186 193 } ··· 199 206 reader: R, 200 207 process: fn(Bytes) -> Bytes, 201 208 mem_limit_mb: usize, 202 - ) -> Result<Driver<R>, DriveError> { 209 + ) -> Result<Option<Driver<R>>, DriveError> { 203 210 let max_size = mem_limit_mb * 2_usize.pow(20); 204 211 let mut mem_blocks = HashMap::new(); 205 212 ··· 225 232 continue; 226 233 } 227 234 228 - let data = Bytes::from(data); 229 - 230 235 // remaining possible types: node, record, other. 
optimistically process 231 236 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 237 ··· 234 239 mem_size += maybe_processed.len(); 235 240 mem_blocks.insert(cid, maybe_processed); 236 241 if mem_size >= max_size { 237 - return Ok(Driver::Disk(NeedDisk { 242 + return Ok(Some(Driver::Disk(NeedDisk { 238 243 car, 239 244 root, 240 245 process, 241 246 max_size, 242 247 mem_blocks, 243 248 commit, 244 - })); 249 + }))); 245 250 } 246 251 } 247 252 248 253 // all blocks loaded and we fit in memory! hopefully we found the commit... 249 254 let commit = commit.ok_or(DriveError::MissingCommit)?; 250 255 251 - let walker = Walker::new(commit.data); 256 + // the commit always must point to a Node; empty node => empty MST special case 257 + let node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 258 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 259 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 260 + }; 261 + if node.is_empty() { 262 + // TODO: actually we still want the commit in this case 263 + return Ok(None); 264 + } 265 + let depth = node.depth.unwrap(); 266 + 267 + let walker = Walker::new(commit.data, depth); 252 268 253 - Ok(Driver::Memory( 269 + Ok(Some(Driver::Memory( 254 270 commit, 255 271 MemDriver { 256 272 blocks: mem_blocks, 257 273 walker, 258 274 process, 259 275 }, 260 - )) 276 + ))) 261 277 } 262 278 } 263 279 ··· 287 303 let mut out = Vec::with_capacity(n); 288 304 for _ in 0..n { 289 305 // walk as far as we can until we run out of blocks or find a record 290 - match self.walker.step(&mut self.blocks, self.process)? { 291 - Step::Finish => break, 292 - Step::Found { rkey, data } => { 293 - out.push((rkey, data)); 294 - continue; 295 - } 306 + let Some(Output { rkey, cid: _, data }) = self.walker.step(&mut self.blocks, self.process)? 
else { 307 + break; 296 308 }; 309 + out.push((rkey, data)); 297 310 } 298 - 299 311 if out.is_empty() { 300 312 Ok(None) 301 313 } else { ··· 318 330 pub async fn finish_loading( 319 331 mut self, 320 332 mut store: DiskStore, 321 - ) -> Result<(Commit, DiskDriver), DriveError> { 333 + ) -> Result<(Commit, Option<DiskDriver>), DriveError> { 322 334 // move store in and back out so we can manage lifetimes 323 335 // dump mem blocks into the store 324 336 store = tokio::task::spawn(async move { ··· 390 402 391 403 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 392 404 393 - let walker = Walker::new(commit.data); 405 + // the commit always must point to a Node; empty node => empty MST special case 406 + let db_bytes = store 407 + .get(&commit.data.to_bytes()) 408 + .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 409 + .ok_or(DriveError::MissingCommit)?; 410 + 411 + let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 412 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 413 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 414 + }; 415 + if node.is_empty() { 416 + return Ok((commit, None)); 417 + } 418 + let depth = node.depth.unwrap(); 419 + 420 + let walker = Walker::new(commit.data, depth); 394 421 395 422 Ok(( 396 423 commit, 397 - DiskDriver { 424 + Some(DiskDriver { 398 425 process: self.process, 399 426 state: Some(BigState { store, walker }), 400 - }, 427 + }), 401 428 )) 402 429 } 403 430 } ··· 459 486 return (state, Err(e.into())); 460 487 } 461 488 }; 462 - match step { 463 - Step::Finish => break, 464 - Step::Found { rkey, data } => out.push((rkey, data)), 489 + let Some(Output { rkey, cid: _, data }) = step else { 490 + break; 465 491 }; 492 + out.push((rkey, data)); 466 493 } 467 494 468 495 (state, Ok::<_, DriveError>(out)) ··· 499 526 Err(e) => return tx.blocking_send(Err(e.into())), 500 527 }; 501 528 502 - match step { 503 - Step::Finish => 
return Ok(()), 504 - Step::Found { rkey, data } => { 505 - out.push((rkey, data)); 506 - continue; 507 - } 529 + let Some(Output { rkey, cid: _, data }) = step else { 530 + break; 508 531 }; 532 + out.push((rkey, data)); 509 533 } 510 534 511 535 if out.is_empty() {
+74 -23
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 + use sha2::{Digest, Sha256}; 6 7 use cid::Cid; 7 8 use serde::Deserialize; 8 - use crate::walk::Depth; 9 9 10 10 /// The top-level data object in a repository's tree is a signed commit. 11 11 #[derive(Debug, Deserialize)] ··· 37 37 pub sig: serde_bytes::ByteBuf, 38 38 } 39 39 40 - use serde::{de, de::{Deserializer, Visitor, MapAccess, SeqAccess}}; 40 + use serde::de::{self, Deserializer, Visitor, MapAccess, SeqAccess, Unexpected}; 41 41 use std::fmt; 42 42 43 - pub(crate) enum NodeEntry { 44 - Value(Cid, Vec<u8>), // rkey 45 - Tree(Cid, u32), // depth 43 + pub type Depth = u32; 44 + 45 + #[inline(always)] 46 + pub fn atproto_mst_depth(key: &str) -> Depth { 47 + // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 48 + u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 46 49 } 47 50 51 + #[derive(Debug)] 48 52 pub(crate) struct MstNode { 49 - pub left: Option<Cid>, // a tree but we don't know the depth 50 - pub entries: Vec<NodeEntry>, 53 + pub depth: Option<Depth>, // known for nodes with entries (required for root) 54 + pub things: Vec<NodeThing>, 55 + } 56 + 57 + #[derive(Debug)] 58 + pub(crate) struct NodeThing { 59 + pub(crate) cid: Cid, 60 + pub(crate) kind: ThingKind, 61 + } 62 + 63 + #[derive(Debug)] 64 + pub(crate) enum ThingKind { 65 + Tree, 66 + Value { rkey: String }, 51 67 } 52 68 53 - pub(crate) struct Entries(pub(crate) Vec<NodeEntry>); 69 + pub(crate) struct Entries(Vec<NodeThing>, Option<Depth>); 54 70 55 71 impl<'de> Deserialize<'de> for Entries { 56 72 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> ··· 69 85 where 70 86 S: SeqAccess<'de>, 71 87 { 72 - let mut children: Vec<NodeEntry> = Vec::with_capacity(seq.size_hint().unwrap_or(5)); 88 + let mut children: Vec<NodeThing> = 
Vec::with_capacity(seq.size_hint().unwrap_or(5)); 73 89 let mut prefix: Vec<u8> = vec![]; 90 + let mut depth = None; 74 91 while let Some(entry) = seq.next_element::<Entry>()? { 75 92 let mut rkey: Vec<u8> = vec![]; 76 93 let pre_checked = prefix 77 94 .get(..entry.prefix_len) 78 - // .ok_or(MstError::EntryPrefixOutOfbounds)?; 79 - .ok_or_else(|| todo!()).unwrap(); 95 + .ok_or_else(|| de::Error::invalid_value( 96 + Unexpected::Bytes(&prefix), 97 + &"a prefix at least as long as the prefix_len", 98 + ))?; 80 99 81 100 rkey.extend_from_slice(pre_checked); 82 101 rkey.extend_from_slice(&entry.keysuffix); 83 - let depth = Depth::compute(&rkey); 84 102 85 - prefix = rkey.clone(); 103 + let rkey_s = String::from_utf8(rkey.clone()) 104 + .map_err(|_| de::Error::invalid_value( 105 + Unexpected::Bytes(&rkey), 106 + &"a valid utf-8 rkey", 107 + ))?; 108 + 109 + let key_depth = atproto_mst_depth(&rkey_s); 110 + if depth.is_none() { 111 + depth = Some(key_depth); 112 + } else if Some(key_depth) != depth { 113 + return Err(de::Error::invalid_value( 114 + Unexpected::Bytes(&prefix), 115 + &"all rkeys to have equal MST depth", 116 + )); 117 + } 86 118 87 - children.push(NodeEntry::Value(entry.value, rkey)); 119 + children.push(NodeThing { 120 + cid: entry.value, 121 + kind: ThingKind::Value { rkey: rkey_s }, 122 + }); 88 123 89 - if let Some(ref tree) = entry.tree { 90 - children.push(NodeEntry::Tree(*tree, depth)); 124 + if let Some(cid) = entry.tree { 125 + children.push(NodeThing { 126 + cid, 127 + kind: ThingKind::Tree, 128 + }); 91 129 } 130 + 131 + prefix = rkey; 92 132 } 93 - Ok(Entries(children)) 133 + 134 + Ok(Entries(children, depth)) 94 135 } 95 136 } 96 137 deserializer.deserialize_seq(EntriesVisitor) ··· 117 158 let mut found_left = false; 118 159 let mut left = None; 119 160 let mut found_entries = false; 120 - let mut entries = Vec::with_capacity(4); // "fanout of 4" so does this make sense???? 
161 + let mut things = Vec::with_capacity(4); // "fanout of 4" so does this make sense???? 162 + let mut depth = None; 121 163 122 164 while let Some(key) = map.next_key()? { 123 165 match key { ··· 126 168 return Err(de::Error::duplicate_field("l")); 127 169 } 128 170 found_left = true; 129 - left = map.next_value()?; 171 + if let Some(cid) = map.next_value()? { 172 + left = Some(NodeThing { cid, kind: ThingKind::Tree }); 173 + } 130 174 } 131 175 "e" => { 132 176 if found_entries { 133 177 return Err(de::Error::duplicate_field("e")); 134 178 } 135 179 found_entries = true; 136 - let mut child_entries: Entries = map.next_value()?; 137 - entries.append(&mut child_entries.0); 180 + let Entries(mut child_entries, d) = map.next_value()?; 181 + things.append(&mut child_entries); 182 + depth = d; 138 183 }, 139 184 f => return Err(de::Error::unknown_field(f, NODE_FIELDS)) 140 185 } ··· 145 190 if !found_entries { 146 191 return Err(de::Error::missing_field("e")); 147 192 } 148 - Ok(MstNode { left, entries }) 193 + 194 + things.reverse(); 195 + if let Some(l) = left { 196 + things.push(l); 197 + } 198 + 199 + Ok(MstNode { depth, things }) 149 200 } 150 201 } 151 202 ··· 156 207 157 208 impl MstNode { 158 209 pub(crate) fn is_empty(&self) -> bool { 159 - self.left.is_none() && self.entries.is_empty() 210 + self.things.is_empty() 160 211 } 161 212 } 162 213
+107 -280
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::mst::NodeEntry; 3 + use crate::mst::NodeThing; 4 + use crate::mst::ThingKind; 4 5 use crate::mst::MstNode; 6 + use crate::mst::Depth; 5 7 use crate::Bytes; 6 8 use crate::HashMap; 7 9 use crate::disk::DiskStore; 8 10 use crate::drive::MaybeProcessedBlock; 9 11 use cid::Cid; 10 - use sha2::{Digest, Sha256}; 11 12 use std::convert::Infallible; 12 13 13 14 /// Errors that can happen while walking ··· 28 29 /// Errors from invalid Rkeys 29 30 #[derive(Debug, PartialEq, thiserror::Error)] 30 31 pub enum MstError { 31 - #[error("Failed to compute an rkey due to invalid prefix_len")] 32 - EntryPrefixOutOfbounds, 33 32 #[error("RKey was not utf-8")] 34 33 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 35 34 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 36 35 EmptyNode, 37 - #[error("Found an entry with rkey at the wrong depth")] 38 - WrongDepth, 39 - #[error("Lost track of our depth (possible bug?)")] 40 - LostDepth, 36 + #[error("Expected node to be at depth {expected}, but it was at {depth}")] 37 + WrongDepth { depth: Depth, expected: Depth }, 41 38 #[error("MST depth underflow: depth-0 node with child trees")] 42 39 DepthUnderflow, 43 - #[error("Encountered an rkey out of order while walking the MST")] 44 - RkeyOutOfOrder, 40 + #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 41 + RkeyOutOfOrder { prev: String, rkey: String }, 45 42 } 46 43 47 44 /// Walker outputs 48 - #[derive(Debug)] 49 - pub enum Step { 50 - /// Reached the end of the MST! yay! 51 - Finish, 52 - /// A record was found! 
53 - Found { rkey: String, data: Bytes }, 54 - } 55 - 56 - #[derive(Debug, Clone, PartialEq)] 57 - enum Need { 58 - Node { depth: Depth, cid: Cid }, 59 - Record { rkey: String, cid: Cid }, 60 - } 61 - 62 - #[derive(Debug, Clone, Copy, PartialEq)] 63 - pub enum Depth { 64 - Root, 65 - Depth(u32), 66 - } 67 - 68 - impl Depth { 69 - fn from_key(key: &[u8]) -> Self { 70 - let mut zeros = 0; 71 - for byte in Sha256::digest(key) { 72 - let leading = byte.leading_zeros(); 73 - zeros += leading; 74 - if leading < 8 { 75 - break; 76 - } 77 - } 78 - Self::Depth(zeros / 2) // truncating divide (rounds down) 79 - } 80 - fn next_expected(&self) -> Result<Option<u32>, MstError> { 81 - match self { 82 - Self::Root => Ok(None), 83 - Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 84 - } 85 - } 86 - pub fn compute(key: &[u8]) -> u32 { 87 - let Depth::Depth(d) = Self::from_key(key) else { 88 - panic!("errr"); 89 - }; 90 - d 91 - } 92 - } 93 - 94 - fn push_from_node(stack: &mut Vec<Need>, node: &MstNode, parent_depth: Depth) -> Result<(), MstError> { 95 - // empty nodes are not allowed in the MST except in an empty MST 96 - if node.is_empty() { 97 - if parent_depth == Depth::Root { 98 - return Ok(()); // empty mst, nothing to push 99 - } else { 100 - return Err(MstError::EmptyNode); 101 - } 102 - } 103 - 104 - let mut this_depth = parent_depth.next_expected()?; 105 - 106 - for entry in node.entries.iter().rev() { 107 - // ok this loop sucks now esp with depth checking 108 - // should keep the entries together with a shared depth on the rkey 109 - // ...maybe. skipping the absent trees is nice? 
110 - match entry { 111 - NodeEntry::Value(cid, rkey) => { 112 - stack.push(Need::Record { 113 - rkey: String::from_utf8(rkey.to_vec())?, 114 - cid: *cid, 115 - }); 116 - } 117 - NodeEntry::Tree(cid, depth) => { 118 - if let Some(expected) = this_depth { 119 - if *depth != expected { 120 - return Err(MstError::WrongDepth); 121 - } 122 - } else { 123 - // this_depth is `none` if we are the deepest child (directly below root) 124 - // in that case we accept whatever highest depth is claimed 125 - this_depth = Some(*depth); 126 - } 127 - stack.push(Need::Node { 128 - depth: Depth::Depth(*depth), 129 - cid: *cid, 130 - }); 131 - } 132 - } 133 - 134 - } 135 - 136 - let d = this_depth.ok_or(MstError::LostDepth)?; 137 - if let Some(tree) = node.left { 138 - stack.push(Need::Node { 139 - depth: Depth::Depth(d), 140 - cid: tree, 141 - }); 142 - } 143 - Ok(()) 45 + #[derive(Debug, PartialEq)] 46 + pub struct Output { 47 + pub rkey: String, 48 + pub cid: Cid, 49 + pub data: Bytes, 144 50 } 145 51 146 52 /// Traverser of an atproto MST ··· 148 54 /// Walks the tree from left-to-right in depth-first order 149 55 #[derive(Debug)] 150 56 pub struct Walker { 151 - stack: Vec<Need>, 152 - prev: String, 57 + prev_rkey: String, 58 + todo: Vec<(Depth, NodeThing)>, 153 59 } 154 60 155 61 impl Walker { 156 - pub fn new(tree_root_cid: Cid) -> Self { 62 + pub fn new( 63 + root_cid: Cid, 64 + depth: Depth, 65 + ) -> Self { 157 66 Self { 158 - stack: vec![Need::Node { 159 - depth: Depth::Root, 160 - cid: tree_root_cid, 161 - }], 162 - prev: "".to_string(), 67 + prev_rkey: "".to_string(), 68 + todo: vec![( 69 + depth + 1, // we're kind of inventing a fake root one above the real root 70 + // ... maybe we should just pass in the real root here??? 
71 + NodeThing { 72 + cid: root_cid, 73 + kind: ThingKind::Tree, 74 + }, 75 + )], 163 76 } 164 77 } 165 78 166 - /// Advance through nodes until we find a record or can't go further 167 - pub fn step( 79 + fn mpb_step( 168 80 &mut self, 169 - blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 81 + depth: Depth, 82 + kind: ThingKind, 83 + cid: Cid, 84 + mpb: &MaybeProcessedBlock, 170 85 process: impl Fn(Bytes) -> Bytes, 171 - ) -> Result<Step, WalkError> { 172 - loop { 173 - let Some(need) = self.stack.last_mut() else { 174 - log::trace!("tried to walk but we're actually done."); 175 - return Ok(Step::Finish); 176 - }; 86 + ) -> Result<Option<Output>, WalkError> { 87 + match kind { 88 + ThingKind::Value { rkey } => { 89 + let data = match mpb { 90 + MaybeProcessedBlock::Raw(data) => process(data.clone()), 91 + MaybeProcessedBlock::Processed(t) => t.clone(), 92 + }; 177 93 178 - match need { 179 - &mut Need::Node { depth, cid } => { 180 - log::trace!("need node {cid:?}"); 181 - let Some(block) = blocks.remove(&cid) else { 182 - return Err(WalkError::MissingBlock(cid)); 183 - }; 94 + if rkey <= self.prev_rkey { 95 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 96 + rkey, 97 + prev: self.prev_rkey.clone(), 98 + })); 99 + } 100 + self.prev_rkey = rkey.clone(); 184 101 185 - let MaybeProcessedBlock::Raw(data) = block else { 186 - return Err(WalkError::BadCommitFingerprint); 187 - }; 188 - let node = serde_ipld_dagcbor::from_slice::<crate::mst::MstNode>(&data) 189 - .map_err(WalkError::BadCommit)?; 102 + Ok(Some(Output { 103 + rkey, 104 + cid, 105 + data, 106 + })) 107 + } 108 + ThingKind::Tree => { 109 + let MaybeProcessedBlock::Raw(data) = mpb else { 110 + return Err(WalkError::BadCommitFingerprint); 111 + }; 190 112 191 - // found node, make sure we remember 192 - self.stack.pop(); 113 + let node: MstNode = serde_ipld_dagcbor::from_slice(&data) 114 + .map_err(WalkError::BadCommit)?; 193 115 194 - // queue up work on the found node next 195 - 
push_from_node(&mut self.stack, &node, depth)?; 116 + if node.is_empty() { 117 + return Err(WalkError::MstError(MstError::EmptyNode)); 196 118 } 197 - Need::Record { rkey, cid } => { 198 - log::trace!("need record {cid:?}"); 199 - // note that we cannot *remove* a record block, sadly, since 200 - // there can be multiple rkeys pointing to the same cid. 201 - let Some(data) = blocks.get(cid) else { 202 - return Err(WalkError::MissingBlock(*cid)); 203 - }; 204 - let rkey = rkey.clone(); 205 - let data = match data { 206 - MaybeProcessedBlock::Raw(data) => process(data.clone()), 207 - MaybeProcessedBlock::Processed(t) => t.clone(), 208 - }; 209 119 210 - // found node, make sure we remember 211 - self.stack.pop(); 212 - 213 - // rkeys *must* be in order or else the tree is invalid (or 214 - // we have a bug) 215 - if rkey <= self.prev { 216 - return Err(MstError::RkeyOutOfOrder)?; 120 + let next_depth = depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 121 + if let Some(d) = node.depth { 122 + if d != next_depth { 123 + return Err(WalkError::MstError(MstError::WrongDepth { 124 + depth: d, 125 + expected: next_depth, 126 + })); 217 127 } 218 - self.prev = rkey.clone(); 128 + } 219 129 220 - return Ok(Step::Found { rkey, data }); 130 + for thing in node.things { 131 + self.todo.push((next_depth, thing)); 221 132 } 133 + 134 + Ok(None) 222 135 } 223 136 } 224 137 } 225 138 226 - /// blocking!!!!!! 
227 - pub fn disk_step( 139 + /// Advance through nodes until we find a record or can't go further 140 + pub fn step( 228 141 &mut self, 229 - reader: &mut DiskStore, 142 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 230 143 process: impl Fn(Bytes) -> Bytes, 231 - ) -> Result<Step, WalkError> { 232 - loop { 233 - let Some(need) = self.stack.last_mut() else { 234 - log::trace!("tried to walk but we're actually done."); 235 - return Ok(Step::Finish); 144 + ) -> Result<Option<Output>, WalkError> { 145 + 146 + while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 147 + let Some(mpb) = blocks.get(&cid) else { 148 + return Err(WalkError::MissingBlock(cid)); 236 149 }; 237 - 238 - match need { 239 - &mut Need::Node { depth, cid } => { 240 - let cid_bytes = cid.to_bytes(); 241 - log::trace!("need node {cid:?}"); 242 - let Some(block_slice) = reader.get(&cid_bytes)? else { 243 - return Err(WalkError::MissingBlock(cid)); 244 - }; 245 - 246 - let block = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 150 + if let Some(out) = self.mpb_step(depth, kind, cid, mpb, &process)? { 151 + return Ok(Some(out)); 152 + } 153 + } 247 154 248 - let MaybeProcessedBlock::Raw(data) = block else { 249 - return Err(WalkError::BadCommitFingerprint); 250 - }; 251 - let node = serde_ipld_dagcbor::from_slice::<MstNode>(&data) 252 - .map_err(WalkError::BadCommit)?; 155 + log::trace!("tried to walk but we're actually done."); 156 + Ok(None) 157 + } 253 158 254 - // found node, make sure we remember 255 - self.stack.pop(); 159 + /// blocking!!!!!! 
160 + pub fn disk_step( 161 + &mut self, 162 + blocks: &mut DiskStore, 163 + process: impl Fn(Bytes) -> Bytes, 164 + ) -> Result<Option<Output>, WalkError> { 256 165 257 - // queue up work on the found node next 258 - push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 259 - } 260 - Need::Record { rkey, cid } => { 261 - log::trace!("need record {cid:?}"); 262 - let cid_bytes = cid.to_bytes(); 263 - let Some(data_slice) = reader.get(&cid_bytes)? else { 264 - return Err(WalkError::MissingBlock(*cid)); 265 - }; 266 - let data = MaybeProcessedBlock::from_bytes(data_slice.to_vec()); 267 - let rkey = rkey.clone(); 268 - let data = match data { 269 - MaybeProcessedBlock::Raw(data) => process(data), 270 - MaybeProcessedBlock::Processed(t) => t, 271 - }; 272 - 273 - // found node, make sure we remember 274 - self.stack.pop(); 275 - 276 - log::trace!("emitting a block as a step. depth={}", self.stack.len()); 277 - 278 - // rkeys *must* be in order or else the tree is invalid (or 279 - // we have a bug) 280 - if rkey <= self.prev { 281 - return Err(MstError::RkeyOutOfOrder)?; 282 - } 283 - self.prev = rkey.clone(); 284 - 285 - return Ok(Step::Found { rkey, data }); 286 - } 166 + while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 167 + let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 168 + return Err(WalkError::MissingBlock(cid)); 169 + }; 170 + let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 171 + if let Some(out) = self.mpb_step(depth, kind, cid, &mpb, &process)? 
{ 172 + return Ok(Some(out)); 287 173 } 288 174 } 175 + log::trace!("tried to walk but we're actually done."); 176 + Ok(None) 289 177 } 290 178 } 291 179 ··· 293 181 mod test { 294 182 use super::*; 295 183 296 - fn cid1() -> Cid { 297 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 298 - .parse() 299 - .unwrap() 300 - } 301 - 302 - #[test] 303 - fn test_depth_spec_0() { 304 - let d = Depth::from_key(b"2653ae71"); 305 - assert_eq!(d, Depth::Depth(0)) 306 - } 307 - 308 - #[test] 309 - fn test_depth_spec_1() { 310 - let d = Depth::from_key(b"blue"); 311 - assert_eq!(d, Depth::Depth(1)) 312 - } 313 - 314 - #[test] 315 - fn test_depth_spec_4() { 316 - let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 317 - assert_eq!(d, Depth::Depth(4)) 318 - } 319 - 320 - #[test] 321 - fn test_depth_spec_8() { 322 - let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 323 - assert_eq!(d, Depth::Depth(8)) 324 - } 325 - 326 - #[test] 327 - fn test_depth_ietf_draft_0() { 328 - let d = Depth::from_key(b"key1"); 329 - assert_eq!(d, Depth::Depth(0)) 330 - } 331 - 332 - #[test] 333 - fn test_depth_ietf_draft_1() { 334 - let d = Depth::from_key(b"key7"); 335 - assert_eq!(d, Depth::Depth(1)) 336 - } 337 - 338 - #[test] 339 - fn test_depth_ietf_draft_4() { 340 - let d = Depth::from_key(b"key515"); 341 - assert_eq!(d, Depth::Depth(4)) 342 - } 343 - 344 - #[test] 345 - fn test_depth_interop() { 346 - // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 347 - for (k, expected) in [ 348 - ("", 0), 349 - ("asdf", 0), 350 - ("blue", 1), 351 - ("2653ae71", 0), 352 - ("88bfafc7", 2), 353 - ("2a92d355", 4), 354 - ("884976f5", 6), 355 - ("app.bsky.feed.post/454397e440ec", 4), 356 - ("app.bsky.feed.post/9adeb165882c", 8), 357 - ] { 358 - let d = Depth::from_key(k.as_bytes()); 359 - assert_eq!(d, Depth::Depth(expected), "key: {}", k); 360 - } 361 - } 184 + // fn cid1() -> Cid { 185 + // 
"bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 186 + // .parse() 187 + // .unwrap() 188 + // } 362 189 363 190 // #[test] 364 191 // fn test_push_empty_fails() {
+216
tests/mst-depth.rs
··· 1 + // use repo_stream::Driver; 2 + use repo_stream::mst::atproto_mst_depth; 3 + 4 + // https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/example_keys.txt 5 + const INTEROP_EXAMPLE_KEYS: &str = "\ 6 + A0/374913 7 + A1/076595 8 + A2/827942 9 + A3/578971 10 + A4/055903 11 + A5/518415 12 + B0/601692 13 + B1/986427 14 + B2/827649 15 + B3/095483 16 + B4/774183 17 + B5/116729 18 + C0/451630 19 + C1/438573 20 + C2/014073 21 + C3/564755 22 + C4/134079 23 + C5/141153 24 + D0/952776 25 + D1/834852 26 + D2/269196 27 + D3/038750 28 + D4/052059 29 + D5/563177 30 + E0/670489 31 + E1/091396 32 + E2/819540 33 + E3/391311 34 + E4/820614 35 + E5/512478 36 + F0/697858 37 + F1/085263 38 + F2/483591 39 + F3/409933 40 + F4/789697 41 + F5/271416 42 + G0/765327 43 + G1/209912 44 + G2/611528 45 + G3/649394 46 + G4/585887 47 + G5/298495 48 + H0/131238 49 + H1/566929 50 + H2/618272 51 + H3/500151 52 + H4/841548 53 + H5/642354 54 + I0/536928 55 + I1/525517 56 + I2/800680 57 + I3/818503 58 + I4/561177 59 + I5/010047 60 + J0/453243 61 + J1/217783 62 + J2/960389 63 + J3/501274 64 + J4/042054 65 + J5/743154 66 + K0/125271 67 + K1/317361 68 + K2/453868 69 + K3/214010 70 + K4/164720 71 + K5/177856 72 + L0/502889 73 + L1/574576 74 + L2/596333 75 + L3/683657 76 + L4/724989 77 + L5/093883 78 + M0/141744 79 + M1/643368 80 + M2/919782 81 + M3/836327 82 + M4/177463 83 + M5/563354 84 + N0/370604 85 + N1/563732 86 + N2/177587 87 + N3/678428 88 + N4/599183 89 + N5/567564 90 + O0/523870 91 + O1/052141 92 + O2/037651 93 + O3/773808 94 + O4/140952 95 + O5/318605 96 + P0/133157 97 + P1/394633 98 + P2/521462 99 + P3/493488 100 + P4/908754 101 + P5/109455 102 + Q0/835234 103 + Q1/131542 104 + Q2/680035 105 + Q3/253381 106 + Q4/019053 107 + Q5/658167 108 + R0/129386 109 + R1/363149 110 + R2/742766 111 + R3/039235 112 + R4/482275 113 + R5/817312 114 + S0/340283 115 + S1/561525 116 + S2/914574 117 + S3/909434 118 + S4/789708 119 + S5/803866 120 + T0/255204 121 + T1/716687 122 + T2/256231 123 
+ T3/054247 124 + T4/419247 125 + T5/509584 126 + U0/298296 127 + U1/851680 128 + U2/342856 129 + U3/597327 130 + U4/311686 131 + U5/030156 132 + V0/221100 133 + V1/741554 134 + V2/267990 135 + V3/674163 136 + V4/739931 137 + V5/573718 138 + W0/034202 139 + W1/697411 140 + W2/460313 141 + W3/189647 142 + W4/847299 143 + W5/648086 144 + X0/287498 145 + X1/044093 146 + X2/613770 147 + X3/577587 148 + X4/779391 149 + X5/339246 150 + Y0/986350 151 + Y1/044567 152 + Y2/478044 153 + Y3/757097 154 + Y4/396913 155 + Y5/802264 156 + Z0/425878 157 + Z1/127557 158 + Z2/441927 159 + Z3/064474 160 + Z4/888344 161 + Z5/977983"; 162 + 163 + #[test] 164 + fn test_interop_example_keys() { 165 + for key in INTEROP_EXAMPLE_KEYS.split('\n') { 166 + let expected: u32 = key.chars().nth(1).unwrap().to_digit(16).unwrap(); 167 + let computed: u32 = atproto_mst_depth(key); 168 + assert_eq!(computed, expected); 169 + } 170 + } 171 + 172 + #[test] 173 + fn test_iterop_key_heights() { 174 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 175 + for (key, expected) in [ 176 + ("", 0), 177 + ("asdf", 0), 178 + ("blue", 1), 179 + ("2653ae71", 0), 180 + ("88bfafc7", 2), 181 + ("2a92d355", 4), 182 + ("884976f5", 6), 183 + ("app.bsky.feed.post/454397e440ec", 4), 184 + ("app.bsky.feed.post/9adeb165882c", 8), 185 + ] { 186 + let computed = atproto_mst_depth(key); 187 + assert_eq!(computed, expected); 188 + } 189 + } 190 + 191 + #[test] 192 + fn test_spec_example_keys() { 193 + // https://atproto.com/specs/repository#mst-structure 194 + for (key, expected) in [ 195 + ("2653ae71", 0), 196 + ("blue", 1), 197 + ("app.bsky.feed.post/454397e440ec", 4), 198 + ("app.bsky.feed.post/9adeb165882c", 8), 199 + ] { 200 + let computed = atproto_mst_depth(key); 201 + assert_eq!(computed, expected); 202 + } 203 + } 204 + 205 + #[test] 206 + fn test_ietf_example_keys() { 207 + // https://atproto.com/specs/repository#mst-structure 208 + for (key, expected) in [ 209 
+ ("key1", 0), 210 + ("key7", 1), 211 + ("key515", 4), 212 + ] { 213 + let computed = atproto_mst_depth(key); 214 + assert_eq!(computed, expected); 215 + } 216 + }