Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove old code

authored by

phil and committed by tangled.org c29117a6 843447f4

+62 -161
+1 -1
readme.md
··· 75 75 - 5.0MiB: `6.8ms` 76 76 - 279KiB: `170us` 77 77 - 3.4KiB: `5.2us` 78 - - empty: `710ns` 78 + - empty: `670ns` 79 79 80 80 it's a little faster with `mimalloc` 81 81
+2 -2
src/drive.rs
··· 4 4 use crate::Bytes; 5 5 use crate::HashMap; 6 6 use crate::disk::{DiskError, DiskStore}; 7 - use crate::mst::{Node, MstNode}; 7 + use crate::mst::MstNode; 8 8 use cid::Cid; 9 9 use iroh_car::CarReader; 10 10 use std::convert::Infallible; ··· 65 65 66 66 impl MaybeProcessedBlock { 67 67 pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 68 - if Node::could_be(&data) { 68 + if MstNode::could_be(&data) { 69 69 MaybeProcessedBlock::Raw(data) 70 70 } else { 71 71 MaybeProcessedBlock::Processed(process(data))
+47 -105
src/mst.rs
··· 37 37 pub sig: serde_bytes::ByteBuf, 38 38 } 39 39 40 - use serde::de::{self, Deserializer, Visitor, MapAccess, SeqAccess, Unexpected}; 40 + use serde::de::{self, Deserializer, Visitor, MapAccess, Unexpected}; 41 41 use std::fmt; 42 42 43 43 pub type Depth = u32; ··· 66 66 Value { rkey: String }, 67 67 } 68 68 69 - pub(crate) struct Entries(Vec<NodeThing>, Option<Depth>); 70 - 71 - impl<'de> Deserialize<'de> for Entries { 72 - fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 73 - where 74 - D: Deserializer<'de>, 75 - { 76 - struct EntriesVisitor; 77 - impl<'de> Visitor<'de> for EntriesVisitor { 78 - type Value = Entries; 79 - 80 - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 81 - formatter.write_str("seq MstEntries") 82 - } 83 - 84 - fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error> 85 - where 86 - S: SeqAccess<'de>, 87 - { 88 - let mut children: Vec<NodeThing> = Vec::with_capacity(seq.size_hint().unwrap_or(5)); 89 - let mut prefix: Vec<u8> = vec![]; 90 - let mut depth = None; 91 - while let Some(entry) = seq.next_element::<Entry>()? { 92 - let mut rkey: Vec<u8> = vec![]; 93 - let pre_checked = prefix 94 - .get(..entry.prefix_len) 95 - .ok_or_else(|| de::Error::invalid_value( 96 - Unexpected::Bytes(&prefix), 97 - &"a prefix at least as long as the prefix_len", 98 - ))?; 99 - 100 - rkey.extend_from_slice(pre_checked); 101 - rkey.extend_from_slice(&entry.keysuffix); 102 - 103 - let rkey_s = String::from_utf8(rkey.clone()) 104 - .map_err(|_| de::Error::invalid_value( 105 - Unexpected::Bytes(&rkey), 106 - &"a valid utf-8 rkey", 107 - ))?; 108 - 109 - let key_depth = atproto_mst_depth(&rkey_s); 110 - if depth.is_none() { 111 - depth = Some(key_depth); 112 - } else if Some(key_depth) != depth { 113 - return Err(de::Error::invalid_value( 114 - Unexpected::Bytes(&prefix), 115 - &"all rkeys to have equal MST depth", 116 - )); 117 - } 118 - 119 - children.push(NodeThing { 120 - cid: entry.value, 121 - kind: ThingKind::Value { rkey: rkey_s }, 122 - }); 123 - 124 - if let Some(cid) = entry.tree { 125 - children.push(NodeThing { 126 - cid, 127 - kind: ThingKind::Tree, 128 - }); 129 - } 130 - 131 - prefix = rkey; 132 - } 133 - 134 - Ok(Entries(children, depth)) 135 - } 136 - } 137 - deserializer.deserialize_seq(EntriesVisitor) 138 - } 139 - } 140 - 141 69 impl<'de> Deserialize<'de> for MstNode { 142 70 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 143 71 where ··· 158 86 let mut found_left = false; 159 87 let mut left = None; 160 88 let mut found_entries = false; 161 - let mut things = Vec::with_capacity(4); // "fanout of 4" so does this make sense???? 89 + let mut things = Vec::new(); 162 90 let mut depth = None; 163 91 164 92 while let Some(key) = map.next_key()? { ··· 177 105 return Err(de::Error::duplicate_field("e")); 178 106 } 179 107 found_entries = true; 180 - let Entries(mut child_entries, d) = map.next_value()?; 181 - things.append(&mut child_entries); 182 - depth = d; 108 + 109 + let mut prefix: Vec<u8> = vec![]; 110 + 111 + for entry in map.next_value::<Vec<Entry>>()? { 112 + let mut rkey: Vec<u8> = vec![]; 113 + let pre_checked = prefix 114 + .get(..entry.prefix_len) 115 + .ok_or_else(|| de::Error::invalid_value( 116 + Unexpected::Bytes(&prefix), 117 + &"a prefix at least as long as the prefix_len", 118 + ))?; 119 + 120 + rkey.extend_from_slice(pre_checked); 121 + rkey.extend_from_slice(&entry.keysuffix); 122 + 123 + let rkey_s = String::from_utf8(rkey.clone()) 124 + .map_err(|_| de::Error::invalid_value( 125 + Unexpected::Bytes(&rkey), 126 + &"a valid utf-8 rkey", 127 + ))?; 128 + 129 + let key_depth = atproto_mst_depth(&rkey_s); 130 + if depth.is_none() { 131 + depth = Some(key_depth); 132 + } else if Some(key_depth) != depth { 133 + return Err(de::Error::invalid_value( 134 + Unexpected::Bytes(&prefix), 135 + &"all rkeys to have equal MST depth", 136 + )); 137 + } 138 + 139 + things.push(NodeThing { 140 + cid: entry.value, 141 + kind: ThingKind::Value { rkey: rkey_s }, 142 + }); 143 + 144 + if let Some(cid) = entry.tree { 145 + things.push(NodeThing { 146 + cid, 147 + kind: ThingKind::Tree, 148 + }); 149 + } 150 + 151 + prefix = rkey; 152 + } 183 153 }, 184 154 f => return Err(de::Error::unknown_field(f, NODE_FIELDS)) 185 155 } ··· 209 179 pub(crate) fn is_empty(&self) -> bool { 210 180 self.things.is_empty() 211 181 } 212 - } 213 - 214 - /// MST node data schema 215 - #[derive(Debug, Deserialize, PartialEq)] 216 - #[serde(deny_unknown_fields)] 217 - pub(crate) struct Node { 218 - /// link to sub-tree Node on a lower level and with all keys sorting before 219 - /// keys at this node 220 - #[serde(rename = "l")] 221 - pub left: Option<Cid>, 222 - /// ordered list of TreeEntry objects 223 - /// 224 - /// atproto MSTs have a fanout of 4, so there can be max 4 entries. 225 - #[serde(rename = "e")] 226 - pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]? 227 - } 228 - 229 - impl Node { 230 182 /// test if a block could possibly be a node 231 183 /// 232 184 /// we can't eagerly decode records except where we're *sure* they cannot be ··· 252 204 .map(|b| b & 0b1110_0000 == 0x80) 253 205 .unwrap_or(false) 254 206 } 255 - 256 - // /// Check if a node has any entries 257 - // /// 258 - // /// An empty repository with no records is represented as a single MST node 259 - // /// with an empty array of entries. This is the only situation in which a 260 - // /// tree may contain an empty leaf node which does not either contain keys 261 - // /// ("entries") or point to a sub-tree containing entries. 262 - // pub(crate) fn is_empty(&self) -> bool { 263 - // self.left.is_none() && self.entries.is_empty() 264 - // } 265 207 } 266 208 267 209 /// TreeEntry object
+12 -53
src/walk.rs
··· 70 70 }) 71 71 } 72 72 73 - fn next_todo(&mut self) -> Option<NodeThing> { 74 - while let Some(last) = self.todo.last_mut() { 75 - let Some(thing) = last.pop() else { 76 - self.todo.pop(); 77 - continue; 78 - }; 79 - return Some(thing); 80 - } 81 - None 82 - } 83 - 84 73 fn mpb_step( 85 74 &mut self, 86 75 kind: ThingKind, ··· 140 129 } 141 130 } 142 131 132 + #[inline(always)] 133 + fn next_todo(&mut self) -> Option<NodeThing> { 134 + while let Some(last) = self.todo.last_mut() { 135 + let Some(thing) = last.pop() else { 136 + self.todo.pop(); 137 + continue; 138 + }; 139 + return Some(thing); 140 + } 141 + None 142 + } 143 + 143 144 /// Advance through nodes until we find a record or can't go further 144 145 pub fn step( 145 146 &mut self, 146 147 blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 147 148 process: impl Fn(Bytes) -> Bytes, 148 149 ) -> Result<Option<Output>, WalkError> { 149 - 150 150 while let Some(NodeThing { cid, kind }) = self.next_todo() { 151 151 let Some(mpb) = blocks.get(&cid) else { 152 152 return Err(WalkError::MissingBlock(cid)); 153 153 }; 154 - 155 154 if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 156 155 return Ok(Some(out)); 157 156 } ··· 177 176 Ok(None) 178 177 } 179 178 } 180 - 181 - #[cfg(test)] 182 - mod test { 183 - use super::*; 184 - 185 - // fn cid1() -> Cid { 186 - // "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 187 - // .parse() 188 - // .unwrap() 189 - // } 190 - 191 - // #[test] 192 - // fn test_push_empty_fails() { 193 - // let empty_node = Node { 194 - // left: None, 195 - // entries: vec![], 196 - // }; 197 - // let mut stack = vec![]; 198 - // let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 199 - // assert_eq!(err, Err(MstError::EmptyNode)); 200 - // } 201 - 202 - // #[test] 203 - // fn test_push_one_node() { 204 - // let node = Node { 205 - // left: Some(cid1()), 206 - // entries: vec![], 207 - // }; 208 - // let mut stack = vec![]; 209 - // push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 210 - // assert_eq!( 211 - // stack.last(), 212 - // Some(Need::Node { 213 - // depth: Depth::Depth(3), 214 - // cid: cid1() 215 - // }) 216 - // .as_ref() 217 - // ); 218 - // } 219 - }