Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 233 lines 9.0 kB view raw
1//! Low-level types for parsing raw atproto MST CARs 2//! 3//! The primary aim is to work through the **tree** structure. Non-node blocks 4//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 6use cid::Cid; 7use serde::Deserialize; 8use sha2::{Digest, Sha256}; 9 10/// The top-level data object in a repository's tree is a signed commit. 11#[derive(Debug, Deserialize)] 12// #[serde(deny_unknown_fields)] 13pub struct Commit { 14 /// the account DID associated with the repo, in strictly normalized form 15 /// (eg, lowercase as appropriate) 16 pub did: String, 17 /// fixed value of 3 for this repo format version 18 pub version: u64, 19 /// pointer to the top of the repo contents tree structure (MST) 20 pub data: Cid, 21 /// revision of the repo, used as a logical clock. 22 /// 23 /// TID format. Must increase monotonically. Recommend using current 24 /// timestamp as TID; rev values in the "future" (beyond a fudge factor) 25 /// should be ignored and not processed 26 pub rev: String, 27 /// pointer (by hash) to a previous commit object for this repository. 28 /// 29 /// Could be used to create a chain of history, but largely unused (included 30 /// for v2 backwards compatibility). In version 3 repos, this field must 31 /// exist in the CBOR object, but is virtually always null. NOTE: previously 32 /// specified as nullable and optional, but this caused interoperability 33 /// issues. 34 pub prev: Option<Cid>, 35 /// cryptographic signature of this commit, as raw bytes 36 #[serde(with = "serde_bytes")] 37 pub sig: serde_bytes::ByteBuf, 38} 39 40use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 41use std::fmt; 42 43pub type Depth = u32; 44 45#[inline(always)] 46pub fn atproto_mst_depth(key: &str) -> Depth { 47 // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 48 u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 49} 50 51#[derive(Debug)] 52pub(crate) struct MstNode { 53 pub depth: Option<Depth>, // known for nodes with entries (required for root) 54 pub things: Vec<NodeThing>, 55} 56 57#[derive(Debug)] 58pub(crate) struct NodeThing { 59 pub(crate) cid: Cid, 60 pub(crate) kind: ThingKind, 61} 62 63#[derive(Debug)] 64pub(crate) enum ThingKind { 65 Tree, 66 Value { rkey: String }, 67} 68 69impl<'de> Deserialize<'de> for MstNode { 70 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 71 where 72 D: Deserializer<'de>, 73 { 74 struct NodeVisitor; 75 impl<'de> Visitor<'de> for NodeVisitor { 76 type Value = MstNode; 77 78 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 79 formatter.write_str("struct MstNode") 80 } 81 82 fn visit_map<V>(self, mut map: V) -> Result<MstNode, V::Error> 83 where 84 V: MapAccess<'de>, 85 { 86 let mut found_left = false; 87 let mut left = None; 88 let mut found_entries = false; 89 let mut things = Vec::new(); 90 let mut depth = None; 91 92 while let Some(key) = map.next_key()? { 93 match key { 94 "l" => { 95 if found_left { 96 return Err(de::Error::duplicate_field("l")); 97 } 98 found_left = true; 99 if let Some(cid) = map.next_value()? { 100 left = Some(NodeThing { 101 cid, 102 kind: ThingKind::Tree, 103 }); 104 } 105 } 106 "e" => { 107 if found_entries { 108 return Err(de::Error::duplicate_field("e")); 109 } 110 found_entries = true; 111 112 let mut prefix: Vec<u8> = vec![]; 113 114 for entry in map.next_value::<Vec<Entry>>()? { 115 let mut rkey: Vec<u8> = vec![]; 116 let pre_checked = 117 prefix.get(..entry.prefix_len).ok_or_else(|| { 118 de::Error::invalid_value( 119 Unexpected::Bytes(&prefix), 120 &"a prefix at least as long as the prefix_len", 121 ) 122 })?; 123 124 rkey.extend_from_slice(pre_checked); 125 rkey.extend_from_slice(&entry.keysuffix); 126 127 let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| { 128 de::Error::invalid_value( 129 Unexpected::Bytes(&rkey), 130 &"a valid utf-8 rkey", 131 ) 132 })?; 133 134 let key_depth = atproto_mst_depth(&rkey_s); 135 if depth.is_none() { 136 depth = Some(key_depth); 137 } else if Some(key_depth) != depth { 138 return Err(de::Error::invalid_value( 139 Unexpected::Bytes(&prefix), 140 &"all rkeys to have equal MST depth", 141 )); 142 } 143 144 things.push(NodeThing { 145 cid: entry.value, 146 kind: ThingKind::Value { rkey: rkey_s }, 147 }); 148 149 if let Some(cid) = entry.tree { 150 things.push(NodeThing { 151 cid, 152 kind: ThingKind::Tree, 153 }); 154 } 155 156 prefix = rkey; 157 } 158 } 159 f => return Err(de::Error::unknown_field(f, NODE_FIELDS)), 160 } 161 } 162 if !found_left { 163 return Err(de::Error::missing_field("l")); 164 } 165 if !found_entries { 166 return Err(de::Error::missing_field("e")); 167 } 168 169 things.reverse(); 170 if let Some(l) = left { 171 things.push(l); 172 } 173 174 Ok(MstNode { depth, things }) 175 } 176 } 177 178 const NODE_FIELDS: &[&str] = &["l", "e"]; 179 deserializer.deserialize_struct("MstNode", NODE_FIELDS, NodeVisitor) 180 } 181} 182 183impl MstNode { 184 pub(crate) fn is_empty(&self) -> bool { 185 self.things.is_empty() 186 } 187 /// test if a block could possibly be a node 188 /// 189 /// we can't eagerly decode records except where we're *sure* they cannot be 190 /// an mst node (and even then we can only attempt) because you can't know 191 /// with certainty what a block is supposed to be without actually walking 192 /// the tree. 193 /// 194 /// so if a block *could be* a node, any record converter must postpone 195 /// processing. if it turns out it happens to be a very node-looking record, 196 /// well, sorry, it just has to only be processed later when that's known. 197 #[inline(always)] 198 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 199 const NODE_FINGERPRINT: [u8; 3] = [ 200 0xA2, // map length 2 (for "l" and "e" keys) 201 0x61, // text length 1 202 b'e', // "e" before "l" because map keys have to be lex-sorted 203 // 0x8?: "e" has array (0x100 upper 3 bits) of some length 204 ]; 205 let bytes = bytes.as_ref(); 206 bytes.starts_with(&NODE_FINGERPRINT) 207 && bytes 208 .get(3) 209 .map(|b| b & 0b1110_0000 == 0x80) 210 .unwrap_or(false) 211 } 212} 213 214/// TreeEntry object 215#[derive(Debug, Deserialize, PartialEq)] 216#[serde(deny_unknown_fields)] 217pub(crate) struct Entry { 218 /// count of bytes shared with previous TreeEntry in this Node (if any) 219 #[serde(rename = "p")] 220 pub prefix_len: usize, 221 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 222 #[serde(rename = "k")] 223 pub keysuffix: serde_bytes::ByteBuf, 224 /// link to the record data (CBOR) for this entry 225 #[serde(rename = "v")] 226 pub value: Cid, 227 /// link to a sub-tree Node at a lower level 228 /// 229 /// the lower level must have keys sorting after this TreeEntry's key (to 230 /// the "right"), but before the next TreeEntry's key in this Node (if any) 231 #[serde(rename = "t")] 232 pub tree: Option<Cid>, 233}