Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

some tests for new stuff

phil 58e54858 1d1d4005

+286 -37
+1 -1
Cargo.lock
··· 1195 1195 1196 1196 [[package]] 1197 1197 name = "repo-stream" 1198 - version = "0.4.0" 1198 + version = "0.5.0-alpha.1" 1199 1199 dependencies = [ 1200 1200 "async-channel", 1201 1201 "cid",
+1 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.4.0" 3 + version = "0.5.0-alpha.1" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 6 description = "Fast and robust atproto CAR file processing"
+42 -33
benches/node-counts.rs
··· 143 143 .build() 144 144 .expect("Creating runtime failed"); 145 145 146 - // let cars = [ 147 - // ("empty", EMPTY_CAR), 148 - // ("tiny", TINY_CAR), 149 - // ("little", LITTLE_CAR), 150 - // ("midsize", MIDSIZE_CAR), 151 - // ]; 146 + let cars = [ 147 + ("empty", EMPTY_CAR), 148 + ("tiny", TINY_CAR), 149 + ("little", LITTLE_CAR), 150 + ("midsize", MIDSIZE_CAR), 151 + ]; 152 152 153 - // // Sanity-check: both approaches agree on record count. 154 - // for (name, bytes) in cars { 155 - // let a = rt.block_on(count_records_filter(bytes)); 156 - // let b = rt.block_on(count_records_separate(bytes)); 157 - // assert_eq!(a, b, "approaches disagree on record count for {name}"); 158 - // let (records, nodes) = rt.block_on(count_records_and_nodes(bytes)); 159 - // println!("{name}: {records} records, {nodes} nodes"); 160 - // } 153 + // Sanity-check: both approaches agree on record count. 154 + for (name, bytes) in cars { 155 + let a = rt.block_on(count_records_filter(bytes)); 156 + let b = rt.block_on(count_records_separate(bytes)); 157 + assert_eq!(a, b, "approaches disagree on record count for {name}"); 158 + let (records, nodes) = rt.block_on(count_records_and_nodes(bytes)); 159 + println!("{name}: {records} records, {nodes} nodes"); 160 + } 161 161 162 - // let mut group = c.benchmark_group("node-counts"); 162 + let mut group = c.benchmark_group("node-counts"); 163 163 164 - // for (name, bytes) in cars { 165 - // group.bench_with_input( 166 - // BenchmarkId::new("records-filter-approach", name), 167 - // bytes, 168 - // |b, bytes| b.to_async(&rt).iter(async || count_records_filter(bytes).await), 169 - // ); 170 - // group.bench_with_input( 171 - // BenchmarkId::new("records-separate-approach", name), 172 - // bytes, 173 - // |b, bytes| b.to_async(&rt).iter(async || count_records_separate(bytes).await), 174 - // ); 175 - // group.bench_with_input( 176 - // BenchmarkId::new("records-and-nodes", name), 177 - // bytes, 178 - // |b, bytes| b.to_async(&rt).iter(async || count_records_and_nodes(bytes).await), 179 - // ); 180 - // } 164 + for (name, bytes) in cars { 165 + group.bench_with_input( 166 + BenchmarkId::new("records-filter-approach", name), 167 + bytes, 168 + |b, bytes| { 169 + b.to_async(&rt) 170 + .iter(async || count_records_filter(bytes).await) 171 + }, 172 + ); 173 + group.bench_with_input( 174 + BenchmarkId::new("records-separate-approach", name), 175 + bytes, 176 + |b, bytes| { 177 + b.to_async(&rt) 178 + .iter(async || count_records_separate(bytes).await) 179 + }, 180 + ); 181 + group.bench_with_input( 182 + BenchmarkId::new("records-and-nodes", name), 183 + bytes, 184 + |b, bytes| { 185 + b.to_async(&rt) 186 + .iter(async || count_records_and_nodes(bytes).await) 187 + }, 188 + ); 189 + } 181 190 182 - // group.finish(); 191 + group.finish(); 183 192 184 193 if let Ok(huge_car) = std::env::var("HUGE_CAR") { 185 194 let path: std::path::PathBuf = huge_car.into();
+55 -1
tests/car-slices.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{DriverBuilder, LoadError, Output, WalkItem}; 2 + use repo_stream::{DriverBuilder, LoadError, Output, SliceError, WalkItem}; 3 3 4 4 const RECORD_SLICE: &[u8] = include_bytes!("../car-samples/slice-one.car"); 5 5 const RECORD_NODE_FIRST_KEY: &[u8] = include_bytes!("../car-samples/slice-node-first-key.car"); ··· 120 120 Some("app.bsky.feed.post/3lbn6of6qxc2a"), 121 121 ) 122 122 .await 123 + } 124 + 125 + /// Test the SliceWalker API directly: walk_slice, proof keys, and SliceError on 126 + /// missing subtrees inside the range. 127 + #[tokio::test] 128 + async fn test_walk_slice_api() { 129 + // Known from test_record_slice_car: the slice contains exactly one record 130 + // ("app.bsky.feed.like/3mcg72x6bi32z") bounded by two MissingRecord neighbours. 131 + let key = "app.bsky.feed.like/3mcg72x6bi32z"; 132 + let expected_preceding = "app.bsky.feed.like/3mcfzfbpaml27"; 133 + let expected_following = "app.bsky.feed.like/3mcga2o2efq27"; 134 + 135 + let mut mem_car = DriverBuilder::new() 136 + .load_car(RECORD_SLICE) 137 + .await 138 + .expect("should load"); 139 + 140 + let mut walker = mem_car.walk_slice(key..=key).unwrap(); 141 + let record = walker.next().unwrap().expect("should find the record"); 142 + assert_eq!(record.key, key); 143 + 144 + // Next call should return None and lock in the following key. 145 + assert!(walker.next().unwrap().is_none()); 146 + 147 + let proof = walker.finish().unwrap(); 148 + assert_eq!(proof.preceding_key.as_deref(), Some(expected_preceding)); 149 + assert_eq!(proof.following_key.as_deref(), Some(expected_following)); 150 + } 151 + 152 + /// A walk_slice range that is empty (both bounds exclude all records) still 153 + /// produces a valid proof via finish(). 154 + #[tokio::test] 155 + async fn test_walk_slice_absent_key() { 156 + // This key is absent from the slice (between the two MissingRecord neighbours). 157 + // SliceWalker should prove absence by finding the bounding neighbours. 158 + let absent = "app.bsky.feed.like/3mcg72x6bi32z-absent"; 159 + 160 + let mut mem_car = DriverBuilder::new() 161 + .load_car(RECORD_SLICE) 162 + .await 163 + .expect("should load"); 164 + 165 + // Use get() which is the idiomatic API for single-key lookup. 166 + let result = mem_car.get(absent); 167 + // Should either return Ok(None) (provably absent) or Err(IncompleteRange) 168 + // depending on whether the slice's MST nodes bound the key. Either is valid; 169 + // we just assert it doesn't panic or return Ok(Some(_)). 170 + match result { 171 + Ok(None) => {} // proven absent 172 + Err(SliceError::IncompleteRange { .. }) => {} // block missing within range 173 + Err(SliceError::MissingNode { .. }) => {} // subtree missing, can't prove 174 + Ok(Some(output)) => panic!("unexpected record for absent key: {}", output.key), 175 + Err(e) => panic!("unexpected error: {e}"), 176 + } 123 177 } 124 178 125 179 #[tokio::test]
+187 -1
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::{DriverBuilder, Output}; 2 + use repo_stream::{DriverBuilder, Output, WalkItem}; 3 3 4 4 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 64 64 async fn test_midsize_car() { 65 65 test_car(MIDSIZE_CAR, 11585, 3741393, true).await 66 66 } 67 + 68 + // --------------------------------------------------------------------------- 69 + // next_chunk_keys tests 70 + // --------------------------------------------------------------------------- 71 + 72 + async fn count_keys(bytes: &[u8]) -> usize { 73 + let mut mem_car = DriverBuilder::new() 74 + .with_mem_limit_mb(10) 75 + .load_car(bytes) 76 + .await 77 + .expect("should fit in memory"); 78 + 79 + let mut count = 0; 80 + let mut prev_key = String::new(); 81 + while let Some(pairs) = mem_car.next_chunk_keys(256).unwrap() { 82 + for (key, _cid) in pairs { 83 + assert!(key > prev_key, "next_chunk_keys keys must be in order"); 84 + prev_key = key; 85 + count += 1; 86 + } 87 + } 88 + count 89 + } 90 + 91 + #[tokio::test] 92 + async fn test_next_chunk_keys_counts() { 93 + assert_eq!(count_keys(EMPTY_CAR).await, 0); 94 + assert_eq!(count_keys(TINY_CAR).await, 8); 95 + assert_eq!(count_keys(LITTLE_CAR).await, 278); 96 + assert_eq!(count_keys(MIDSIZE_CAR).await, 11585); 97 + } 98 + 99 + /// Verify that next_chunk_keys returns the same (key, cid) pairs as next_chunk_strict. 100 + #[tokio::test] 101 + async fn test_next_chunk_keys_agrees_with_strict() { 102 + let mut mc_strict = DriverBuilder::new() 103 + .with_mem_limit_mb(10) 104 + .load_car(TINY_CAR) 105 + .await 106 + .unwrap(); 107 + let mut mc_keys = DriverBuilder::new() 108 + .with_mem_limit_mb(10) 109 + .load_car(TINY_CAR) 110 + .await 111 + .unwrap(); 112 + 113 + let mut from_strict = Vec::new(); 114 + while let Some(chunk) = mc_strict.next_chunk_strict(256).unwrap() { 115 + for output in chunk { 116 + from_strict.push((output.key, output.cid)); 117 + } 118 + } 119 + 120 + let mut from_keys = Vec::new(); 121 + while let Some(pairs) = mc_keys.next_chunk_keys(256).unwrap() { 122 + from_keys.extend(pairs); 123 + } 124 + 125 + assert_eq!(from_strict, from_keys); 126 + } 127 + 128 + // --------------------------------------------------------------------------- 129 + // next_chunk_with_nodes tests 130 + // --------------------------------------------------------------------------- 131 + 132 + async fn with_nodes_counts(bytes: &[u8]) -> (usize, usize) { 133 + let mut mem_car = DriverBuilder::new() 134 + .with_mem_limit_mb(10) 135 + .load_car(bytes) 136 + .await 137 + .expect("should fit in memory"); 138 + 139 + let mut records = 0; 140 + let mut nodes = 0; 141 + let mut first_item_is_node = None; 142 + 143 + while let Some(items) = mem_car.next_chunk_with_nodes(256).unwrap() { 144 + for item in &items { 145 + if first_item_is_node.is_none() { 146 + first_item_is_node = Some(matches!(item, WalkItem::Node { .. })); 147 + } 148 + } 149 + for item in items { 150 + match item { 151 + WalkItem::Record(_) => records += 1, 152 + WalkItem::Node { .. } => nodes += 1, 153 + _ => {} 154 + } 155 + } 156 + } 157 + // The root MST node must always be the first item emitted. 158 + assert_eq!( 159 + first_item_is_node, 160 + Some(true), 161 + "first item from next_chunk_with_nodes must be a Node" 162 + ); 163 + (records, nodes) 164 + } 165 + 166 + #[tokio::test] 167 + async fn test_next_chunk_with_nodes_counts() { 168 + // Record counts must match the strict walk. 169 + let (records, nodes) = with_nodes_counts(EMPTY_CAR).await; 170 + assert_eq!(records, 0); 171 + assert_eq!(nodes, 1, "empty MST still has a root node block"); 172 + 173 + assert_eq!(with_nodes_counts(TINY_CAR).await.0, 8); 174 + assert_eq!(with_nodes_counts(LITTLE_CAR).await.0, 278); 175 + assert_eq!(with_nodes_counts(MIDSIZE_CAR).await.0, 11585); 176 + 177 + // Non-empty CARs have multiple nodes. 178 + assert!(with_nodes_counts(TINY_CAR).await.1 > 1); 179 + assert!(with_nodes_counts(LITTLE_CAR).await.1 > 1); 180 + assert!(with_nodes_counts(MIDSIZE_CAR).await.1 > 1); 181 + } 182 + 183 + // --------------------------------------------------------------------------- 184 + // SliceWalker tests on full CARs 185 + // --------------------------------------------------------------------------- 186 + 187 + #[tokio::test] 188 + async fn test_full_walker() { 189 + for (bytes, expected) in [(EMPTY_CAR, 0), (TINY_CAR, 8), (LITTLE_CAR, 278)] { 190 + let mut mem_car = DriverBuilder::new() 191 + .with_mem_limit_mb(10) 192 + .load_car(bytes) 193 + .await 194 + .unwrap(); 195 + 196 + let mut walker = mem_car.full().unwrap(); 197 + let mut count = 0; 198 + let mut prev_key = String::new(); 199 + while let Some(output) = walker.next().unwrap() { 200 + assert!(output.key > prev_key, "full() keys must be in order"); 201 + prev_key = output.key; 202 + count += 1; 203 + } 204 + assert_eq!(count, expected); 205 + 206 + let proof = walker.finish().unwrap(); 207 + assert!( 208 + proof.preceding_key.is_none(), 209 + "full walk has no preceding key" 210 + ); 211 + assert!( 212 + proof.following_key.is_none(), 213 + "full walk has no following key" 214 + ); 215 + } 216 + } 217 + 218 + #[tokio::test] 219 + async fn test_get_present_key() { 220 + let mut mem_car = DriverBuilder::new() 221 + .with_mem_limit_mb(10) 222 + .load_car(TINY_CAR) 223 + .await 224 + .unwrap(); 225 + 226 + let result = mem_car.get("app.bsky.actor.profile/self").unwrap(); 227 + assert!(result.is_some()); 228 + assert_eq!(result.unwrap().key, "app.bsky.actor.profile/self"); 229 + } 230 + 231 + #[tokio::test] 232 + async fn test_prefix_walker() { 233 + let mut mem_car = DriverBuilder::new() 234 + .with_mem_limit_mb(10) 235 + .load_car(TINY_CAR) 236 + .await 237 + .unwrap(); 238 + 239 + let mut walker = mem_car.prefix("app.bsky.actor.profile").unwrap(); 240 + let mut count = 0; 241 + while let Some(output) = walker.next().unwrap() { 242 + assert!( 243 + output.key.starts_with("app.bsky.actor.profile/"), 244 + "prefix walker must only yield matching keys" 245 + ); 246 + count += 1; 247 + } 248 + assert_eq!( 249 + count, 1, 250 + "tiny.car has exactly one app.bsky.actor.profile record" 251 + ); 252 + }