STreaming ARchives: stricter, verifiable, deterministic, highly compressible alternatives to CAR files for atproto repositories.
atproto car
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

getting there...

phil a895d829 1ac5f1de

+485 -262
+1 -7
Cargo.lock
··· 243 243 ] 244 244 245 245 [[package]] 246 - name = "leb128" 247 - version = "0.2.5" 248 - source = "registry+https://github.com/rust-lang/crates.io-index" 249 - checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" 250 - 251 - [[package]] 252 246 name = "libc" 253 247 version = "0.2.180" 254 248 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 406 400 "bytes", 407 401 "cid", 408 402 "futures", 409 - "leb128", 410 403 "serde", 411 404 "serde_bytes", 412 405 "serde_ipld_dagcbor", ··· 414 407 "thiserror", 415 408 "tokio", 416 409 "tokio-util", 410 + "unsigned-varint", 417 411 ] 418 412 419 413 [[package]]
+1 -1
Cargo.toml
··· 14 14 serde_ipld_dagcbor = "0.6" 15 15 serde_bytes = "0.11" 16 16 cid = { version = "0.11", features = ["serde-codec"] } 17 - leb128 = "0.2" 18 17 thiserror = "2" 19 18 sha2 = "0.10" 20 19 bytes = "1" 20 + unsigned-varint = "0.8" 21 21 22 22 # Async 23 23 tokio = { version = "1", features = ["io-util", "macros", "rt"], optional = true }
+1 -1
src/blocking.rs
··· 1 1 use crate::error::Result; 2 2 use crate::parser::StarParser; 3 - use crate::types::{Commit, StarItem}; 3 + use crate::types::StarItem; 4 4 use bytes::BytesMut; 5 5 use cid::Cid; 6 6 use std::io::Read;
+4 -1
src/lib.rs
··· 12 12 pub use error::{Result, StarError}; 13 13 pub use parser::StarParser; 14 14 pub use ser::StarEncoder; 15 - pub use types::{Commit, MstEntry, MstNode, StarItem}; 15 + pub use types::{RepoMstEntry, RepoMstNode, StarCommit, StarItem, StarMstEntry, StarMstNode}; 16 16 17 17 #[cfg(feature = "blocking")] 18 18 pub use blocking::StarIterator; 19 + 20 + #[cfg(test)] 21 + mod tests;
+237 -206
src/parser.rs
··· 1 1 use crate::error::{Result, StarError}; 2 - use crate::types::{Commit, MstNode, StarItem}; 2 + use crate::types::{StarCommit, StarItem, StarMstNode}; 3 3 use bytes::{Buf, BytesMut}; 4 4 use cid::Cid; 5 5 use sha2::{Digest, Sha256}; 6 - use std::io::Cursor; 7 6 8 7 #[derive(Debug)] 9 8 enum State { 10 9 Header, 11 - Body { 10 + Body { 12 11 stack: Vec<StackItem>, 13 12 current_len: Option<usize>, 14 13 }, ··· 26 25 implicit_index: Option<usize>, 27 26 }, 28 27 VerifyLayer0 { 29 - node: MstNode, 28 + node: StarMstNode, 30 29 parent_expected: Option<Cid>, 31 30 pending_records: Vec<(usize, Cid)>, 32 31 }, ··· 45 44 46 45 pub fn input(&mut self, buf: &mut BytesMut) -> Result<Option<StarItem>> { 47 46 loop { 48 - match self.state { 47 + // Check if we need to transition from Body to Done 48 + let is_body_done = if let State::Body { stack, .. } = &self.state { 49 + stack.is_empty() 50 + } else { 51 + false 52 + }; 53 + 54 + if is_body_done { 55 + self.state = State::Done; 56 + return Ok(None); 57 + } 58 + 59 + match &mut self.state { 49 60 State::Done => return Ok(None), 50 61 State::Header => { 51 - if buf.is_empty() { 52 - return Ok(None); 62 + return self.parse_header(buf) 63 + } 64 + State::Body { stack, current_len } => { 65 + if Self::process_verification(stack)? { 66 + continue; 53 67 } 54 - if buf[0] != 0x2A { 55 - return Err(StarError::InvalidHeader); 68 + 69 + if let Some(len) = Self::read_length(buf, current_len)? { 70 + let block_bytes = buf.split_to(len); 71 + *current_len = None; 72 + 73 + let item = stack.pop().unwrap(); 74 + match item { 75 + StackItem::Node { expected } => { 76 + return Self::process_node(block_bytes, expected, stack); 77 + }, 78 + StackItem::Record { key, expected, implicit_index } => { 79 + return Self::process_record(block_bytes, key, expected, implicit_index, stack); 80 + }, 81 + _ => return Err(StarError::InvalidState("Unexpected stack item".into())), 82 + } 83 + } else { 84 + return Ok(None); 56 85 } 86 + } 87 + } 88 + } 89 + } 57 90 58 - let mut cursor = Cursor::new(&buf[1..]); 91 + fn parse_header(&mut self, buf: &mut BytesMut) -> Result<Option<StarItem>> { 92 + if buf.len() < 1 { 93 + return Ok(None); 94 + } 95 + if buf[0] != 0x2A { 96 + return Err(StarError::InvalidHeader); 97 + } 59 98 60 - let _ver = match leb128::read::unsigned(&mut cursor) { 61 - Ok(v) => v, 62 - Err(_) => return Ok(None), 63 - }; 99 + // We use a slice of the buffer starting after the magic byte 100 + // unsigned_varint operates on &[u8] 101 + let slice = &buf[1..]; 102 + 103 + let (ver, remaining1) = match unsigned_varint::decode::usize(slice) { 104 + Ok(res) => res, 105 + Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 106 + Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 107 + }; 108 + 109 + let (len, remaining2) = match unsigned_varint::decode::usize(remaining1) { 110 + Ok(res) => res, 111 + Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 112 + Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 113 + }; 64 114 65 - let len = match leb128::read::unsigned(&mut cursor) { 66 - Ok(l) => l, 67 - Err(_) => return Ok(None), 68 - }; 115 + let header_varints_len = buf.len() - 1 - remaining2.len(); // bytes consumed by varints 116 + let total_header_len = 1 + header_varints_len; // + magic byte 117 + let total_len = total_header_len + len; 69 118 70 - let header_len = 1 + cursor.position() as usize; 71 - let total_len = header_len + len as usize; 119 + if buf.len() < total_len { 120 + return Ok(None); 121 + } 72 122 73 - if buf.len() < total_len { 74 - return Ok(None); 75 - } 123 + buf.advance(total_header_len); 124 + let commit_bytes = buf.split_to(len); 125 + let commit: StarCommit = serde_ipld_dagcbor::from_slice(&commit_bytes) 126 + .map_err(|e| StarError::Cbor(e.to_string()))?; 127 + 128 + // Check version (conceptually) 129 + let _ = ver; 76 130 77 - buf.advance(header_len); 78 - let commit_bytes = buf.split_to(len as usize); 79 - let commit: Commit = serde_ipld_dagcbor::from_slice(&commit_bytes) 80 - .map_err(|e| StarError::Cbor(e.to_string()))?; 131 + let mut stack = Vec::new(); 132 + if let Some(root_cid) = commit.data { 133 + stack.push(StackItem::Node { 134 + expected: Some(root_cid), 135 + }); 136 + } 81 137 82 - self.state = State::Body { 83 - stack: commit 84 - .data 85 - .map(|root_cid| { 86 - vec![StackItem::Node { 87 - expected: Some(root_cid), 88 - }] 89 - }) 90 - .unwrap_or(vec![]), 91 - current_len: None, 92 - }; 93 - return Ok(Some(StarItem::Commit(commit))); 94 - } 95 - State::Body { 96 - ref mut stack, 97 - mut current_len, 98 - } => { 99 - if stack.is_empty() { 100 - self.state = State::Done; 101 - return Ok(None); 138 + self.state = State::Body { 139 + stack, 140 + current_len: None 141 + }; 142 + Ok(Some(StarItem::Commit(commit))) 143 + } 144 + 145 + fn process_verification(stack: &mut Vec<StackItem>) -> Result<bool> { 146 + if let Some(StackItem::VerifyLayer0 { .. }) = stack.last() { 147 + if let Some(StackItem::VerifyLayer0 { 148 + mut node, 149 + parent_expected, 150 + pending_records, 151 + }) = stack.pop() 152 + { 153 + for (idx, cid) in pending_records { 154 + if idx < node.e.len() { 155 + node.e[idx].v = Some(cid); 102 156 } 157 + } 103 158 104 - // Check for verification task 105 - if let Some(StackItem::VerifyLayer0 { .. }) = stack.last() 106 - && let Some(StackItem::VerifyLayer0 { 107 - mut node, 108 - parent_expected, 109 - pending_records, 110 - }) = stack.pop() 111 - { 112 - for (idx, cid) in pending_records { 113 - if idx < node.e.len() { 114 - node.e[idx].v = Some(cid); 115 - } 116 - } 159 + let repo_node = node.to_repo()?; 160 + let bytes = serde_ipld_dagcbor::to_vec(&repo_node) 161 + .map_err(|e| StarError::Cbor(e.to_string()))?; 117 162 118 - let bytes = serde_ipld_dagcbor::to_vec(&node) 119 - .map_err(|e| StarError::Cbor(e.to_string()))?; 120 - let hash = Sha256::digest(&bytes); 121 - // CIDv1 DAG-CBOR SHA2-256 122 - let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 163 + let hash = Sha256::digest(&bytes); 164 + let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 123 165 124 - if let Some(expected) = parent_expected 125 - && cid != expected 126 - { 127 - return Err(StarError::VerificationFailed { 128 - expected: expected.to_string(), 129 - computed: cid.to_string(), 130 - }); 131 - } 132 - continue; 166 + if let Some(expected) = parent_expected { 167 + if cid != expected { 168 + return Err(StarError::VerificationFailed { 169 + expected: expected.to_string(), 170 + computed: cid.to_string(), 171 + }); 133 172 } 173 + } 174 + return Ok(true); 175 + } 176 + } 177 + Ok(false) 178 + } 134 179 135 - // Read Length 136 - let len = if let Some(l) = current_len { l } else { 137 - let mut cursor = Cursor::new(&buf[..]); 138 - match leb128::read::unsigned(&mut cursor) { 139 - Ok(l) => { 140 - current_len.replace(l as usize); 141 - buf.advance(cursor.position() as usize); 142 - l as usize 143 - } 144 - Err(_) => return Ok(None), 145 - } 146 - }; 180 + fn read_length(buf: &mut BytesMut, current_len: &mut Option<usize>) -> Result<Option<usize>> { 181 + if current_len.is_none() { 182 + match unsigned_varint::decode::usize(&buf[..]) { 183 + Ok((l, remaining)) => { 184 + let consumed = buf.len() - remaining.len(); 185 + *current_len = Some(l); 186 + buf.advance(consumed); 187 + }, 188 + Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 189 + Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 190 + } 191 + } 147 192 148 - if buf.len() < len { 149 - return Ok(None); 150 - } 193 + let len = current_len.unwrap(); 194 + if buf.len() < len { 195 + return Ok(None); 196 + } 197 + Ok(Some(len)) 198 + } 151 199 152 - let block_bytes = buf.split_to(len); 153 - current_len.take(); 200 + fn process_node(block_bytes: BytesMut, expected: Option<Cid>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 201 + let node: StarMstNode = serde_ipld_dagcbor::from_slice(&block_bytes) 202 + .map_err(|e| StarError::Cbor(e.to_string()))?; 154 203 155 - match stack.pop().unwrap() { 156 - StackItem::Node { expected } => { 157 - let node: MstNode = serde_ipld_dagcbor::from_slice(&block_bytes) 158 - .map_err(|e| StarError::Cbor(e.to_string()))?; 204 + // Check for implicit records 205 + let mut has_implicit = false; 206 + for e in &node.e { 207 + if e.v_archived == Some(true) && e.v.is_none() { 208 + has_implicit = true; 209 + break; 210 + } 211 + } 159 212 160 - // Check for implicit records 161 - let mut has_implicit = false; 162 - for e in &node.e { 163 - if e.v_archived == Some(true) && e.v.is_none() { 164 - has_implicit = true; 165 - break; 166 - } 167 - } 213 + if !has_implicit { 214 + let repo_node = node.to_repo()?; 215 + let bytes = serde_ipld_dagcbor::to_vec(&repo_node) 216 + .map_err(|e| StarError::Cbor(e.to_string()))?; 168 217 169 - if !has_implicit { 170 - let hash = Sha256::digest(&block_bytes); 171 - let cid = Cid::new_v1( 172 - 0x71, 173 - cid::multihash::Multihash::wrap(0x12, &hash)?, 174 - ); 175 - if let Some(exp) = expected 176 - && cid != exp 177 - { 178 - return Err(StarError::VerificationFailed { 179 - expected: exp.to_string(), 180 - computed: cid.to_string(), 181 - }); 182 - } 183 - } else { 184 - stack.push(StackItem::VerifyLayer0 { 185 - node: node.clone(), 186 - parent_expected: expected, 187 - pending_records: Vec::new(), 188 - }); 189 - } 218 + let hash = Sha256::digest(&bytes); 219 + let cid = Cid::new_v1( 220 + 0x71, 221 + cid::multihash::Multihash::wrap(0x12, &hash)?, 222 + ); 223 + if let Some(exp) = expected { 224 + if cid != exp { 225 + return Err(StarError::VerificationFailed { 226 + expected: exp.to_string(), 227 + computed: cid.to_string(), 228 + }); 229 + } 230 + } 231 + } else { 232 + stack.push(StackItem::VerifyLayer0 { 233 + node: node.clone(), 234 + parent_expected: expected, 235 + pending_records: Vec::new(), 236 + }); 237 + } 190 238 191 - // Reconstruct keys 192 - let mut prev_key_bytes = Vec::new(); 193 - let mut entry_keys = Vec::new(); 194 - for e in &node.e { 195 - let mut key = if e.p as usize <= prev_key_bytes.len() { 196 - prev_key_bytes[..e.p as usize].to_vec() 197 - } else { 198 - // Should not happen in valid MST 199 - prev_key_bytes.clone() 200 - }; 201 - key.extend_from_slice(&e.k); 202 - entry_keys.push(key.clone()); 203 - prev_key_bytes = key; 204 - } 239 + // Reconstruct keys 240 + let mut prev_key_bytes = Vec::new(); 241 + let mut entry_keys = Vec::new(); 242 + for e in &node.e { 243 + let mut key = if e.p as usize <= prev_key_bytes.len() { 244 + prev_key_bytes[..e.p as usize].to_vec() 245 + } else { 246 + prev_key_bytes.clone() 247 + }; 248 + key.extend_from_slice(&e.k); 249 + entry_keys.push(key.clone()); 250 + prev_key_bytes = key; 251 + } 205 252 206 - // Push children in reverse 207 - for i in (0..node.e.len()).rev() { 208 - let e = &node.e[i]; 209 - let key = entry_keys[i].clone(); 253 + // Push children in reverse 254 + for i in (0..node.e.len()).rev() { 255 + let e = &node.e[i]; 256 + let key = entry_keys[i].clone(); 210 257 211 - if e.t_archived == Some(true) { 212 - stack.push(StackItem::Node { expected: e.t }); 213 - } 258 + if e.t_archived == Some(true) { 259 + stack.push(StackItem::Node { expected: e.t }); 260 + } 214 261 215 - if e.v_archived == Some(true) { 216 - let implicit_index = if e.v.is_none() { Some(i) } else { None }; 217 - stack.push(StackItem::Record { 218 - key, 219 - expected: e.v, 220 - implicit_index, 221 - }); 222 - } 223 - } 262 + if e.v_archived == Some(true) { 263 + let implicit_index = if e.v.is_none() { Some(i) } else { None }; 264 + stack.push(StackItem::Record { 265 + key, 266 + expected: e.v, 267 + implicit_index, 268 + }); 269 + } 270 + } 224 271 225 - if node.l_archived == Some(true) { 226 - stack.push(StackItem::Node { expected: node.l }); 227 - } 272 + if node.l_archived == Some(true) { 273 + stack.push(StackItem::Node { expected: node.l }); 274 + } 228 275 229 - return Ok(Some(StarItem::Node(node))); 230 - } 231 - StackItem::Record { 232 - key, 233 - expected, 234 - implicit_index, 235 - } => { 236 - let hash = Sha256::digest(&block_bytes); 237 - let cid = 238 - Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 276 + Ok(Some(StarItem::Node(node))) 277 + } 239 278 240 - if let Some(exp) = expected 241 - && cid != exp 242 - { 243 - return Err(StarError::VerificationFailed { 244 - expected: exp.to_string(), 245 - computed: cid.to_string(), 246 - }); 247 - } 279 + fn process_record(block_bytes: BytesMut, key: Vec<u8>, expected: Option<Cid>, implicit_index: Option<usize>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 280 + let hash = Sha256::digest(&block_bytes); 281 + let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 248 282 249 - if let Some(idx) = implicit_index { 250 - let mut found = false; 251 - for item in stack.iter_mut().rev() { 252 - if let StackItem::VerifyLayer0 { 253 - pending_records, .. 254 - } = item 255 - { 256 - pending_records.push((idx, cid)); 257 - found = true; 258 - break; 259 - } 260 - } 261 - if !found { 262 - return Err(StarError::InvalidState( 263 - "Implicit record verification context missing".into(), 264 - )); 265 - } 266 - } 283 + if let Some(exp) = expected { 284 + if cid != exp { 285 + return Err(StarError::VerificationFailed { 286 + expected: exp.to_string(), 287 + computed: cid.to_string(), 288 + }); 289 + } 290 + } 267 291 268 - return Ok(Some(StarItem::Record { 269 - key, 270 - cid, 271 - content: Some(block_bytes.to_vec()), 272 - })); 273 - } 274 - _ => return Err(StarError::InvalidState("Unexpected stack item".into())), 275 - } 292 + if let Some(idx) = implicit_index { 293 + let mut found = false; 294 + for item in stack.iter_mut().rev() { 295 + if let StackItem::VerifyLayer0 { 296 + pending_records, .. 297 + } = item 298 + { 299 + pending_records.push((idx, cid)); 300 + found = true; 301 + break; 276 302 } 277 303 } 304 + if !found { 305 + return Err(StarError::InvalidState( 306 + "Implicit record verification context missing".into(), 307 + )); 308 + } 278 309 } 279 - } 280 - } 281 310 282 - impl Default for StarParser { 283 - fn default() -> Self { 284 - Self::new() 311 + Ok(Some(StarItem::Record { 312 + key, 313 + cid, 314 + content: Some(block_bytes.to_vec()), 315 + })) 285 316 } 286 317 }
+21 -28
src/ser.rs
··· 1 1 use crate::error::Result; 2 - use crate::types::{Commit, MstNode, StarItem}; 2 + use crate::types::{StarCommit, StarItem, StarMstNode}; 3 3 use bytes::{BufMut, BytesMut}; 4 4 5 5 pub struct StarEncoder; 6 6 7 7 impl StarEncoder { 8 - pub fn write_header(commit: &Commit, dst: &mut BytesMut) -> Result<()> { 8 + fn write_varint(val: usize, dst: &mut BytesMut) { 9 + let mut buf = unsigned_varint::encode::usize_buffer(); 10 + let encoded = unsigned_varint::encode::usize(val, &mut buf); 11 + dst.extend_from_slice(encoded); 12 + } 13 + 14 + pub fn write_header(commit: &StarCommit, dst: &mut BytesMut) -> Result<()> { 9 15 dst.put_u8(0x2A); 10 - 11 - let mut buf = Vec::new(); 12 - leb128::write::unsigned(&mut buf, 1)?; 13 - dst.extend_from_slice(&buf); 14 - 16 + 17 + Self::write_varint(1, dst); 18 + 15 19 let commit_bytes = serde_ipld_dagcbor::to_vec(commit) 16 20 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 17 - 18 - let mut buf = Vec::new(); 19 - leb128::write::unsigned(&mut buf, commit_bytes.len() as u64)?; 20 - dst.extend_from_slice(&buf); 21 - 21 + 22 + Self::write_varint(commit_bytes.len(), dst); 22 23 dst.extend_from_slice(&commit_bytes); 23 - 24 + 24 25 Ok(()) 25 26 } 26 27 27 - pub fn write_node(node: &MstNode, dst: &mut BytesMut) -> Result<()> { 28 + pub fn write_node(node: &StarMstNode, dst: &mut BytesMut) -> Result<()> { 28 29 let node_bytes = serde_ipld_dagcbor::to_vec(node) 29 30 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 30 - 31 - let mut buf = Vec::new(); 32 - leb128::write::unsigned(&mut buf, node_bytes.len() as u64)?; 33 - dst.extend_from_slice(&buf); 34 - 31 + 32 + Self::write_varint(node_bytes.len(), dst); 35 33 dst.extend_from_slice(&node_bytes); 36 - 34 + 37 35 Ok(()) 38 36 } 39 37 40 38 pub fn write_record(record_bytes: &[u8], dst: &mut BytesMut) -> Result<()> { 41 - let mut buf = Vec::new(); 42 - leb128::write::unsigned(&mut buf, record_bytes.len() as u64)?; 43 - dst.extend_from_slice(&buf); 44 - 39 + Self::write_varint(record_bytes.len(), dst); 45 40 dst.extend_from_slice(record_bytes); 46 - 41 + 47 42 Ok(()) 48 43 } 49 44 } ··· 60 55 if let Some(bytes) = content { 61 56 Self::write_record(&bytes, dst) 62 57 } else { 63 - Err(crate::error::StarError::InvalidState( 64 - "Cannot serialize record without content".into(), 65 - )) 58 + Err(crate::error::StarError::InvalidState("Cannot serialize record without content".into())) 66 59 } 67 60 } 68 61 }
+163
src/tests.rs
··· 1 + #[cfg(test)] 2 + mod tests { 3 + use crate::parser::StarParser; 4 + use crate::ser::StarEncoder; 5 + use crate::types::{ 6 + RepoMstEntry, RepoMstNode, StarCommit, StarItem, StarMstEntry, StarMstNode, 7 + }; 8 + use bytes::BytesMut; 9 + use cid::Cid; 10 + use serde_bytes::ByteBuf; 11 + use sha2::{Digest, Sha256}; 12 + 13 + fn create_test_cid(data: &[u8]) -> Cid { 14 + let hash = Sha256::digest(data); 15 + Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash).unwrap()) 16 + } 17 + 18 + #[test] 19 + fn test_roundtrip_basic() { 20 + // 1. Create a dummy record 21 + let record_data = b"hello world"; 22 + let record_cid = create_test_cid(record_data); 23 + let key = b"foo".to_vec(); 24 + 25 + // 2. Create a dummy MST Node (Layer 0, implicit record) 26 + let node_entry = StarMstEntry { 27 + p: 0, 28 + k: ByteBuf::from(key.clone()), 29 + v: None, 30 + v_archived: Some(true), 31 + t: None, 32 + t_archived: None, 33 + }; 34 + 35 + let star_node = StarMstNode { 36 + l: None, 37 + l_archived: None, 38 + e: vec![node_entry], 39 + }; 40 + 41 + // 3. Manually compute the expected Node CID for verification 42 + let repo_entry = RepoMstEntry { 43 + p: 0, 44 + k: ByteBuf::from(key.clone()), 45 + v: record_cid, 46 + t: None, 47 + }; 48 + let repo_node = RepoMstNode { 49 + l: None, 50 + e: vec![repo_entry], 51 + }; 52 + let repo_bytes = serde_ipld_dagcbor::to_vec(&repo_node).unwrap(); 53 + let node_cid = create_test_cid(&repo_bytes); 54 + 55 + // 4. Create Commit 56 + let commit = StarCommit { 57 + did: "did:example:test".into(), 58 + version: 3, 59 + data: Some(node_cid), 60 + rev: "rev1".into(), 61 + prev: None, 62 + sig: None, 63 + }; 64 + 65 + // 5. Serialize to Buffer 66 + let mut buf = BytesMut::new(); 67 + StarEncoder::write_header(&commit, &mut buf).unwrap(); 68 + StarEncoder::write_node(&star_node, &mut buf).unwrap(); 69 + StarEncoder::write_record(record_data, &mut buf).unwrap(); 70 + 71 + // 6. Deserialize and Verify 72 + let mut parser = StarParser::new(); 73 + 74 + // Header 75 + let item1 = parser.input(&mut buf).unwrap().unwrap(); 76 + match item1 { 77 + StarItem::Commit(c) => assert_eq!(c, commit), 78 + _ => panic!("Expected commit"), 79 + } 80 + 81 + // Node 82 + let item2 = parser.input(&mut buf).unwrap().unwrap(); 83 + match item2 { 84 + StarItem::Node(n) => { 85 + assert_eq!(n.e[0].v, None); 86 + assert_eq!(n.e[0].v_archived, Some(true)); 87 + } 88 + _ => panic!("Expected node"), 89 + } 90 + 91 + // Record 92 + let item3 = parser.input(&mut buf).unwrap().unwrap(); 93 + match item3 { 94 + StarItem::Record { 95 + key: k, 96 + cid, 97 + content, 98 + } => { 99 + assert_eq!(k, key); 100 + assert_eq!(cid, record_cid); 101 + assert_eq!(content.unwrap(), record_data); 102 + } 103 + _ => panic!("Expected record"), 104 + } 105 + 106 + // Done 107 + assert!(parser.input(&mut buf).unwrap().is_none()); 108 + } 109 + 110 + #[test] 111 + fn test_verification_failure() { 112 + // 1. Create a dummy record 113 + let record_data = b"hello world"; 114 + let key = b"foo".to_vec(); 115 + 116 + // 2. Create STAR Node 117 + let node_entry = StarMstEntry { 118 + p: 0, 119 + k: ByteBuf::from(key.clone()), 120 + v: None, 121 + v_archived: Some(true), 122 + t: None, 123 + t_archived: None, 124 + }; 125 + let star_node = StarMstNode { 126 + l: None, 127 + l_archived: None, 128 + e: vec![node_entry], 129 + }; 130 + 131 + // 3. Create Commit with WRONG CID 132 + let wrong_cid = create_test_cid(b"garbage"); 133 + let commit = StarCommit { 134 + did: "did:example:test".into(), 135 + version: 3, 136 + data: Some(wrong_cid), 137 + rev: "rev1".into(), 138 + prev: None, 139 + sig: None, 140 + }; 141 + 142 + // 4. Serialize 143 + let mut buf = BytesMut::new(); 144 + StarEncoder::write_header(&commit, &mut buf).unwrap(); 145 + StarEncoder::write_node(&star_node, &mut buf).unwrap(); 146 + StarEncoder::write_record(record_data, &mut buf).unwrap(); 147 + 148 + // 5. Parse 149 + let mut parser = StarParser::new(); 150 + parser.input(&mut buf).unwrap(); // Header OK 151 + parser.input(&mut buf).unwrap(); // Node OK (verification deferred) 152 + parser.input(&mut buf).unwrap(); // Record OK (verification scheduled) 153 + 154 + // 6. Trigger verification (processing VerifyLayer0) 155 + let result = parser.input(&mut buf); 156 + 157 + assert!(result.is_err()); 158 + match result.unwrap_err() { 159 + crate::error::StarError::VerificationFailed { .. } => {} 160 + e => panic!("Expected VerificationFailed, got {:?}", e), 161 + } 162 + } 163 + }
+57 -18
src/types.rs
··· 1 + use crate::error::{Result, StarError}; 1 2 use cid::Cid; 2 3 use serde::{Deserialize, Serialize}; 3 4 use serde_bytes::ByteBuf; 4 5 5 - /// The ATProto Commit object (DAG-CBOR) 6 + // --- STAR Types (Wire Format) --- 7 + 8 + /// The STAR Commit object 6 9 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 7 - pub struct Commit { 10 + pub struct StarCommit { 8 11 pub did: String, 9 - pub version: u64, 12 + pub version: i64, 10 13 #[serde(default, skip_serializing_if = "Option::is_none")] 11 14 pub data: Option<Cid>, 12 15 pub rev: String, ··· 16 19 pub sig: Option<ByteBuf>, 17 20 } 18 21 19 - /// The MST Node object (DAG-CBOR) 22 + /// The STAR MST Node object 20 23 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 21 - pub struct MstNode { 24 + pub struct StarMstNode { 22 25 #[serde(default, skip_serializing_if = "Option::is_none")] 23 26 pub l: Option<Cid>, 24 - 25 27 #[serde(rename = "L", default, skip_serializing_if = "Option::is_none")] 26 28 pub l_archived: Option<bool>, 27 - 28 - pub e: Vec<MstEntry>, 29 + pub e: Vec<StarMstEntry>, 29 30 } 30 31 31 - /// The MST Entry object 32 + /// The STAR MST Entry object 32 33 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 33 - pub struct MstEntry { 34 - /// Prefix length 34 + pub struct StarMstEntry { 35 35 pub p: u32, 36 - /// Key suffix 37 36 pub k: ByteBuf, 38 - /// Value (Record CID) - Optional for implicit records 39 37 #[serde(default, skip_serializing_if = "Option::is_none")] 40 38 pub v: Option<Cid>, 41 - /// Value Archived Flag 42 39 #[serde(rename = "V", default, skip_serializing_if = "Option::is_none")] 43 40 pub v_archived: Option<bool>, 44 - /// Right Subtree CID 45 41 #[serde(default, skip_serializing_if = "Option::is_none")] 46 42 pub t: Option<Cid>, 47 - /// Right Subtree Archived Flag 48 43 #[serde(rename = "T", default, skip_serializing_if = "Option::is_none")] 49 44 pub t_archived: Option<bool>, 50 45 } 51 46 47 + // --- CAR Repo Types (Canonical / Hashing Format) --- 48 + 49 + /// The Canonical MST Node object (for CID calculation) 50 + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 51 + pub struct RepoMstNode { 52 + #[serde(default, skip_serializing_if = "Option::is_none")] 53 + pub l: Option<Cid>, 54 + pub e: Vec<RepoMstEntry>, 55 + } 56 + 57 + /// The Canonical MST Entry object 58 + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 59 + pub struct RepoMstEntry { 60 + pub p: u32, 61 + pub k: ByteBuf, 62 + pub v: Cid, // Required in Repo spec 63 + #[serde(default, skip_serializing_if = "Option::is_none")] 64 + pub t: Option<Cid>, 65 + } 66 + 67 + // --- Conversion --- 68 + 69 + impl StarMstNode { 70 + pub fn to_repo(&self) -> Result<RepoMstNode> { 71 + let mut entries = Vec::with_capacity(self.e.len()); 72 + for e in &self.e { 73 + entries.push(RepoMstEntry { 74 + p: e.p, 75 + k: e.k.clone(), 76 + v: e.v.ok_or_else(|| { 77 + StarError::InvalidState( 78 + "Cannot convert implicit STAR entry to Repo entry: missing 'v'".into(), 79 + ) 80 + })?, 81 + t: e.t, 82 + }); 83 + } 84 + Ok(RepoMstNode { 85 + l: self.l, 86 + e: entries, 87 + }) 88 + } 89 + } 90 + 52 91 /// A parsed item from the STAR stream 53 92 #[derive(Debug, Clone)] 54 93 pub enum StarItem { 55 - Commit(Commit), 56 - Node(MstNode), 94 + Commit(StarCommit), 95 + Node(StarMstNode), 57 96 Record { 58 97 key: Vec<u8>, 59 98 cid: Cid,