STreaming ARchives: stricter, verifiable, deterministic, highly compressible alternatives to CAR files for atproto repositories.
atproto car
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

further deps simplification

phil ae3c0680 a895d829

+173 -103
+2 -2
Cargo.toml
··· 6 6 [features] 7 7 default = ["async"] 8 8 blocking = [] 9 - async = ["dep:tokio", "dep:tokio-util", "dep:futures"] 9 + async = ["dep:tokio", "dep:tokio-util", "dep:futures", "dep:bytes"] 10 10 11 11 [dependencies] 12 12 # Core ··· 16 16 cid = { version = "0.11", features = ["serde-codec"] } 17 17 thiserror = "2" 18 18 sha2 = "0.10" 19 - bytes = "1" 20 19 unsigned-varint = "0.8" 21 20 22 21 # Async 22 + bytes = { version = "1", optional = true } 23 23 tokio = { version = "1", features = ["io-util", "macros", "rt"], optional = true } 24 24 tokio-util = { version = "0.7", features = ["codec"], optional = true } 25 25 futures = { version = "0.3", optional = true }
+4
readme.md
··· 185 185 186 186 CIDs make up around 20% of uncompressed CAR file sizes. the first approach gets that down to 4%; second 1%. however, CIDs are uncompressible, so it's probably worth measuring the real effect of both approaches on large repos post-compression before completely committing one way or another. 187 187 188 + 189 + 190 + 191 + TODO: note use of multiformats varint instead of LEB128 (stricter)
+5 -3
src/async_io.rs
··· 1 - use crate::error::StarError; 2 1 use crate::parser::StarParser; 3 2 use crate::types::StarItem; 4 - use bytes::BytesMut; 3 + use crate::error::StarError; 4 + use bytes::{BytesMut, Buf}; 5 5 use tokio_util::codec::Decoder; 6 6 7 7 impl Decoder for StarParser { ··· 9 9 type Error = StarError; 10 10 11 11 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { 12 - self.input(src) 12 + let (consumed, item) = self.parse(src)?; 13 + src.advance(consumed); 14 + Ok(item) 13 15 } 14 16 }
+26 -13
src/blocking.rs
··· 1 + use std::io::Read; 2 + use crate::parser::StarParser; 1 3 use crate::error::Result; 2 - use crate::parser::StarParser; 3 4 use crate::types::StarItem; 4 - use bytes::BytesMut; 5 5 use cid::Cid; 6 - use std::io::Read; 6 + use std::collections::VecDeque; 7 7 8 8 pub struct StarIterator<R> { 9 9 reader: R, 10 10 parser: StarParser, 11 - buffer: BytesMut, 11 + buffer: VecDeque<u8>, 12 12 } 13 13 14 14 impl<R: Read> StarIterator<R> { ··· 16 16 Self { 17 17 reader, 18 18 parser: StarParser::new(), 19 - buffer: BytesMut::new(), 19 + buffer: VecDeque::new(), 20 20 } 21 21 } 22 22 ··· 51 51 52 52 fn next(&mut self) -> Option<Self::Item> { 53 53 loop { 54 - match self.parser.input(&mut self.buffer) { 55 - Ok(Some(item)) => return Some(Ok(item)), 56 - Ok(None) => { 57 - if self.buffer.capacity() == 0 { 58 - self.buffer.reserve(8192); 59 - } 54 + // VecDeque must be contiguous to be parsed as a slice. 55 + // make_contiguous() allows us to get a single slice, but might involve a memory copy/move 56 + // if the buffer is wrapped. This is amortized O(1) in many cases but can be O(N). 57 + // However, it solves the "head removal" problem perfectly (pop_front is O(1)). 58 + 59 + self.buffer.make_contiguous(); 60 + let (slice, _) = self.buffer.as_slices(); 61 + 62 + match self.parser.parse(slice) { 63 + Ok((consumed, Some(item))) => { 64 + self.buffer.drain(..consumed); 65 + return Some(Ok(item)); 66 + } 67 + Ok((consumed, None)) => { 68 + self.buffer.drain(..consumed); 69 + 70 + // Read more 71 + // We can't read directly into VecDeque easily without temporary buffer 72 + // because it doesn't expose a mutable slice to uninitialized memory. 60 73 let mut temp = [0u8; 8192]; 61 74 match self.reader.read(&mut temp) { 62 75 Ok(0) => return None, // EOF 63 - Ok(n) => self.buffer.extend_from_slice(&temp[..n]), 76 + Ok(n) => self.buffer.extend(&temp[..n]), 64 77 Err(e) => return Some(Err(e.into())), 65 78 } 66 - } 79 + }, 67 80 Err(e) => return Some(Err(e)), 68 81 } 69 82 }
+84 -53
src/parser.rs
··· 1 1 use crate::error::{Result, StarError}; 2 2 use crate::types::{StarCommit, StarItem, StarMstNode}; 3 - use bytes::{Buf, BytesMut}; 4 3 use cid::Cid; 5 4 use sha2::{Digest, Sha256}; 6 5 ··· 42 41 } 43 42 } 44 43 45 - pub fn input(&mut self, buf: &mut BytesMut) -> Result<Option<StarItem>> { 44 + /// Parses the input buffer. 45 + /// Returns (bytes_consumed, Option<Item>). 46 + pub fn parse(&mut self, buf: &[u8]) -> Result<(usize, Option<StarItem>)> { 47 + let mut consumed = 0; 48 + 49 + // Loop allows state transitions (e.g. Header -> Body) without returning 50 + // but we must be careful to track consumed bytes correctly. 46 51 loop { 47 52 // Check if we need to transition from Body to Done 48 53 let is_body_done = if let State::Body { stack, .. } = &self.state { ··· 53 58 54 59 if is_body_done { 55 60 self.state = State::Done; 56 - return Ok(None); 61 + return Ok((consumed, None)); 57 62 } 63 + 64 + // Slice the buffer to the remaining part 65 + let current_buf = &buf[consumed..]; 58 66 59 67 match &mut self.state { 60 - State::Done => return Ok(None), 68 + State::Done => return Ok((consumed, None)), 61 69 State::Header => { 62 - return self.parse_header(buf) 70 + let (n, item) = match self.parse_header(current_buf)? { 71 + Some((n, item)) => (n, item), 72 + None => return Ok((consumed, None)), 73 + }; 74 + consumed += n; 75 + return Ok((consumed, Some(item))); 63 76 } 64 77 State::Body { stack, current_len } => { 65 78 if Self::process_verification(stack)? { 79 + // Verification doesn't consume bytes, but changes state/stack. 80 + // We continue the loop to try reading the next item immediately. 66 81 continue; 67 82 } 68 83 69 - if let Some(len) = Self::read_length(buf, current_len)? 
{ 70 - let block_bytes = buf.split_to(len); 71 - *current_len = None; 72 - 73 - let item = stack.pop().unwrap(); 74 - match item { 75 - StackItem::Node { expected } => { 76 - return Self::process_node(block_bytes, expected, stack); 77 - }, 78 - StackItem::Record { key, expected, implicit_index } => { 79 - return Self::process_record(block_bytes, key, expected, implicit_index, stack); 80 - }, 81 - _ => return Err(StarError::InvalidState("Unexpected stack item".into())), 82 - } 83 - } else { 84 - return Ok(None); 84 + // Try to read length 85 + let (len_consumed, len) = match Self::read_length(current_buf, current_len)? { 86 + Some((n, len)) => (n, len), 87 + None => return Ok((consumed, None)), 88 + }; 89 + 90 + // Note: read_length advances internal state (current_len) but also returns 91 + // how many bytes of the varint were consumed from current_buf. 92 + // If we have the full length, we now check if we have the body. 93 + 94 + let body_buf = &current_buf[len_consumed..]; 95 + if body_buf.len() < len { 96 + // We read the length varint, but don't have enough bytes for the body. 97 + // We must report the varint itself as consumed so the caller advances, 98 + // and we have stored the length in current_len via mutation. 99 + consumed += len_consumed; 100 + return Ok((consumed, None)); 85 101 } 102 + 103 + // We have the body. 104 + let block_bytes = &body_buf[..len]; 105 + 106 + // Reset current_len since we are consuming the block 107 + *current_len = None; 108 + 109 + let item = stack.pop().unwrap(); 110 + let result_item = match item { 111 + StackItem::Node { expected } => { 112 + Self::process_node(block_bytes, expected, stack)? 113 + }, 114 + StackItem::Record { key, expected, implicit_index } => { 115 + Self::process_record(block_bytes, key, expected, implicit_index, stack)? 
116 + }, 117 + _ => return Err(StarError::InvalidState("Unexpected stack item".into())), 118 + }; 119 + 120 + consumed += len_consumed + len; 121 + return Ok((consumed, result_item)); 86 122 } 87 123 } 88 124 } 89 125 } 90 126 91 - fn parse_header(&mut self, buf: &mut BytesMut) -> Result<Option<StarItem>> { 127 + // Returns Option<(bytes_consumed, Item)> 128 + fn parse_header(&mut self, buf: &[u8]) -> Result<Option<(usize, StarItem)>> { 92 129 if buf.len() < 1 { 93 130 return Ok(None); 94 131 } ··· 96 133 return Err(StarError::InvalidHeader); 97 134 } 98 135 99 - // We use a slice of the buffer starting after the magic byte 100 - // unsigned_varint operates on &[u8] 101 136 let slice = &buf[1..]; 102 137 103 138 let (ver, remaining1) = match unsigned_varint::decode::usize(slice) { ··· 112 147 Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 113 148 }; 114 149 115 - let header_varints_len = buf.len() - 1 - remaining2.len(); // bytes consumed by varints 116 - let total_header_len = 1 + header_varints_len; // + magic byte 150 + let header_varints_len = buf.len() - 1 - remaining2.len(); 151 + let total_header_len = 1 + header_varints_len; 117 152 let total_len = total_header_len + len; 118 153 119 154 if buf.len() < total_len { 120 155 return Ok(None); 121 156 } 122 157 123 - buf.advance(total_header_len); 124 - let commit_bytes = buf.split_to(len); 125 - let commit: StarCommit = serde_ipld_dagcbor::from_slice(&commit_bytes) 158 + // We have the full commit 159 + let commit_bytes = &buf[total_header_len..total_len]; 160 + let commit: StarCommit = serde_ipld_dagcbor::from_slice(commit_bytes) 126 161 .map_err(|e| StarError::Cbor(e.to_string()))?; 127 162 128 - // Check version (conceptually) 129 163 let _ = ver; 130 164 131 165 let mut stack = Vec::new(); ··· 139 173 stack, 140 174 current_len: None 141 175 }; 142 - Ok(Some(StarItem::Commit(commit))) 176 + 177 + Ok(Some((total_len, StarItem::Commit(commit)))) 143 178 } 144 179 145 180 fn 
process_verification(stack: &mut Vec<StackItem>) -> Result<bool> { ··· 177 212 Ok(false) 178 213 } 179 214 180 - fn read_length(buf: &mut BytesMut, current_len: &mut Option<usize>) -> Result<Option<usize>> { 181 - if current_len.is_none() { 182 - match unsigned_varint::decode::usize(&buf[..]) { 183 - Ok((l, remaining)) => { 184 - let consumed = buf.len() - remaining.len(); 185 - *current_len = Some(l); 186 - buf.advance(consumed); 187 - }, 188 - Err(unsigned_varint::decode::Error::Insufficient) => return Ok(None), 189 - Err(e) => return Err(StarError::InvalidState(format!("Varint error: {}", e))), 190 - } 215 + // Returns Option<(bytes_consumed, length_value)> 216 + // If successful, updates current_len to the read length value 217 + fn read_length(buf: &[u8], current_len: &mut Option<usize>) -> Result<Option<(usize, usize)>> { 218 + // If we already have a length (from a previous partial read), we consumed 0 bytes *now* to get it 219 + if let Some(len) = current_len { 220 + return Ok(Some((0, *len))); 191 221 } 192 222 193 - let len = current_len.unwrap(); 194 - if buf.len() < len { 195 - return Ok(None); 223 + match unsigned_varint::decode::usize(buf) { 224 + Ok((l, remaining)) => { 225 + let consumed = buf.len() - remaining.len(); 226 + *current_len = Some(l); 227 + Ok(Some((consumed, l))) 228 + }, 229 + Err(unsigned_varint::decode::Error::Insufficient) => Ok(None), 230 + Err(e) => Err(StarError::InvalidState(format!("Varint error: {}", e))), 196 231 } 197 - Ok(Some(len)) 198 232 } 199 233 200 - fn process_node(block_bytes: BytesMut, expected: Option<Cid>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 201 - let node: StarMstNode = serde_ipld_dagcbor::from_slice(&block_bytes) 234 + fn process_node(block_bytes: &[u8], expected: Option<Cid>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 235 + let node: StarMstNode = serde_ipld_dagcbor::from_slice(block_bytes) 202 236 .map_err(|e| StarError::Cbor(e.to_string()))?; 203 237 204 - // Check for 
implicit records 205 238 let mut has_implicit = false; 206 239 for e in &node.e { 207 240 if e.v_archived == Some(true) && e.v.is_none() { ··· 236 269 }); 237 270 } 238 271 239 - // Reconstruct keys 240 272 let mut prev_key_bytes = Vec::new(); 241 273 let mut entry_keys = Vec::new(); 242 274 for e in &node.e { ··· 250 282 prev_key_bytes = key; 251 283 } 252 284 253 - // Push children in reverse 254 285 for i in (0..node.e.len()).rev() { 255 286 let e = &node.e[i]; 256 287 let key = entry_keys[i].clone(); ··· 276 307 Ok(Some(StarItem::Node(node))) 277 308 } 278 309 279 - fn process_record(block_bytes: BytesMut, key: Vec<u8>, expected: Option<Cid>, implicit_index: Option<usize>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 280 - let hash = Sha256::digest(&block_bytes); 310 + fn process_record(block_bytes: &[u8], key: Vec<u8>, expected: Option<Cid>, implicit_index: Option<usize>, stack: &mut Vec<StackItem>) -> Result<Option<StarItem>> { 311 + let hash = Sha256::digest(block_bytes); 281 312 let cid = Cid::new_v1(0x71, cid::multihash::Multihash::wrap(0x12, &hash)?); 282 313 283 314 if let Some(exp) = expected {
+22 -18
src/ser.rs
··· 1 1 use crate::error::Result; 2 2 use crate::types::{StarCommit, StarItem, StarMstNode}; 3 - use bytes::{BufMut, BytesMut}; 3 + use std::io::Write; 4 4 5 5 pub struct StarEncoder; 6 6 7 7 impl StarEncoder { 8 - fn write_varint(val: usize, dst: &mut BytesMut) { 8 + fn write_varint<W: Write>(val: usize, dst: &mut W) -> std::io::Result<()> { 9 9 let mut buf = unsigned_varint::encode::usize_buffer(); 10 10 let encoded = unsigned_varint::encode::usize(val, &mut buf); 11 - dst.extend_from_slice(encoded); 11 + dst.write_all(encoded) 12 12 } 13 13 14 - pub fn write_header(commit: &StarCommit, dst: &mut BytesMut) -> Result<()> { 15 - dst.put_u8(0x2A); 14 + pub fn write_header<W: Write>(commit: &StarCommit, dst: &mut W) -> Result<()> { 15 + dst.write_all(&[0x2A])?; 16 16 17 - Self::write_varint(1, dst); 17 + Self::write_varint(1, dst)?; 18 18 19 19 let commit_bytes = serde_ipld_dagcbor::to_vec(commit) 20 20 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 21 21 22 - Self::write_varint(commit_bytes.len(), dst); 23 - dst.extend_from_slice(&commit_bytes); 22 + Self::write_varint(commit_bytes.len(), dst)?; 23 + dst.write_all(&commit_bytes)?; 24 24 25 25 Ok(()) 26 26 } 27 27 28 - pub fn write_node(node: &StarMstNode, dst: &mut BytesMut) -> Result<()> { 28 + pub fn write_node<W: Write>(node: &StarMstNode, dst: &mut W) -> Result<()> { 29 29 let node_bytes = serde_ipld_dagcbor::to_vec(node) 30 30 .map_err(|e| crate::error::StarError::Cbor(e.to_string()))?; 31 31 32 - Self::write_varint(node_bytes.len(), dst); 33 - dst.extend_from_slice(&node_bytes); 32 + Self::write_varint(node_bytes.len(), dst)?; 33 + dst.write_all(&node_bytes)?; 34 34 35 35 Ok(()) 36 36 } 37 37 38 - pub fn write_record(record_bytes: &[u8], dst: &mut BytesMut) -> Result<()> { 39 - Self::write_varint(record_bytes.len(), dst); 40 - dst.extend_from_slice(record_bytes); 38 + pub fn write_record<W: Write>(record_bytes: &[u8], dst: &mut W) -> Result<()> { 39 + Self::write_varint(record_bytes.len(), dst)?; 
40 + dst.write_all(record_bytes)?; 41 41 42 42 Ok(()) 43 43 } ··· 47 47 impl tokio_util::codec::Encoder<StarItem> for StarEncoder { 48 48 type Error = crate::error::StarError; 49 49 50 - fn encode(&mut self, item: StarItem, dst: &mut BytesMut) -> Result<()> { 50 + fn encode(&mut self, item: StarItem, dst: &mut bytes::BytesMut) -> Result<()> { 51 + use bytes::BufMut; 52 + // BytesMut::writer() returns an impl Write 53 + let mut writer = dst.writer(); 54 + 51 55 match item { 52 - StarItem::Commit(c) => Self::write_header(&c, dst), 53 - StarItem::Node(n) => Self::write_node(&n, dst), 56 + StarItem::Commit(c) => Self::write_header(&c, &mut writer), 57 + StarItem::Node(n) => Self::write_node(&n, &mut writer), 54 58 StarItem::Record { content, .. } => { 55 59 if let Some(bytes) = content { 56 - Self::write_record(&bytes, dst) 60 + Self::write_record(&bytes, &mut writer) 57 61 } else { 58 62 Err(crate::error::StarError::InvalidState("Cannot serialize record without content".into())) 59 63 }
+30 -14
src/tests.rs
··· 5 5 use crate::types::{ 6 6 RepoMstEntry, RepoMstNode, StarCommit, StarItem, StarMstEntry, StarMstNode, 7 7 }; 8 - use bytes::BytesMut; 9 8 use cid::Cid; 10 9 use serde_bytes::ByteBuf; 11 10 use sha2::{Digest, Sha256}; ··· 63 62 }; 64 63 65 64 // 5. Serialize to Buffer 66 - let mut buf = BytesMut::new(); 65 + let mut buf = Vec::new(); 67 66 StarEncoder::write_header(&commit, &mut buf).unwrap(); 68 67 StarEncoder::write_node(&star_node, &mut buf).unwrap(); 69 68 StarEncoder::write_record(record_data, &mut buf).unwrap(); ··· 71 70 // 6. Deserialize and Verify 72 71 let mut parser = StarParser::new(); 73 72 73 + // Helper to mimic Decoder loop for tests 74 + fn parse_helper(parser: &mut StarParser, buf: &mut Vec<u8>, offset: &mut usize) -> Option<StarItem> { 75 + let (consumed, item) = parser.parse(&buf[*offset..]).unwrap(); 76 + *offset += consumed; 77 + item 78 + } 79 + 80 + let mut offset = 0; 81 + 74 82 // Header 75 - let item1 = parser.input(&mut buf).unwrap().unwrap(); 83 + let item1 = parse_helper(&mut parser, &mut buf, &mut offset).unwrap(); 76 84 match item1 { 77 85 StarItem::Commit(c) => assert_eq!(c, commit), 78 86 _ => panic!("Expected commit"), 79 87 } 80 88 81 89 // Node 82 - let item2 = parser.input(&mut buf).unwrap().unwrap(); 90 + let item2 = parse_helper(&mut parser, &mut buf, &mut offset).unwrap(); 83 91 match item2 { 84 92 StarItem::Node(n) => { 85 93 assert_eq!(n.e[0].v, None); ··· 89 97 } 90 98 91 99 // Record 92 - let item3 = parser.input(&mut buf).unwrap().unwrap(); 100 + let item3 = parse_helper(&mut parser, &mut buf, &mut offset).unwrap(); 93 101 match item3 { 94 102 StarItem::Record { 95 103 key: k, ··· 104 112 } 105 113 106 114 // Done 107 - assert!(parser.input(&mut buf).unwrap().is_none()); 115 + assert!(parse_helper(&mut parser, &mut buf, &mut offset).is_none()); 108 116 } 109 117 110 118 #[test] ··· 140 148 }; 141 149 142 150 // 4. 
Serialize 143 - let mut buf = BytesMut::new(); 151 + let mut buf = Vec::new(); 144 152 StarEncoder::write_header(&commit, &mut buf).unwrap(); 145 153 StarEncoder::write_node(&star_node, &mut buf).unwrap(); 146 154 StarEncoder::write_record(record_data, &mut buf).unwrap(); 147 155 148 156 // 5. Parse 149 157 let mut parser = StarParser::new(); 150 - parser.input(&mut buf).unwrap(); // Header OK 151 - parser.input(&mut buf).unwrap(); // Node OK (verification deferred) 152 - parser.input(&mut buf).unwrap(); // Record OK (verification scheduled) 153 - 154 - // 6. Trigger verification (processing VerifyLayer0) 155 - let result = parser.input(&mut buf); 158 + let mut offset = 0; 159 + 160 + fn parse_helper(parser: &mut StarParser, buf: &mut Vec<u8>, offset: &mut usize) -> Result<Option<StarItem>, crate::error::StarError> { 161 + let (consumed, item) = parser.parse(&buf[*offset..])?; 162 + *offset += consumed; 163 + Ok(item) 164 + } 156 165 166 + parse_helper(&mut parser, &mut buf, &mut offset).unwrap(); // Header OK 167 + parse_helper(&mut parser, &mut buf, &mut offset).unwrap(); // Node OK 168 + parse_helper(&mut parser, &mut buf, &mut offset).unwrap(); // Record OK 169 + 170 + // 6. Trigger verification 171 + let result = parse_helper(&mut parser, &mut buf, &mut offset); 172 + 157 173 assert!(result.is_err()); 158 174 match result.unwrap_err() { 159 - crate::error::StarError::VerificationFailed { .. } => {} 175 + crate::error::StarError::VerificationFailed { .. } => {}, 160 176 e => panic!("Expected VerificationFailed, got {:?}", e), 161 177 } 162 178 }