Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

bench seeking across collection boundaries

phil 74787d95 a28cc59c

+191 -3
+4
Cargo.toml
··· 49 49 name = "huge-car" 50 50 harness = false 51 51 52 + [[bench]] 53 + name = "collections" 54 + harness = false 55 + 52 56 # [[bench]] 53 57 # name = "leading" 54 58 # harness = false
+182
benches/collections.rs
··· 1 + extern crate repo_stream; 2 + use repo_stream::{DriverBuilder, Step}; 3 + use std::collections::HashSet; 4 + use std::path::Path; 5 + 6 + use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; 7 + 8 + use mimalloc::MiMalloc; 9 + #[global_allocator] 10 + static GLOBAL: MiMalloc = MiMalloc; 11 + 12 + const EMPTY_CAR: &[u8] = include_bytes!("../car-samples/empty.car"); 13 + const TINY_CAR: &[u8] = include_bytes!("../car-samples/tiny.car"); 14 + const LITTLE_CAR: &[u8] = include_bytes!("../car-samples/little.car"); 15 + const MIDSIZE_CAR: &[u8] = include_bytes!("../car-samples/midsize.car"); 16 + 17 + /// Walk every record and collect unique collection prefixes via HashSet dedup. 18 + async fn collect_naive(bytes: &[u8]) -> Vec<String> { 19 + let mut mem_car = DriverBuilder::new() 20 + .with_mem_limit_mb(100) 21 + .load_car(bytes) 22 + .await 23 + .unwrap(); 24 + 25 + let mut seen = HashSet::new(); 26 + let mut collections = vec![]; 27 + loop { 28 + match mem_car.next_chunk(256).unwrap() { 29 + Step::End(_) => break, 30 + Step::Value(outputs) => { 31 + for output in outputs { 32 + let collection = output.key.split_once('/').unwrap().0.to_string(); 33 + if seen.insert(collection.clone()) { 34 + collections.push(collection); 35 + } 36 + } 37 + } 38 + } 39 + } 40 + collections 41 + } 42 + 43 + /// Seek past each collection using a sentinel that sorts strictly after any valid key 44 + /// in the collection. Atproto rkeys are capped at 512 chars; 513 tildes exceeds that 45 + /// maximum, so `collection/<513 tildes>` can never equal an actual record key and 46 + /// is guaranteed to be greater than `collection/<512 tildes>` (the max valid key). 
47 + async fn collect_seeking(bytes: &[u8]) -> Vec<String> { 48 + // 513 > max rkey length (512), so this is strictly greater than any valid key 49 + let tilde_max = "~".repeat(513); 50 + let mut mem_car = DriverBuilder::new() 51 + .with_mem_limit_mb(100) 52 + .load_car(bytes) 53 + .await 54 + .unwrap(); 55 + 56 + let mut collections = vec![]; 57 + loop { 58 + match mem_car.next().unwrap() { 59 + Step::End(_) => break, 60 + Step::Value(output) => { 61 + let collection = output.key.split_once('/').unwrap().0.to_string(); 62 + collections.push(collection.clone()); 63 + mem_car.seek(&format!("{collection}/{tilde_max}")).unwrap(); 64 + } 65 + } 66 + } 67 + collections 68 + } 69 + 70 + async fn collect_naive_file(path: &Path) -> Vec<String> { 71 + let reader = tokio::io::BufReader::new(tokio::fs::File::open(path).await.unwrap()); 72 + let mut mem_car = DriverBuilder::new() 73 + .with_mem_limit_mb(1024) 74 + .load_car(reader) 75 + .await 76 + .unwrap(); 77 + 78 + let mut seen = HashSet::new(); 79 + let mut collections = vec![]; 80 + loop { 81 + match mem_car.next_chunk(256).unwrap() { 82 + Step::End(_) => break, 83 + Step::Value(outputs) => { 84 + for output in outputs { 85 + let collection = output.key.split_once('/').unwrap().0.to_string(); 86 + if seen.insert(collection.clone()) { 87 + collections.push(collection); 88 + } 89 + } 90 + } 91 + } 92 + } 93 + collections 94 + } 95 + 96 + async fn collect_seeking_file(path: &Path) -> Vec<String> { 97 + let tilde_max = "~".repeat(513); 98 + let reader = tokio::io::BufReader::new(tokio::fs::File::open(path).await.unwrap()); 99 + let mut mem_car = DriverBuilder::new() 100 + .with_mem_limit_mb(1024) 101 + .load_car(reader) 102 + .await 103 + .unwrap(); 104 + 105 + let mut collections = vec![]; 106 + loop { 107 + match mem_car.next().unwrap() { 108 + Step::End(_) => break, 109 + Step::Value(output) => { 110 + let collection = output.key.split_once('/').unwrap().0.to_string(); 111 + collections.push(collection.clone()); 112 + 
mem_car.seek(&format!("{collection}/{tilde_max}")).unwrap(); 113 + } 114 + } 115 + } 116 + collections 117 + } 118 + 119 + pub fn criterion_benchmark(c: &mut Criterion) { 120 + let rt = tokio::runtime::Builder::new_multi_thread() 121 + .enable_all() 122 + .build() 123 + .expect("Creating runtime failed"); 124 + 125 + let cars = [ 126 + ("empty", EMPTY_CAR), 127 + ("tiny", TINY_CAR), 128 + ("little", LITTLE_CAR), 129 + ("midsize", MIDSIZE_CAR), 130 + ]; 131 + 132 + let mut group = c.benchmark_group("collections"); 133 + 134 + for (name, bytes) in cars { 135 + // Sanity-check: both approaches must return the same collections 136 + let naive = rt.block_on(collect_naive(bytes)); 137 + let mut seeking = rt.block_on(collect_seeking(bytes)); 138 + seeking.sort(); 139 + let mut naive_sorted = naive.clone(); 140 + naive_sorted.sort(); 141 + assert_eq!(naive_sorted, seeking, "approaches disagree for {name}"); 142 + println!("{name}: {naive_sorted:?}"); 143 + 144 + group.bench_with_input(BenchmarkId::new("naive", name), bytes, |b, bytes| { 145 + b.to_async(&rt).iter(async || collect_naive(bytes).await) 146 + }); 147 + group.bench_with_input(BenchmarkId::new("seeking", name), bytes, |b, bytes| { 148 + b.to_async(&rt).iter(async || collect_seeking(bytes).await) 149 + }); 150 + } 151 + 152 + group.finish(); 153 + 154 + if let Ok(huge_car) = std::env::var("HUGE_CAR") { 155 + let path: std::path::PathBuf = huge_car.into(); 156 + 157 + // Sanity-check the huge car too 158 + let naive = rt.block_on(collect_naive_file(&path)); 159 + let mut seeking = rt.block_on(collect_seeking_file(&path)); 160 + seeking.sort(); 161 + let mut naive_sorted = naive.clone(); 162 + naive_sorted.sort(); 163 + assert_eq!(naive_sorted, seeking, "approaches disagree for huge-car"); 164 + println!("huge: {naive_sorted:?}"); 165 + 166 + let mut group = c.benchmark_group("collections-huge"); 167 + 168 + group.bench_with_input(BenchmarkId::new("naive", "huge"), &path, |b, path| { 169 + b.to_async(&rt) 170 + 
.iter(async || collect_naive_file(path).await) 171 + }); 172 + group.bench_with_input(BenchmarkId::new("seeking", "huge"), &path, |b, path| { 173 + b.to_async(&rt) 174 + .iter(async || collect_seeking_file(path).await) 175 + }); 176 + 177 + group.finish(); 178 + } 179 + } 180 + 181 + criterion_group!(benches, criterion_benchmark); 182 + criterion_main!(benches);
+5 -3
src/mem.rs
··· 28 28 /// 29 29 /// The partial state is returned so the caller can decide what to do 30 30 /// (e.g. resume with disk storage via `PartialCar::finish_loading`). 31 + /// 32 + /// boxed because it's big, to avoid making normal load errors heavy 31 33 #[error("partially loaded car")] 32 - MemoryLimitReached(PartialCar<R>), 34 + MemoryLimitReached(Box<PartialCar<R>>), 33 35 } 34 36 35 37 /// A partially memory-loaded CAR file that hit the memory limit mid-stream. ··· 130 132 mem_blocks.insert(cid.into(), maybe_processed); 131 133 if mem_size >= max_size { 132 134 log::debug!("blocks loaded before memory limit: {block_count}"); 133 - return Err(LoadError::MemoryLimitReached(PartialCar { 135 + return Err(LoadError::MemoryLimitReached(Box::new(PartialCar { 134 136 car, 135 137 root, 136 138 process, 137 139 max_size, 138 140 blocks: mem_blocks, 139 141 commit, 140 - })); 142 + }))); 141 143 } 142 144 } 143 145