Fast and robust atproto CAR file processing in rust

fix rkey -> key terminology

phil c3c8a55a 6ab51051

+13 -13
+1 -1
changelog.md
···

 - drop sqlite, pick up fjall v3 for some speeeeeeed (and code simplification and easier build requirements and)
 - no more `Processable` trait, process functions are just `Vec<u8> -> Vec<u8>` now (bring your own ser/de). there's a potential small cost here where processors need to now actually go through serialization even for in-memory car walking, but i think zero-copy approaches (eg. rkyv) are low-cost enough
-- custom deserialize for MST nodes that does as much depth calculation and rkey validation as
-  possible in-line. (not clear if it actually made anything faster)
+- custom deserialize for MST nodes that does as much depth calculation and key validation as possible in-line. (not clear if it actually made anything faster)
 - check MST depth at every node properly (previously it could do some walking before being able to check and included some assumptions)
 - check MST for empty leaf nodes (which not allowed)
 - shave 0.6 nanoseconds (really) from MST depth calculation (don't ask)
+1 -1
examples/disk-read-file/main.rs
···
     log::info!("walking...");

     // this example uses the disk driver's channel mode: the tree walking is
-    // spawned onto a blocking thread, and we get chunks of rkey+blocks back
+    // spawned onto a blocking thread, and we get chunks of key + record pairs back
     let (mut rx, join) = driver.to_channel(512);
     while let Some(outputs) = rx.recv().await {
         let outputs = outputs?;
+1 -1
readme.md
···
 more recent todo
 - [ ] add a zero-copy rkyv process function example
 - [ ] car slices
-- [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed)
+- [ ] lazy-value stream (for key -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed)
 - [x] get an *emtpy* car for the test suite
 - [x] implement a max size on disk limit

+10 -10
src/mst.rs
···
         let mut prefix: Vec<u8> = vec![];

         for entry in map.next_value::<Vec<Entry>>()? {
-            let mut rkey: Vec<u8> = vec![];
+            let mut key_bytes: Vec<u8> = vec![];
             let pre_checked =
                 prefix.get(..entry.prefix_len).ok_or_else(|| {
                     de::Error::invalid_value(
···
                     )
                 })?;

-            rkey.extend_from_slice(pre_checked);
-            rkey.extend_from_slice(&entry.keysuffix);
+            key_bytes.extend_from_slice(pre_checked);
+            key_bytes.extend_from_slice(&entry.keysuffix);

-            let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| {
+            let key = String::from_utf8(key_bytes.clone()).map_err(|_| {
                 de::Error::invalid_value(
-                    Unexpected::Bytes(&rkey),
-                    &"a valid utf-8 rkey",
+                    Unexpected::Bytes(&key_bytes),
+                    &"a valid utf-8 key",
                 )
             })?;

-            let key_layer = atproto_mst_layer(&rkey_s);
+            let key_layer = atproto_mst_layer(&key);
             if layer.is_none() {
                 layer = Some(key_layer);
             } else if Some(key_layer) != layer {
                 return Err(de::Error::invalid_value(
                     Unexpected::Bytes(&prefix),
-                    &"all rkeys to have equal MST layer",
+                    &"all keys to have equal MST layer",
                 ));
             }

             things.push(NodeThing {
                 link: entry.value.into(),
-                kind: ThingKind::Record(rkey_s),
+                kind: ThingKind::Record(key),
             });

             if let Some(link) = entry.tree {
···
                 });
             }

-            prefix = rkey;
+            prefix = key_bytes;
         }
     }
     f => return Err(de::Error::unknown_field(f, NODE_FIELDS)),
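
For readers skimming the src/mst.rs hunk: the loop rebuilds each full key from the previous key's first `prefix_len` bytes plus the entry's `keysuffix`, then checks that the result is valid UTF-8. The sketch below isolates just that step using only the standard library. It is not the crate's code: the `Entry` struct here is a hypothetical stand-in that borrows two field names from the diff, `expand_keys` is an illustrative name, errors are plain strings rather than serde errors, and the MST layer check is left out.

```rust
/// Hypothetical stand-in for one prefix-compressed MST entry. Only the two
/// field names (`prefix_len`, `keysuffix`) are taken from the diff.
struct Entry {
    prefix_len: usize,
    keysuffix: Vec<u8>,
}

/// Rebuild full keys from prefix-compressed entries: each key is the first
/// `prefix_len` bytes of the previous key followed by `keysuffix`.
/// (Illustrative helper, not part of the crate.)
fn expand_keys(entries: &[Entry]) -> Result<Vec<String>, String> {
    let mut prefix: Vec<u8> = Vec::new();
    let mut keys = Vec::with_capacity(entries.len());

    for entry in entries {
        // Reject a prefix_len that reaches past the previous key.
        let shared = prefix.get(..entry.prefix_len).ok_or_else(|| {
            format!(
                "prefix_len {} exceeds previous key length {}",
                entry.prefix_len,
                prefix.len()
            )
        })?;

        let mut key_bytes = shared.to_vec();
        key_bytes.extend_from_slice(&entry.keysuffix);

        // Keys must be valid UTF-8 before they are handed on as strings.
        let key = std::str::from_utf8(&key_bytes)
            .map_err(|_| "key is not valid utf-8".to_string())?
            .to_owned();
        keys.push(key);

        // The raw bytes of this key become the prefix source for the next entry.
        prefix = key_bytes;
    }

    Ok(keys)
}
```

Keeping the raw bytes (`key_bytes`) separate from the validated string (`key`) is the distinction the rename in this commit makes explicit: the byte vector feeds the next entry's prefix lookup, while the string is what gets stored on the record.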