Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Tests pass, but the API is wrong.

Taking MST slices should require a prefix.

Handling sparse MSTs should be its own, separate thing.

phil 36107a37 12fe017e

+265 -26
car-samples/slice-node-before.car car-samples/slice-node-first-key.car
car-samples/slice-proving-absence.car

This is a binary file and will not be displayed.

+32
examples/print-tree/main.rs
··· 1 + /*! 2 + Read a CAR slice in memory and show some info about it. 3 + */ 4 + 5 + extern crate repo_stream; 6 + use repo_stream::{Driver, DriverBuilder}; 7 + 8 + type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 + 10 + #[tokio::main] 11 + async fn main() -> Result<()> { 12 + env_logger::init(); 13 + let reader = tokio::io::BufReader::new(tokio::io::stdin()); 14 + 15 + let (commit, driver) = match DriverBuilder::new() 16 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 17 + .load_car(reader) 18 + .await? 19 + { 20 + Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 21 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 22 + }; 23 + 24 + println!( 25 + "\nthis slice is from {}, repo rev {}\n\n", 26 + commit.did, commit.rev 27 + ); 28 + 29 + driver.viz(commit.data)?; 30 + 31 + Ok(()) 32 + }
+18 -2
src/drive.rs
··· 242 242 }; 243 243 let mut walker = Walker::new(root_node); 244 244 245 + // eprintln!("going to edge..."); 245 246 let edge = walker.step_to_edge(&mem_blocks)?; 247 + // eprintln!("got edge? {edge:?}"); 246 248 247 249 Ok(Driver::Memory( 248 250 commit, ··· 274 276 pub struct MemDriver { 275 277 blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 276 278 walker: Walker, 277 - process: fn(Bytes) -> Bytes, 279 + process: fn(Bytes) -> Bytes, // TODO: impl Fn(bytes) -> Bytes? 278 280 next_missing: Option<NodeThing>, 279 281 } 280 282 281 283 impl MemDriver { 284 + pub fn viz(&self, tree: ObjectLink) -> Result<(), WalkError> { 285 + self.walker.viz(&self.blocks, tree) 286 + } 282 287 /// Step through the record outputs, in rkey order 283 288 pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 284 - if let Some(missing) = &self.next_missing { 289 + if let Some(ref mut missing) = self.next_missing { 290 + while let Step::Value(sparse_out) = 291 + self.walker.step_sparse(&self.blocks, self.process)? 292 + { 293 + if missing.kind == ThingKind::ChildNode { 294 + *missing = NodeThing { 295 + link: sparse_out.cid.into(), 296 + kind: ThingKind::Record(sparse_out.rkey), 297 + }; 298 + } 299 + } 300 + // TODO: l asdflkja slfkja lkdfj lakjd f 285 301 // TODO: make the walker finish walking to verify no more present blocks (oops sparse tree) 286 302 // HACK: just get the last rkey if it's there -- i think we might actually need to walk for it though 287 303 // ...and walk to verify rkey order of the rest of the nodes anyway?
+1 -1
src/link.rs
··· 36 36 } 37 37 } 38 38 39 - #[derive(Debug, Clone)] 39 + #[derive(Debug, Clone, PartialEq)] 40 40 pub enum ThingKind { 41 41 ChildNode, 42 42 Record(crate::Rkey),
+1 -1
src/mst.rs
··· 47 47 u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 48 48 } 49 49 50 - #[derive(Debug)] 50 + #[derive(Debug, Clone)] 51 51 pub(crate) struct MstNode { 52 52 pub depth: Option<Depth>, // known for nodes with entries (required for root) 53 53 pub things: Vec<NodeThing>,
+165 -4
src/walk.rs
··· 35 35 } 36 36 37 37 /// Walker outputs 38 + /// 39 + /// TODO: rename to "Record" or "Entry" or something 38 40 #[derive(Debug, PartialEq)] 39 - pub struct Output { 40 - pub rkey: Rkey, 41 + pub struct Output<T = Bytes> { 42 + pub rkey: Rkey, // TODO: aaa it's not really rkey, it's just "key" (or split to collection/rkey??) 41 43 pub cid: Cid, 42 - pub data: Bytes, 44 + pub data: T, 43 45 } 44 46 45 47 #[derive(Debug, PartialEq)] ··· 67 69 } 68 70 } 69 71 72 + pub fn viz( 73 + &self, 74 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 75 + root_link: ObjectLink, 76 + ) -> Result<(), WalkError> { 77 + let root_block = blocks.get(&root_link).ok_or(WalkError::MissingBlock( 78 + NodeThing { 79 + link: root_link.clone(), 80 + kind: ThingKind::ChildNode, 81 + } 82 + .into(), 83 + ))?; 84 + 85 + let root_node: MstNode = match root_block { 86 + MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint), 87 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 88 + }; 89 + 90 + let mut positions = HashMap::new(); 91 + let mut w = Walker::new(root_node.clone()); 92 + 93 + let mut pos_idx = 0; 94 + while let Step::Value(Output { rkey, .. }) = w.step_sparse(blocks, noop)? 
{ 95 + positions.insert(rkey, pos_idx); 96 + pos_idx += 1; 97 + } 98 + 99 + Self::vnext( 100 + root_node.depth.unwrap(), 101 + vec![root_link], 102 + blocks, 103 + &positions, 104 + )?; 105 + 106 + Ok(()) 107 + } 108 + 109 + pub fn vnext( 110 + level: u32, 111 + links: Vec<ObjectLink>, 112 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 113 + positions: &HashMap<Rkey, usize>, 114 + ) -> Result<Vec<usize>, WalkError> { 115 + let mut offsets = Vec::new(); 116 + let mut level_keys = Vec::new(); 117 + let mut child_links = Vec::new(); 118 + 119 + for link in links { 120 + println!( 121 + "\n{level}~{}..", 122 + link.to_bytes() 123 + .iter() 124 + .take(5) 125 + .map(|c| format!("{c:02x}")) 126 + .collect::<Vec<_>>() 127 + .join("") 128 + ); 129 + 130 + let Some(mpb) = blocks.get(&link) else { 131 + // TODO: drop an 'x' for missing node 132 + continue; 133 + }; 134 + let node: MstNode = match mpb { 135 + MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint), 136 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 137 + }; 138 + 139 + let mut last_key = "".to_string(); 140 + let mut last_was_record = true; 141 + for thing in node.things { 142 + let mut node_keys = Vec::new(); 143 + 144 + let has = blocks.contains_key(&thing.link); 145 + 146 + match thing.kind { 147 + ThingKind::ChildNode => { 148 + if has { 149 + child_links.push(thing.link); 150 + last_was_record = false; 151 + } 152 + } 153 + ThingKind::Record(key) => { 154 + let us = positions[&key]; 155 + 156 + if !last_was_record && last_key.is_empty() { 157 + let them = positions[&last_key]; 158 + for i in 0..(them - 1) { 159 + if i < (us + 1) { 160 + print!(" "); 161 + } else { 162 + print!("~~"); 163 + } 164 + } 165 + println!("~"); 166 + } 167 + 168 + for _ in 0..us { 169 + print!(" "); 170 + } 171 + if has { 172 + print!("O"); 173 + } else { 174 + print!("x"); 175 + } 176 + println!(" {key}"); 177 + node_keys.push(key.clone()); 178 + last_key = key; 179 + 
last_was_record = true; 180 + } 181 + } 182 + level_keys.push(node_keys); 183 + } 184 + 185 + offsets.push(1); 186 + } 187 + 188 + if !child_links.is_empty() { 189 + Self::vnext(level - 1, child_links, blocks, positions)?; // TODO use offsets 190 + } 191 + 192 + Ok(offsets) 193 + } 194 + 70 195 fn mpb_step( 71 196 &mut self, 72 197 thing: NodeThing, ··· 156 281 Ok(Step::End(None)) 157 282 } 158 283 284 + /// Advance through nodes, allowing for missing records 285 + pub fn step_sparse( 286 + &mut self, 287 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 288 + process: impl Fn(Bytes) -> Bytes, 289 + ) -> Result<Step<Output<Option<Bytes>>>, WalkError> { 290 + while let Some(NodeThing { link, kind }) = self.next_todo() { 291 + let mut dummy = false; 292 + let mpb = match blocks.get(&link) { 293 + Some(mpb) => mpb, 294 + None => { 295 + if let ThingKind::Record(_) = kind { 296 + dummy = true; 297 + &MaybeProcessedBlock::Processed(vec![]) 298 + } else { 299 + continue; 300 + } 301 + } 302 + }; 303 + if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, |bytes| { 304 + if dummy { bytes } else { process(bytes) } 305 + })? { 306 + // eprintln!(" ----- {}", out.rkey); 307 + return Ok(Step::Value(Output { 308 + cid: out.cid, 309 + rkey: out.rkey, 310 + data: if dummy { None } else { Some(out.data) }, 311 + })); 312 + } 313 + } 314 + Ok(Step::End(None)) 315 + } 316 + 159 317 pub fn step_to_edge( 160 318 &mut self, 161 319 blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, ··· 172 330 ant = self.clone(); 173 331 } 174 332 Err(anyother) => return Err(anyother), 175 - Ok(_) => return Ok(rkey_prev), // oop real record, mutant went too far 333 + Ok(z) => { 334 + eprintln!("apparently we are too far at {z:?}"); 335 + return Ok(rkey_prev); // oop real record, mutant went too far 336 + } 176 337 } 177 338 } 178 339 }
+48 -18
tests/car-slices.rs
··· 2 2 use repo_stream::{Driver, Output, Step}; 3 3 4 4 const RECORD_SLICE: &'static [u8] = include_bytes!("../car-samples/slice-one.car"); 5 - const RECORD_NODE_BEFORE: &'static [u8] = include_bytes!("../car-samples/slice-node-before.car"); 5 + const RECORD_NODE_FIRST_KEY: &'static [u8] = 6 + include_bytes!("../car-samples/slice-node-first-key.car"); 6 7 const RECORD_NODE_AFTER: &'static [u8] = include_bytes!("../car-samples/slice-node-after.car"); 7 - // TODO: absense proof (zero records in slice) 8 + const RECORD_NODE_ABSENT: &'static [u8] = 9 + include_bytes!("../car-samples/slice-proving-absence.car"); 8 10 9 11 async fn test_car_slice( 10 12 bytes: &[u8], 11 13 expected_records: usize, 12 14 expected_sum: usize, 13 - expect_preceeding: &str, 14 - expect_rkey: &str, 15 - expect_proceeding: &str, 15 + expect_preceeding: Option<&str>, 16 + expect_rkey: Option<&str>, 17 + expect_proceeding: Option<&str>, 16 18 ) { 17 19 let (mut driver, before) = match Driver::load_car( 18 20 bytes, ··· 26 28 Driver::Disk(_) => panic!("too big"), 27 29 }; 28 30 29 - assert_eq!(before, Some(expect_preceeding.into())); 31 + assert_eq!(before.as_deref(), expect_preceeding); 30 32 31 33 let mut found_records = 0; 32 34 let mut sum = 0; ··· 43 45 let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 44 46 45 47 sum += size; 46 - if rkey == expect_rkey { 48 + if Some(rkey.as_str()) == expect_rkey { 47 49 found_expected_rkey = true; 48 50 } 51 + eprintln!("!!!! 
{rkey}"); 49 52 assert!(rkey > prev_rkey, "rkeys are streamed in order"); 50 53 prev_rkey = rkey; 51 54 } 52 55 } 53 56 Step::End(proceeding) => { 54 - assert_eq!(proceeding, Some(expect_proceeding.into())); 57 + assert_eq!(proceeding.as_deref(), expect_proceeding); 55 58 break; 56 59 } 57 60 } 58 61 } 59 62 60 63 assert_eq!(found_records, expected_records); 61 - assert_eq!(sum, expected_sum); 62 - assert!(found_expected_rkey); 64 + if expected_records > 0 { 65 + assert!(found_expected_rkey); 66 + assert_eq!(sum, expected_sum); 67 + } else { 68 + assert!(!found_expected_rkey); 69 + } 63 70 } 64 71 65 72 #[tokio::test] ··· 68 75 RECORD_SLICE, 69 76 1, 70 77 212, 71 - "app.bsky.feed.like/3mcfzfbpaml27", 72 - "app.bsky.feed.like/3mcg72x6bi32z", 73 - "app.bsky.feed.like/3mcga2o2efq27", 78 + Some("app.bsky.feed.like/3mcfzfbpaml27"), 79 + Some("app.bsky.feed.like/3mcg72x6bi32z"), 80 + Some("app.bsky.feed.like/3mcga2o2efq27"), 74 81 ) 75 82 .await 76 83 } 77 84 78 85 #[tokio::test] 79 - async fn test_record_slice_node_before() { 80 - test_car_slice(RECORD_NODE_BEFORE, 1, 212, "", "", "").await 86 + async fn test_record_slice_node_first_key() { 87 + test_car_slice( 88 + RECORD_NODE_FIRST_KEY, 89 + 1, 90 + 212, 91 + None, 92 + Some("app.bsky.feed.like/3lohfzs6qea24"), 93 + Some("app.bsky.feed.post/3m72vlnelw227"), 94 + ) 95 + .await 81 96 } 82 97 83 98 #[tokio::test] ··· 86 101 RECORD_NODE_AFTER, 87 102 1, 88 103 212, 89 - "app.bsky.feed.like/3mbzi6ttskp2c", 90 - "", 91 - "", 104 + Some("app.bsky.feed.like/3mbzi6ttskp2c"), 105 + Some("app.bsky.feed.like/3mcqqwzsc7x26"), 106 + Some("app.bsky.feed.post/3lbn6of6qxc2a"), 107 + ) 108 + .await 109 + } 110 + 111 + #[tokio::test] 112 + async fn test_record_slice_proving_absence() { 113 + // missing key is `app.bsky.feed.like/3lohfzs6qea23` 114 + // NOTE: repo-stream output here isn't enough info for proof 115 + test_car_slice( 116 + RECORD_NODE_ABSENT, 117 + 0, 118 + 0, 119 + Some("app.bsky.feed.post/3m72vlnelw227"), 120 + None, 
121 + None, 92 122 ) 93 123 .await 94 124 }