Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

simplify flat stack handling

still pretty much no perf impact

phil 4cb98218 f9b01f11

+46 -70
-2
src/mst.rs
··· 73 73 bytes.starts_with(&NODE_FINGERPRINT) 74 74 // && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false) 75 75 } 76 - } 77 76 78 - impl Node { 79 77 /// Check if a node has any entries 80 78 /// 81 79 /// An empty repository with no records is represented as a single MST node
+46 -68
src/walk.rs
··· 35 35 Step { rkey: String, data: T }, 36 36 } 37 37 38 - /// a transformed mst::Node which we can mutate and track progress on 39 - /// 40 - /// contains an optional left subtree, whose contents are all to the left of 41 - /// every entry in the entries array. 42 - #[derive(Debug, PartialEq)] 43 - struct ActionNode { 44 - entries: Vec<Need>, 45 - } 46 - 47 - impl ActionNode { 48 - fn from_root(tree_root_cid: Cid) -> Self { 49 - ActionNode { 50 - entries: vec![Need::Node(tree_root_cid)], 51 - } 52 - } 53 - fn next(&self) -> Option<Need> { 54 - self.entries.last().cloned() 55 - } 56 - fn found(&mut self) { 57 - self.entries.pop(); 58 - } 59 - } 60 - 61 38 #[derive(Debug, Clone, PartialEq)] 62 39 enum Need { 63 40 Node(Cid), 64 41 Record { rkey: String, cid: Cid }, 65 42 } 66 43 67 - impl TryFrom<&Node> for ActionNode { 68 - type Error = ActionNodeError; 44 + fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), ActionNodeError> { 45 + let mut entries = Vec::with_capacity(node.entries.len()); 69 46 70 - fn try_from(node: &Node) -> Result<Self, Self::Error> { 71 - let mut entries = vec![]; 72 - 73 - if let Some(tree) = node.left { 74 - entries.push(Need::Node(tree)); 75 - } 76 - 77 - let mut prefix = vec![]; 78 - for entry in &node.entries { 79 - let mut rkey = vec![]; 80 - let pre_checked = prefix 81 - .get(..entry.prefix_len) 82 - .ok_or(ActionNodeError::EntryPrefixOutOfbounds)?; 83 - rkey.extend_from_slice(pre_checked); 84 - rkey.extend_from_slice(&entry.keysuffix); 85 - prefix = rkey.clone(); 47 + let mut prefix = vec![]; 48 + for entry in &node.entries { 49 + let mut rkey = vec![]; 50 + let pre_checked = prefix 51 + .get(..entry.prefix_len) 52 + .ok_or(ActionNodeError::EntryPrefixOutOfbounds)?; 53 + rkey.extend_from_slice(pre_checked); 54 + rkey.extend_from_slice(&entry.keysuffix); 55 + prefix = rkey.clone(); 86 56 87 - entries.push(Need::Record { 88 - rkey: String::from_utf8(rkey)?, 89 - cid: entry.value, 90 - }); 91 - if let Some(ref tree) = 
entry.tree { 92 - entries.push(Need::Node(*tree)); 93 - } 57 + entries.push(Need::Record { 58 + rkey: String::from_utf8(rkey)?, 59 + cid: entry.value, 60 + }); 61 + if let Some(ref tree) = entry.tree { 62 + entries.push(Need::Node(*tree)); 94 63 } 64 + } 95 65 96 - entries.reverse(); 66 + entries.reverse(); 67 + stack.append(&mut entries); 97 68 98 - Ok(ActionNode { entries }) 69 + if let Some(tree) = node.left { 70 + stack.push(Need::Node(tree)); 99 71 } 72 + Ok(()) 100 73 } 101 74 102 75 #[derive(Debug)] 103 76 pub struct Walker { 104 - stack: Vec<ActionNode>, 77 + stack: Vec<Need>, 105 78 } 106 79 107 80 impl Walker { 108 81 pub fn new(tree_root_cid: Cid) -> Self { 109 82 Self { 110 - stack: vec![ActionNode::from_root(tree_root_cid)], 83 + stack: vec![Need::Node(tree_root_cid)], 111 84 } 112 85 } 113 86 ··· 117 90 process: impl Fn(&[u8]) -> ProcRes<T, E>, 118 91 ) -> Result<Step<T>, Trip<E>> { 119 92 loop { 120 - let Some(current_node) = self.stack.last_mut() else { 93 + let Some(mut need) = self.stack.last() else { 121 94 log::trace!("tried to walk but we're actually done."); 122 95 return Ok(Step::Finish); 123 96 }; 124 - let Some(mut need) = current_node.next() else { 125 - self.stack.pop(); 126 - continue; 127 - }; 128 97 129 98 match &mut need { 130 99 Need::Node(cid) => { ··· 141 110 .map_err(|e| Trip::BadCommit(e.into()))?; 142 111 143 112 // found node, make sure we remember 144 - current_node.found(); 113 + self.stack.pop(); 145 114 146 115 // queue up work on the found node next 147 - self.stack.push((&node).try_into()?); 116 + push_from_node(&mut self.stack, &node)?; 148 117 } 149 118 Need::Record { rkey, cid } => { 150 119 log::trace!("need record {cid:?}"); 151 - let Some(data) = blocks.get(cid) else { 120 + let Some(data) = blocks.get_mut(cid) else { 152 121 log::trace!("record block not found, resting"); 153 122 return Ok(Step::Rest(*cid)); 154 123 }; ··· 156 125 let data = match data { 157 126 MaybeProcessedBlock::Raw(data) => process(data), 158 
127 MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()), 159 - MaybeProcessedBlock::Processed(_e) => { 160 - return Err(Trip::RecordFailedProcessing("booo".into())); 161 - } // TODO 128 + bad => { 129 + // big hack to pull the error out -- this corrupts 130 + // a block, so we should not continue trying to work 131 + let mut steal = MaybeProcessedBlock::Raw(vec![]); 132 + std::mem::swap(&mut steal, bad); 133 + let MaybeProcessedBlock::Processed(Err(e)) = steal else { 134 + unreachable!(); 135 + }; 136 + return Err(Trip::ProcessFailed(e)); 137 + } 162 138 }; 163 139 164 140 // found node, make sure we remember 165 - current_node.found(); 141 + self.stack.pop(); 166 142 167 143 log::trace!("emitting a block as a step. depth={}", self.stack.len()); 168 144 let data = data.map_err(Trip::ProcessFailed)?; ··· 230 206 left: None, 231 207 entries: vec![], 232 208 }; 233 - let action_node: ActionNode = (&node).try_into().unwrap(); 234 - assert_eq!(action_node.next(), None); 209 + let mut stack = vec![]; 210 + push_from_node(&mut stack, &node).unwrap(); 211 + assert_eq!(stack.last(), None); 235 212 } 236 213 237 214 #[test] ··· 240 217 left: Some(cid1()), 241 218 entries: vec![], 242 219 }; 243 - let action_node: ActionNode = (&node).try_into().unwrap(); 244 - assert_eq!(action_node.next(), Some(Need::Node(cid1()))); 220 + let mut stack = vec![]; 221 + push_from_node(&mut stack, &node).unwrap(); 222 + assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref()); 245 223 } 246 224 247 225 // #[test]