Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

custom mst node deserialize

try to put the data into a more walk-friendly shape

oops it made things slower

benches:
huge: +0.36% (within noise margin)
midsize: +3.3%
little: +3.7%
tiny: +4.6%
empty: +6.4%

authored by

phil and committed by tangled.org cc77d06f 483f9d6c

+201 -86
+137 -12
src/mst.rs
··· 5 5 6 6 use cid::Cid; 7 7 use serde::Deserialize; 8 + use crate::walk::Depth; 8 9 9 10 /// The top-level data object in a repository's tree is a signed commit. 10 11 #[derive(Debug, Deserialize)] ··· 33 34 pub prev: Option<Cid>, 34 35 /// cryptographic signature of this commit, as raw bytes 35 36 #[serde(with = "serde_bytes")] 36 - pub sig: Vec<u8>, 37 + pub sig: serde_bytes::ByteBuf, 38 + } 39 + 40 + use serde::{de, de::{Deserializer, Visitor, MapAccess, SeqAccess}}; 41 + use std::fmt; 42 + 43 + pub(crate) enum NodeEntry { 44 + Value(Cid, Vec<u8>), // rkey 45 + Tree(Cid, u32), // depth 46 + } 47 + 48 + pub(crate) struct MstNode { 49 + pub left: Option<Cid>, // a tree but we don't know the depth 50 + pub entries: Vec<NodeEntry>, 51 + } 52 + 53 + pub(crate) struct Entries(pub(crate) Vec<NodeEntry>); 54 + 55 + impl<'de> Deserialize<'de> for Entries { 56 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 57 + where 58 + D: Deserializer<'de>, 59 + { 60 + struct EntriesVisitor; 61 + impl<'de> Visitor<'de> for EntriesVisitor { 62 + type Value = Entries; 63 + 64 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 65 + formatter.write_str("seq MstEntries") 66 + } 67 + 68 + fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error> 69 + where 70 + S: SeqAccess<'de>, 71 + { 72 + let mut children: Vec<NodeEntry> = Vec::with_capacity(seq.size_hint().unwrap_or(5)); 73 + let mut prefix: Vec<u8> = vec![]; 74 + while let Some(entry) = seq.next_element::<Entry>()? 
{ 75 + let mut rkey: Vec<u8> = vec![]; 76 + let pre_checked = prefix 77 + .get(..entry.prefix_len) 78 + // .ok_or(MstError::EntryPrefixOutOfbounds)?; 79 + .ok_or_else(|| todo!()).unwrap(); 80 + 81 + rkey.extend_from_slice(pre_checked); 82 + rkey.extend_from_slice(&entry.keysuffix); 83 + let depth = Depth::compute(&rkey); 84 + 85 + prefix = rkey.clone(); 86 + 87 + children.push(NodeEntry::Value(entry.value, rkey)); 88 + 89 + if let Some(ref tree) = entry.tree { 90 + children.push(NodeEntry::Tree(*tree, depth)); 91 + } 92 + } 93 + Ok(Entries(children)) 94 + } 95 + } 96 + deserializer.deserialize_seq(EntriesVisitor) 97 + } 98 + } 99 + 100 + impl<'de> Deserialize<'de> for MstNode { 101 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 102 + where 103 + D: Deserializer<'de>, 104 + { 105 + struct NodeVisitor; 106 + impl<'de> Visitor<'de> for NodeVisitor { 107 + type Value = MstNode; 108 + 109 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 110 + formatter.write_str("struct MstNode") 111 + } 112 + 113 + fn visit_map<V>(self, mut map: V) -> Result<MstNode, V::Error> 114 + where 115 + V: MapAccess<'de>, 116 + { 117 + let mut found_left = false; 118 + let mut left = None; 119 + let mut found_entries = false; 120 + let mut entries = Vec::with_capacity(4); // "fanout of 4" so does this make sense???? 121 + 122 + while let Some(key) = map.next_key()? 
{ 123 + match key { 124 + "l" => { 125 + if found_left { 126 + return Err(de::Error::duplicate_field("l")); 127 + } 128 + found_left = true; 129 + left = map.next_value()?; 130 + } 131 + "e" => { 132 + if found_entries { 133 + return Err(de::Error::duplicate_field("e")); 134 + } 135 + found_entries = true; 136 + let mut child_entries: Entries = map.next_value()?; 137 + entries.append(&mut child_entries.0); 138 + }, 139 + f => return Err(de::Error::unknown_field(f, NODE_FIELDS)) 140 + } 141 + } 142 + if !found_left { 143 + return Err(de::Error::missing_field("l")); 144 + } 145 + if !found_entries { 146 + return Err(de::Error::missing_field("e")); 147 + } 148 + Ok(MstNode { left, entries }) 149 + } 150 + } 151 + 152 + const NODE_FIELDS: &[&str] = &["l", "e"]; 153 + deserializer.deserialize_struct("MstNode", NODE_FIELDS, NodeVisitor) 154 + } 155 + } 156 + 157 + impl MstNode { 158 + pub(crate) fn is_empty(&self) -> bool { 159 + self.left.is_none() && self.entries.is_empty() 160 + } 37 161 } 38 162 39 163 /// MST node data schema ··· 62 186 /// so if a block *could be* a node, any record converter must postpone 63 187 /// processing. if it turns out it happens to be a very node-looking record, 64 188 /// well, sorry, it just has to only be processed later when that's known. 189 + #[inline(always)] 65 190 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 191 const NODE_FINGERPRINT: [u8; 3] = [ 67 192 0xA2, // map length 2 (for "l" and "e" keys) ··· 77 202 .unwrap_or(false) 78 203 } 79 204 80 - /// Check if a node has any entries 81 - /// 82 - /// An empty repository with no records is represented as a single MST node 83 - /// with an empty array of entries. This is the only situation in which a 84 - /// tree may contain an empty leaf node which does not either contain keys 85 - /// ("entries") or point to a sub-tree containing entries. 
86 - pub(crate) fn is_empty(&self) -> bool { 87 - self.left.is_none() && self.entries.is_empty() 88 - } 205 + // /// Check if a node has any entries 206 + // /// 207 + // /// An empty repository with no records is represented as a single MST node 208 + // /// with an empty array of entries. This is the only situation in which a 209 + // /// tree may contain an empty leaf node which does not either contain keys 210 + // /// ("entries") or point to a sub-tree containing entries. 211 + // pub(crate) fn is_empty(&self) -> bool { 212 + // self.left.is_none() && self.entries.is_empty() 213 + // } 89 214 } 90 215 91 216 /// TreeEntry object ··· 96 221 #[serde(rename = "p")] 97 222 pub prefix_len: usize, 98 223 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 99 - #[serde(rename = "k", with = "serde_bytes")] 100 - pub keysuffix: Vec<u8>, // can we String this here? 224 + #[serde(rename = "k")] 225 + pub keysuffix: serde_bytes::ByteBuf, 101 226 /// link to the record data (CBOR) for this entry 102 227 #[serde(rename = "v")] 103 228 pub value: Cid,
+64 -74
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 + use crate::mst::NodeEntry; 4 + use crate::mst::MstNode; 3 5 use crate::Bytes; 4 6 use crate::HashMap; 5 7 use crate::disk::DiskStore; 6 8 use crate::drive::MaybeProcessedBlock; 7 - use crate::mst::Node; 8 9 use cid::Cid; 9 10 use sha2::{Digest, Sha256}; 10 11 use std::convert::Infallible; ··· 59 60 } 60 61 61 62 #[derive(Debug, Clone, Copy, PartialEq)] 62 - enum Depth { 63 + pub enum Depth { 63 64 Root, 64 65 Depth(u32), 65 66 } ··· 81 82 Self::Root => Ok(None), 82 83 Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 83 84 } 85 + } 86 + pub fn compute(key: &[u8]) -> u32 { 87 + let Depth::Depth(d) = Self::from_key(key) else { 88 + panic!("errr"); 89 + }; 90 + d 84 91 } 85 92 } 86 93 87 - fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 94 + fn push_from_node(stack: &mut Vec<Need>, node: &MstNode, parent_depth: Depth) -> Result<(), MstError> { 88 95 // empty nodes are not allowed in the MST except in an empty MST 89 96 if node.is_empty() { 90 97 if parent_depth == Depth::Root { ··· 94 101 } 95 102 } 96 103 97 - let mut entries = Vec::with_capacity(node.entries.len()); 98 - let mut prefix = vec![]; 99 104 let mut this_depth = parent_depth.next_expected()?; 100 105 101 - for entry in &node.entries { 102 - let mut rkey = vec![]; 103 - let pre_checked = prefix 104 - .get(..entry.prefix_len) 105 - .ok_or(MstError::EntryPrefixOutOfbounds)?; 106 - rkey.extend_from_slice(pre_checked); 107 - rkey.extend_from_slice(&entry.keysuffix); 108 - 109 - let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 110 - return Err(MstError::WrongDepth); 111 - }; 112 - 113 - // this_depth is `none` if we are the deepest child (directly below root) 114 - // in that case we accept whatever highest depth is claimed 115 - let expected_depth = match this_depth { 116 - Some(d) => d, 117 - None => { 118 - this_depth = Some(key_depth); 119 - key_depth 106 + for entry in 
node.entries.iter().rev() { 107 + // ok this loop sucks now esp with depth checking 108 + // should keep the entries together with a shared depth on the rkey 109 + // ...maybe. skipping the absent trees is nice? 110 + match entry { 111 + NodeEntry::Value(cid, rkey) => { 112 + stack.push(Need::Record { 113 + rkey: String::from_utf8(rkey.to_vec())?, 114 + cid: *cid, 115 + }); 116 + } 117 + NodeEntry::Tree(cid, depth) => { 118 + if let Some(expected) = this_depth { 119 + if *depth != expected { 120 + return Err(MstError::WrongDepth); 121 + } 122 + } else { 123 + // this_depth is `none` if we are the deepest child (directly below root) 124 + // in that case we accept whatever highest depth is claimed 125 + this_depth = Some(*depth); 126 + } 127 + stack.push(Need::Node { 128 + depth: Depth::Depth(*depth), 129 + cid: *cid, 130 + }); 120 131 } 121 - }; 122 - 123 - // all keys we find should be this depth 124 - if key_depth != expected_depth { 125 - return Err(MstError::DepthUnderflow); 126 132 } 127 133 128 - prefix = rkey.clone(); 129 - 130 - entries.push(Need::Record { 131 - rkey: String::from_utf8(rkey)?, 132 - cid: entry.value, 133 - }); 134 - if let Some(ref tree) = entry.tree { 135 - entries.push(Need::Node { 136 - depth: Depth::Depth(key_depth), 137 - cid: *tree, 138 - }); 139 - } 140 134 } 141 135 142 - entries.reverse(); 143 - stack.append(&mut entries); 144 - 145 136 let d = this_depth.ok_or(MstError::LostDepth)?; 146 - 147 137 if let Some(tree) = node.left { 148 138 stack.push(Need::Node { 149 139 depth: Depth::Depth(d), ··· 195 185 let MaybeProcessedBlock::Raw(data) = block else { 196 186 return Err(WalkError::BadCommitFingerprint); 197 187 }; 198 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 188 + let node = serde_ipld_dagcbor::from_slice::<crate::mst::MstNode>(&data) 199 189 .map_err(WalkError::BadCommit)?; 200 190 201 191 // found node, make sure we remember ··· 258 248 let MaybeProcessedBlock::Raw(data) = block else { 259 249 return 
Err(WalkError::BadCommitFingerprint); 260 250 }; 261 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 251 + let node = serde_ipld_dagcbor::from_slice::<MstNode>(&data) 262 252 .map_err(WalkError::BadCommit)?; 263 253 264 254 // found node, make sure we remember ··· 370 360 } 371 361 } 372 362 373 - #[test] 374 - fn test_push_empty_fails() { 375 - let empty_node = Node { 376 - left: None, 377 - entries: vec![], 378 - }; 379 - let mut stack = vec![]; 380 - let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 381 - assert_eq!(err, Err(MstError::EmptyNode)); 382 - } 363 + // #[test] 364 + // fn test_push_empty_fails() { 365 + // let empty_node = Node { 366 + // left: None, 367 + // entries: vec![], 368 + // }; 369 + // let mut stack = vec![]; 370 + // let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 371 + // assert_eq!(err, Err(MstError::EmptyNode)); 372 + // } 383 373 384 - #[test] 385 - fn test_push_one_node() { 386 - let node = Node { 387 - left: Some(cid1()), 388 - entries: vec![], 389 - }; 390 - let mut stack = vec![]; 391 - push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 392 - assert_eq!( 393 - stack.last(), 394 - Some(Need::Node { 395 - depth: Depth::Depth(3), 396 - cid: cid1() 397 - }) 398 - .as_ref() 399 - ); 400 - } 374 + // #[test] 375 + // fn test_push_one_node() { 376 + // let node = Node { 377 + // left: Some(cid1()), 378 + // entries: vec![], 379 + // }; 380 + // let mut stack = vec![]; 381 + // push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 382 + // assert_eq!( 383 + // stack.last(), 384 + // Some(Need::Node { 385 + // depth: Depth::Depth(3), 386 + // cid: cid1() 387 + // }) 388 + // .as_ref() 389 + // ); 390 + // } 401 391 }