STreaming ARchives: stricter, verifiable, deterministic, highly compressible alternatives to CAR files for atproto repositories.
atproto car
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fmt

phil 6326dfb3 5ecc07f8

+95 -66
+2 -2
src/async_io.rs
··· 1 + use crate::error::StarError; 1 2 use crate::parser::StarParser; 2 3 use crate::types::StarItem; 3 - use crate::error::StarError; 4 - use bytes::{BytesMut, Buf}; 4 + use bytes::{Buf, BytesMut}; 5 5 use tokio_util::codec::Decoder; 6 6 7 7 impl Decoder for StarParser {
+6 -6
src/blocking.rs
··· 1 - use std::io::Read; 1 + use crate::error::Result; 2 2 use crate::parser::StarParser; 3 - use crate::error::Result; 4 3 use crate::types::StarItem; 5 4 use cid::Cid; 6 5 use std::collections::VecDeque; 6 + use std::io::Read; 7 7 8 8 pub struct StarIterator<R> { 9 9 reader: R, ··· 55 55 // make_contiguous() allows us to get a single slice, but might involve a memory copy/move 56 56 // if the buffer is wrapped. This is amortized O(1) in many cases but can be O(N). 57 57 // However, it solves the "head removal" problem perfectly (pop_front is O(1)). 58 - 58 + 59 59 self.buffer.make_contiguous(); 60 60 let (slice, _) = self.buffer.as_slices(); 61 - 61 + 62 62 match self.parser.parse(slice) { 63 63 Ok((consumed, Some(item))) => { 64 64 self.buffer.drain(..consumed); ··· 66 66 } 67 67 Ok((consumed, None)) => { 68 68 self.buffer.drain(..consumed); 69 - 69 + 70 70 // Read more 71 71 // We can't read directly into VecDeque easily without temporary buffer 72 72 // because it doesn't expose a mutable slice to uninitialized memory. ··· 76 76 Ok(n) => self.buffer.extend(&temp[..n]), 77 77 Err(e) => return Some(Err(e.into())), 78 78 } 79 - }, 79 + } 80 80 Err(e) => return Some(Err(e)), 81 81 } 82 82 }
+44 -31
src/parser.rs
··· 7 7 #[derive(Debug)] 8 8 enum State { 9 9 Header, 10 - Body { 10 + Body { 11 11 stack: Vec<StackItem>, 12 12 current_len: Option<usize>, 13 13 }, ··· 46 46 47 47 pub fn parse(&mut self, buf: &[u8]) -> Result<(usize, Option<StarItem>)> { 48 48 let mut consumed = 0; 49 - 49 + 50 50 loop { 51 51 let is_body_done = if let State::Body { stack, .. } = &self.state { 52 52 stack.is_empty() ··· 80 80 Some((n, len)) => (n, len), 81 81 None => return Ok((consumed, None)), 82 82 }; 83 - 83 + 84 84 let body_buf = &current_buf[len_consumed..]; 85 85 if body_buf.len() < len { 86 86 consumed += len_consumed; ··· 89 89 90 90 let block_bytes = &body_buf[..len]; 91 91 *current_len = None; 92 - 92 + 93 93 let item = stack.pop().unwrap(); 94 94 let result_item = match item { 95 - StackItem::Node { expected, expected_height } => { 96 - Self::process_node(block_bytes, expected, expected_height, stack)? 97 - }, 98 - StackItem::Record { key, expected, implicit_index } => { 95 + StackItem::Node { 96 + expected, 97 + expected_height, 98 + } => Self::process_node(block_bytes, expected, expected_height, stack)?, 99 + StackItem::Record { 100 + key, 101 + expected, 102 + implicit_index, 103 + } => { 99 104 Self::process_record(block_bytes, key, expected, implicit_index, stack)? 100 - }, 105 + } 101 106 _ => return Err(StarError::InvalidState("Unexpected stack item".into())), 102 107 }; 103 108 ··· 117 122 } 118 123 119 124 let slice = &buf[1..]; 120 - 125 + 121 126 let (ver, remaining1) = match unsigned_varint::decode::usize(slice) { 122 127 Ok(res) => res, 123 128 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 124 129 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 125 130 }; 126 - 131 + 127 132 let (len, remaining2) = match unsigned_varint::decode::usize(remaining1) { 128 133 Ok(res) => res, 129 134 Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 130 135 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 131 136 }; 132 137 133 - let header_varints_len = buf.len() - 1 - remaining2.len(); 134 - let total_header_len = 1 + header_varints_len; 138 + let header_varints_len = buf.len() - 1 - remaining2.len(); 139 + let total_header_len = 1 + header_varints_len; 135 140 let total_len = total_header_len + len; 136 141 137 142 if buf.len() < total_len { ··· 141 146 let commit_bytes = &buf[total_header_len..total_len]; 142 147 let commit: StarCommit = serde_ipld_dagcbor::from_slice(commit_bytes) 143 148 .map_err(|e| StarError::Cbor(e.to_string()))?; 144 - 145 - let _ = ver; 149 + 150 + let _ = ver; 146 151 147 152 let mut stack = Vec::new(); 148 153 if let Some(root_cid) = commit.data { ··· 152 157 }); 153 158 } 154 159 155 - self.state = State::Body { 160 + self.state = State::Body { 156 161 stack, 157 - current_len: None 162 + current_len: None, 158 163 }; 159 164 Ok(Some((total_len, StarItem::Commit(commit)))) 160 165 } ··· 205 210 let consumed = buf.len() - remaining.len(); 206 211 *current_len = Some(l); 207 212 Ok(Some((consumed, l))) 208 - }, 213 + } 209 214 Err(unsigned_varint::decode::Error::Insufficient) => Ok(None), 210 215 Err(e) => Err(StarError::InvalidState(format!("Varint error: {}", e))), 211 216 } 212 217 } 213 218 214 - fn process_node(block_bytes: &[u8], expected: Option<Cid>, expected_height: Option<u32>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 219 + fn process_node( 220 + block_bytes: &[u8], 221 + expected: Option<Cid>, 222 + expected_height: Option<u32>, 223 + stack: &mut Vec<StackItem>, 224 + ) -> Result<Option<StarItem>> { 215 225 let node: StarMstNode = serde_ipld_dagcbor::from_slice(block_bytes) 216 226 .map_err(|e| StarError::Cbor(e.to_string()))?; 217 227 ··· 237 247 .map_err(|e| StarError::Cbor(e.to_string()))?; 238 248 239 249 let hash = Sha256::digest(&bytes); 240 - let cid = Cid::new_v1( 241 - 0x71, 242 - cid::multihash::Multihash::wrap(0x12, &hash)?, 243 - ); 250 + let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 244 251 if let Some(exp) = expected { 245 252 if cid != exp { 246 253 return Err(StarError::VerificationFailed { ··· 260 267 261 268 // Push children in reverse 262 269 let child_expected_height = height.checked_sub(1); 263 - 270 + 264 271 if height > 0 { 265 272 let next_h = child_expected_height.unwrap(); 266 - 273 + 267 274 for i in (0..node.e.len()).rev() { 268 275 let e = &node.e[i]; 269 276 let key = entry_keys[i].clone(); 270 277 271 278 if e.t_archived == Some(true) { 272 - stack.push(StackItem::Node { 279 + stack.push(StackItem::Node { 273 280 expected: e.t, 274 - expected_height: Some(next_h), 281 + expected_height: Some(next_h), 275 282 }); 276 283 } 277 284 ··· 286 293 } 287 294 288 295 if node.l_archived == Some(true) { 289 - stack.push(StackItem::Node { 296 + stack.push(StackItem::Node { 290 297 expected: node.l, 291 298 expected_height: Some(next_h), 292 299 }); 293 300 } 294 301 } else { 295 302 // Height 0: Push records only 296 - for i in (0..node.e.len()).rev() { 303 + for i in (0..node.e.len()).rev() { 297 304 let e = &node.e[i]; 298 305 let key = entry_keys[i].clone(); 299 - 306 + 300 307 if e.v_archived == Some(true) { 301 308 let implicit_index = if e.v.is_none() { Some(i) } else { None }; 302 309 stack.push(StackItem::Record { ··· 311 318 Ok(Some(StarItem::Node(node))) 312 319 } 313 320 314 - fn process_record(block_bytes: &[u8], key: Vec<u8>, expected: Option<Cid>, implicit_index: Option<usize>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 321 + fn process_record( 322 + block_bytes: &[u8], 323 + key: Vec<u8>, 324 + expected: Option<Cid>, 325 + implicit_index: Option<usize>, 326 + stack: &mut Vec<StackItem>, 327 + ) -> Result<Option<StarItem>> { 315 328 let hash = Sha256::digest(block_bytes); 316 329 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 317 330
+41 -25
src/ser.rs
··· 1 1 use crate::error::Result; 2 + use crate::error::StarError; 2 3 use crate::types::{StarCommit, StarItem, StarMstNode}; 3 4 use crate::validation::validate_node_structure; 4 - use crate::error::StarError; 5 5 use std::io::Write; 6 6 7 7 pub struct StarEncoder; ··· 15 15 16 16 pub fn write_header<W: Write>(commit: &StarCommit, dst: &mut W) -> Result<()> { 17 17 dst.write_all(&[0x2A])?; 18 - 18 + 19 19 Self::write_varint(1, dst)?; 20 - 20 + 21 21 let commit_bytes = serde_ipld_dagcbor::to_vec(commit) 22 22 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 23 - 23 + 24 24 Self::write_varint(commit_bytes.len(), dst)?; 25 25 dst.write_all(&commit_bytes)?; 26 - 26 + 27 27 Ok(()) 28 28 } 29 29 30 30 pub fn write_node<W: Write>(node: &StarMstNode, dst: &mut W) -> Result<()> { 31 31 let node_bytes = serde_ipld_dagcbor::to_vec(node) 32 32 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 33 - 33 + 34 34 Self::write_varint(node_bytes.len(), dst)?; 35 35 dst.write_all(&node_bytes)?; 36 - 36 + 37 37 Ok(()) 38 38 } 39 39 40 40 pub fn write_record<W: Write>(record_bytes: &[u8], dst: &mut W) -> Result<()> { 41 41 Self::write_varint(record_bytes.len(), dst)?; 42 42 dst.write_all(record_bytes)?; 43 - 43 + 44 44 Ok(()) 45 45 } 46 46 } ··· 53 53 use bytes::BufMut; 54 54 // BytesMut::writer() returns an impl Write 55 55 let mut writer = dst.writer(); 56 - 56 + 57 57 match item { 58 58 StarItem::Commit(c) => Self::write_header(&c, &mut writer), 59 59 StarItem::Node(n) => Self::write_node(&n, &mut writer), ··· 61 61 if let Some(bytes) = content { 62 62 Self::write_record(&bytes, &mut writer) 63 63 } else { 64 - Err(crate::error::StarError::InvalidState("Cannot serialize record without content".into())) 64 + Err(crate::error::StarError::InvalidState( 65 + "Cannot serialize record without content".into(), 66 + )) 65 67 } 66 68 } 67 69 } ··· 96 98 self.validator.accept_record(record_bytes)?; 97 99 StarEncoder::write_record(record_bytes, &mut self.writer) 98 100 } 99 - 101 + 100 102 pub fn finish(self) -> Result<W> { 101 103 if !self.validator.is_done() { 102 - return Err(crate::error::StarError::InvalidState("Incomplete tree".into())); 104 + return Err(crate::error::StarError::InvalidState( 105 + "Incomplete tree".into(), 106 + )); 103 107 } 104 108 Ok(self.writer) 105 109 } ··· 128 132 129 133 impl StarValidator { 130 134 pub fn new() -> Self { 131 - Self { state: ValidatorState::Header } 135 + Self { 136 + state: ValidatorState::Header, 137 + } 132 138 } 133 139 134 140 pub fn accept_header(&mut self, commit: &StarCommit) -> Result<()> { ··· 141 147 }; 142 148 self.state = ValidatorState::Body { stack }; 143 149 Ok(()) 144 - }, 145 - _ => Err(StarError::InvalidState("Header already written or invalid state".into())), 150 + } 151 + _ => Err(StarError::InvalidState( 152 + "Header already written or invalid state".into(), 153 + )), 146 154 } 147 155 } 148 156 ··· 150 158 match &mut self.state { 151 159 ValidatorState::Body { stack } => { 152 160 if stack.is_empty() { 153 - return Err(StarError::InvalidState("Unexpected node: tree is complete".into())); 161 + return Err(StarError::InvalidState( 162 + "Unexpected node: tree is complete".into(), 163 + )); 154 164 } 155 - 165 + 156 166 let expectation = stack.pop().unwrap(); 157 167 let expected_height = match expectation { 158 168 Expectation::Record => { 159 169 return Err(StarError::InvalidState("Expected record, got node".into())); 160 - }, 170 + } 161 171 Expectation::Root => None, 162 172 Expectation::Node { height } => Some(height), 163 173 }; ··· 166 176 let (height, _) = validate_node_structure(node, expected_height)?; 167 177 168 178 let child_height = if height > 0 { height - 1 } else { 0 }; 169 - 179 + 170 180 for e in node.e.iter().rev() { 171 181 if e.t_archived == Some(true) { 172 - stack.push(Expectation::Node { height: child_height }); 182 + stack.push(Expectation::Node { 183 + height: child_height, 184 + }); 173 185 } 174 186 if e.v_archived == Some(true) { 175 187 stack.push(Expectation::Record); ··· 177 189 } 178 190 179 191 if node.l_archived == Some(true) { 180 - stack.push(Expectation::Node { height: child_height }); 192 + stack.push(Expectation::Node { 193 + height: child_height, 194 + }); 181 195 } 182 196 183 197 Ok(()) 184 - }, 198 + } 185 199 _ => Err(StarError::InvalidState("Invalid state for node".into())), 186 200 } 187 201 } ··· 190 204 match &mut self.state { 191 205 ValidatorState::Body { stack } => { 192 206 if stack.is_empty() { 193 - return Err(StarError::InvalidState("Unexpected record: tree is complete".into())); 207 + return Err(StarError::InvalidState( 208 + "Unexpected record: tree is complete".into(), 209 + )); 194 210 } 195 211 match stack.pop().unwrap() { 196 212 Expectation::Record => Ok(()), 197 213 _ => Err(StarError::InvalidState("Expected node, got record".into())), 198 214 } 199 - }, 215 + } 200 216 _ => Err(StarError::InvalidState("Invalid state for record".into())), 201 217 } 202 218 } 203 - 219 + 204 220 pub fn is_done(&self) -> bool { 205 221 match &self.state { 206 222 ValidatorState::Body { stack } => stack.is_empty(),
+2 -2
src/validation.rs
··· 1 1 use crate::error::{Result, StarError}; 2 - use crate::types::{calculate_height, StarMstNode}; 2 + use crate::types::{StarMstNode, calculate_height}; 3 3 4 4 /// Validates the structure of a STAR MST node. 5 - /// 5 + /// 6 6 /// Checks: 7 7 /// - Key height consistency (all keys must have same height) 8 8 /// - Height matching against expectation