Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

bytes

and hashbrowns

authored by

phil and committed by tangled.org 6735d5c2 b8cb10fa

+192 -303
+23 -38
Cargo.lock
··· 27 27 ] 28 28 29 29 [[package]] 30 + name = "allocator-api2" 31 + version = "0.2.21" 32 + source = "registry+https://github.com/rust-lang/crates.io-index" 33 + checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" 34 + 35 + [[package]] 30 36 name = "anes" 31 37 version = "0.1.6" 32 38 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 126 132 ] 127 133 128 134 [[package]] 129 - name = "bincode" 130 - version = "2.0.1" 131 - source = "registry+https://github.com/rust-lang/crates.io-index" 132 - checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" 133 - dependencies = [ 134 - "bincode_derive", 135 - "serde", 136 - "unty", 137 - ] 138 - 139 - [[package]] 140 - name = "bincode_derive" 141 - version = "2.0.1" 142 - source = "registry+https://github.com/rust-lang/crates.io-index" 143 - checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" 144 - dependencies = [ 145 - "virtue", 146 - ] 147 - 148 - [[package]] 149 135 name = "bitflags" 150 136 version = "2.9.4" 151 137 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 174 160 175 161 [[package]] 176 162 name = "bytes" 177 - version = "1.10.1" 163 + version = "1.11.0" 178 164 source = "registry+https://github.com/rust-lang/crates.io-index" 179 - checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 165 + checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" 180 166 181 167 [[package]] 182 168 name = "byteview" ··· 562 548 ] 563 549 564 550 [[package]] 551 + name = "foldhash" 552 + version = "0.2.0" 553 + source = "registry+https://github.com/rust-lang/crates.io-index" 554 + checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" 555 + 556 + [[package]] 565 557 name = "futures" 566 558 version = "0.3.31" 567 559 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 700 692 version = "0.16.1" 701 693 source = "registry+https://github.com/rust-lang/crates.io-index" 702 694 checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 695 + dependencies = [ 696 + "allocator-api2", 697 + "equivalent", 698 + "foldhash", 699 + ] 703 700 704 701 [[package]] 705 702 name = "heck" ··· 853 850 checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 854 851 dependencies = [ 855 852 "byteorder-lite", 853 + "bytes", 856 854 "byteview", 857 855 "crossbeam-skiplist", 858 856 "enum_dispatch", ··· 1146 1144 name = "repo-stream" 1147 1145 version = "0.2.2" 1148 1146 dependencies = [ 1149 - "bincode", 1147 + "bytes", 1148 + "cid", 1150 1149 "clap", 1151 1150 "criterion", 1152 1151 "env_logger", 1153 1152 "fjall", 1154 - "futures", 1155 - "futures-core", 1156 - "ipld-core", 1153 + "hashbrown 0.16.1", 1157 1154 "iroh-car", 1158 1155 "log", 1159 1156 "mimalloc", ··· 1505 1502 checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" 1506 1503 1507 1504 [[package]] 1508 - name = "unty" 1509 - version = "0.0.4" 1510 - source = "registry+https://github.com/rust-lang/crates.io-index" 1511 - checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" 1512 - 1513 - [[package]] 1514 1505 name = "utf8parse" 1515 1506 version = "0.2.2" 1516 1507 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1527 1518 version = "0.9.5" 1528 1519 source = "registry+https://github.com/rust-lang/crates.io-index" 1529 1520 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 1530 - 1531 - [[package]] 1532 - name = "virtue" 1533 - version = "0.0.18" 1534 - source = "registry+https://github.com/rust-lang/crates.io-index" 1535 - checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" 1536 1521 1537 1522 [[package]] 1538 1523 name = "walkdir"
+4 -6
Cargo.toml
··· 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 - bincode = { version = "2.0.1", features = ["serde"] } 11 - fjall = { version = "3.0.1", default-features = false } 12 - futures = "0.3.31" 13 - futures-core = "0.3.31" 14 - ipld-core = { version = "0.4.2", features = ["serde"] } 10 + bytes = "1.11.0" 11 + fjall = { version = "3.0.1", default-features = false, features = ["bytes_1"] } 12 + hashbrown = "0.16.1" 13 + cid = { version = "0.11.1", features = ["serde"] } 15 14 iroh-car = "0.5.1" 16 15 log = "0.4.28" 17 - multibase = "0.9.2" 18 16 serde = { version = "1.0.228", features = ["derive"] } 19 17 serde_bytes = "0.11.19" 20 18 serde_ipld_dagcbor = "0.6.4"
+7 -3
benches/huge-car.rs
··· 22 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 23 23 let reader = tokio::io::BufReader::new(reader); 24 24 25 - let mut driver = match Driver::load_car(reader, |block| block.len(), 1024) 26 - .await 27 - .unwrap() 25 + let mut driver = match Driver::load_car( 26 + reader, 27 + |block| block.len().to_le_bytes().to_vec().into(), 28 + 1024, 29 + ) 30 + .await 31 + .unwrap() 28 32 { 29 33 Driver::Memory(_, mem_driver) => mem_driver, 30 34 Driver::Disk(_) => panic!("not doing disk for benchmark"),
+8 -7
benches/non-huge-cars.rs
··· 29 29 } 30 30 31 31 async fn drive_car(bytes: &[u8]) -> usize { 32 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 32) 33 - .await 34 - .unwrap() 35 - { 36 - Driver::Memory(_, mem_driver) => mem_driver, 37 - Driver::Disk(_) => panic!("not benching big cars here"), 38 - }; 32 + let mut driver = 33 + match Driver::load_car(bytes, |block| block.len().to_le_bytes().to_vec().into(), 32) 34 + .await 35 + .unwrap() 36 + { 37 + Driver::Memory(_, mem_driver) => mem_driver, 38 + Driver::Disk(_) => panic!("not benching big cars here"), 39 + }; 39 40 40 41 let mut n = 0; 41 42 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+1 -1
examples/read-file/main.rs
··· 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 26 let (commit, mut driver) = match DriverBuilder::new() 27 - .with_block_processor(|block| block.len()) 27 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec().into()) 28 28 .load_car(reader) 29 29 .await? 30 30 {
+3 -3
src/disk.rs
··· 18 18 */ 19 19 20 20 use crate::drive::DriveError; 21 + use bytes::Bytes; 21 22 use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 22 23 use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 23 24 use std::path::PathBuf; ··· 140 141 141 142 pub(crate) fn put_many( 142 143 &mut self, 143 - kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 144 + kv: impl Iterator<Item = (Vec<u8>, Bytes)>, 144 145 ) -> Result<(), DriveError> { 145 146 let mut batch = self.db.batch(); 146 - for pair in kv { 147 - let (k, v) = pair?; 147 + for (k, v) in kv { 148 148 self.stored += v.len(); 149 149 if self.stored > self.max_stored { 150 150 return Err(DiskError::MaxSizeExceeded.into());
+99 -106
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::HashMap; 3 4 use crate::disk::{DiskError, DiskStore}; 4 - use crate::process::Processable; 5 - use ipld_core::cid::Cid; 5 + use crate::mst::Node; 6 + use bytes::Bytes; 7 + use cid::Cid; 6 8 use iroh_car::CarReader; 7 - use serde::{Deserialize, Serialize}; 8 - use std::collections::HashMap; 9 9 use std::convert::Infallible; 10 10 use tokio::{io::AsyncRead, sync::mpsc}; 11 11 12 - use crate::mst::{Commit, Node}; 12 + use crate::mst::Commit; 13 13 use crate::walk::{Step, WalkError, Walker}; 14 14 15 15 /// Errors that can happen while consuming and emitting blocks and records ··· 29 29 MissingRoot, 30 30 #[error("Storage error")] 31 31 StorageError(#[from] DiskError), 32 - #[error("Encode error: {0}")] 33 - BincodeEncodeError(#[from] bincode::error::EncodeError), 34 32 #[error("Tried to send on a closed channel")] 35 33 ChannelSendError, // SendError takes <T> which we don't need 36 34 #[error("Failed to join a task: {0}")] 37 35 JoinError(#[from] tokio::task::JoinError), 38 36 } 39 37 40 - #[derive(Debug, thiserror::Error)] 41 - pub enum DecodeError { 42 - #[error(transparent)] 43 - BincodeDecodeError(#[from] bincode::error::DecodeError), 44 - #[error("extra bytes remained after decoding")] 45 - ExtraGarbage, 46 - } 47 - 48 38 /// An in-order chunk of Rkey + (processed) Block pairs 49 - pub type BlockChunk<T> = Vec<(String, T)>; 39 + pub type BlockChunk = Vec<(String, Bytes)>; 50 40 51 - #[derive(Debug, Clone, Serialize, Deserialize)] 52 - pub(crate) enum MaybeProcessedBlock<T> { 41 + #[derive(Debug, Clone)] 42 + pub(crate) enum MaybeProcessedBlock { 53 43 /// A block that's *probably* a Node (but we can't know yet) 54 44 /// 55 45 /// It *can be* a record that suspiciously looks a lot like a node, so we 56 46 /// cannot eagerly turn it into a Node. We only know for sure what it is 57 47 /// when we actually walk down the MST 58 - Raw(Vec<u8>), 48 + Raw(Bytes), 59 49 /// A processed record from a block that was definitely not a Node 60 50 /// 61 51 /// Processing has to be fallible because the CAR can have totally-unused ··· 71 61 /// There's an alternative here, which would be to kick unprocessable blocks 72 62 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 73 63 /// surface the typed error later if needed by trying to reprocess. 74 - Processed(T), 75 - } 76 - 77 - impl<T: Processable> Processable for MaybeProcessedBlock<T> { 78 - /// TODO this is probably a little broken 79 - fn get_size(&self) -> usize { 80 - use std::{cmp::max, mem::size_of}; 81 - 82 - // enum is always as big as its biggest member? 83 - let base_size = max(size_of::<Vec<u8>>(), size_of::<T>()); 84 - 85 - let extra = match self { 86 - Self::Raw(bytes) => bytes.len(), 87 - Self::Processed(t) => t.get_size(), 88 - }; 89 - 90 - base_size + extra 91 - } 64 + Processed(Bytes), 92 65 } 93 66 94 - impl<T> MaybeProcessedBlock<T> { 95 - fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 67 + impl MaybeProcessedBlock { 68 + pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 96 69 if Node::could_be(&data) { 97 70 MaybeProcessedBlock::Raw(data) 98 71 } else { 99 72 MaybeProcessedBlock::Processed(process(data)) 100 73 } 101 74 } 75 + pub(crate) fn len(&self) -> usize { 76 + match self { 77 + MaybeProcessedBlock::Raw(b) => b.len(), 78 + MaybeProcessedBlock::Processed(b) => b.len(), 79 + } 80 + } 81 + pub(crate) fn into_bytes(self) -> Bytes { 82 + match self { 83 + MaybeProcessedBlock::Raw(b) => { 84 + let mut owned = b.try_into_mut().unwrap(); 85 + owned.extend_from_slice(&[0x00]); 86 + owned.into() 87 + } 88 + MaybeProcessedBlock::Processed(b) => { 89 + let mut owned = b.try_into_mut().unwrap(); 90 + owned.extend_from_slice(&[0x01]); 91 + owned.into() 92 + } 93 + } 94 + } 95 + pub(crate) fn from_bytes(mut b: Bytes) -> Self { 96 + // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 97 + let suffix = b.split_off(b.len() - 1); 98 + if *suffix == [0x00] { 99 + MaybeProcessedBlock::Raw(b) 100 + } else { 101 + MaybeProcessedBlock::Processed(b) 102 + } 103 + } 102 104 } 103 105 104 106 /// Read a CAR file, buffering blocks in memory or to disk 105 - pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 107 + pub enum Driver<R: AsyncRead + Unpin> { 106 108 /// All blocks fit within the memory limit 107 109 /// 108 110 /// You probably want to check the commit's signature. You can go ahead and 109 111 /// walk the MST right away. 110 - Memory(Commit, MemDriver<T>), 112 + Memory(Commit, MemDriver), 111 113 /// Blocks exceed the memory limit 112 114 /// 113 115 /// You'll need to provide a disk storage to continue. The commit will be 114 116 /// returned and can be validated only once all blocks are loaded. 115 - Disk(NeedDisk<R, T>), 117 + Disk(NeedDisk<R>), 116 118 } 117 119 118 120 /// Builder-style driver setup ··· 127 129 } 128 130 } 129 131 132 + /// Processor that just returns the raw blocks 133 + #[inline] 134 + pub fn noop(block: Bytes) -> Bytes { 135 + block 136 + } 137 + 130 138 impl DriverBuilder { 131 139 /// Begin configuring the driver with defaults 132 140 pub fn new() -> Self { ··· 143 151 /// Set the block processor 144 152 /// 145 153 /// Default: noop, raw blocks will be emitted 146 - pub fn with_block_processor<T: Processable>( 154 + pub fn with_block_processor( 147 155 self, 148 - p: fn(Vec<u8>) -> T, 149 - ) -> DriverBuilderWithProcessor<T> { 156 + block_processor: fn(Bytes) -> Bytes, 157 + ) -> DriverBuilderWithProcessor { 150 158 DriverBuilderWithProcessor { 151 159 mem_limit_mb: self.mem_limit_mb, 152 - block_processor: p, 160 + block_processor, 153 161 } 154 162 } 155 163 /// Begin processing an atproto MST from a CAR file 156 - pub async fn load_car<R: AsyncRead + Unpin>( 157 - &self, 158 - reader: R, 159 - ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 - Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 164 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 165 + Driver::load_car(reader, noop, self.mem_limit_mb).await 161 166 } 162 167 } 163 168 ··· 165 170 /// 166 171 /// start from `DriverBuilder` 167 172 #[derive(Debug, Clone)] 168 - pub struct DriverBuilderWithProcessor<T: Processable> { 173 + pub struct DriverBuilderWithProcessor { 169 174 pub mem_limit_mb: usize, 170 - pub block_processor: fn(Vec<u8>) -> T, 175 + pub block_processor: fn(Bytes) -> Bytes, 171 176 } 172 177 173 - impl<T: Processable> DriverBuilderWithProcessor<T> { 178 + impl DriverBuilderWithProcessor { 174 179 /// Set the in-memory size limit, in MiB 175 180 /// 176 181 /// Default: 16 MiB ··· 179 184 self 180 185 } 181 186 /// Begin processing an atproto MST from a CAR file 182 - pub async fn load_car<R: AsyncRead + Unpin>( 183 - &self, 184 - reader: R, 185 - ) -> Result<Driver<R, T>, DriveError> { 187 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 186 188 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 189 } 188 190 } 189 191 190 - impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 192 + impl<R: AsyncRead + Unpin> Driver<R> { 191 193 /// Begin processing an atproto MST from a CAR file 192 194 /// 193 195 /// Blocks will be loaded, processed, and buffered in memory. If the entire ··· 199 201 /// resumed by providing a `SqliteStorage` for on-disk block storage. 200 202 pub async fn load_car( 201 203 reader: R, 202 - process: fn(Vec<u8>) -> T, 204 + process: fn(Bytes) -> Bytes, 203 205 mem_limit_mb: usize, 204 - ) -> Result<Driver<R, T>, DriveError> { 206 + ) -> Result<Driver<R>, DriveError> { 205 207 let max_size = mem_limit_mb * 2_usize.pow(20); 206 208 let mut mem_blocks = HashMap::new(); 207 209 ··· 227 229 continue; 228 230 } 229 231 232 + let data = Bytes::from(data); 233 + 230 234 // remaining possible types: node, record, other. optimistically process 231 235 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 236 233 237 // stash (maybe processed) blocks in memory as long as we have room 234 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 238 + mem_size += maybe_processed.len(); 235 239 mem_blocks.insert(cid, maybe_processed); 236 240 if mem_size >= max_size { 237 241 return Ok(Driver::Disk(NeedDisk { ··· 275 279 /// work the init function will do. We can drop the CAR reader before walking, 276 280 /// so the sync/async boundaries become a little easier to work around. 277 281 #[derive(Debug)] 278 - pub struct MemDriver<T: Processable> { 279 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 282 + pub struct MemDriver { 283 + blocks: HashMap<Cid, MaybeProcessedBlock>, 280 284 walker: Walker, 281 - process: fn(Vec<u8>) -> T, 285 + process: fn(Bytes) -> Bytes, 282 286 } 283 287 284 - impl<T: Processable> MemDriver<T> { 288 + impl MemDriver { 285 289 /// Step through the record outputs, in rkey order 286 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 290 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 287 291 let mut out = Vec::with_capacity(n); 288 292 for _ in 0..n { 289 293 // walk as far as we can until we run out of blocks or find a record ··· 306 310 } 307 311 308 312 /// A partially memory-loaded car file that needs disk spillover to continue 309 - pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 313 + pub struct NeedDisk<R: AsyncRead + Unpin> { 310 314 car: CarReader<R>, 311 315 root: Cid, 312 - process: fn(Vec<u8>) -> T, 316 + process: fn(Bytes) -> Bytes, 313 317 max_size: usize, 314 - mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 318 + mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 315 319 pub commit: Option<Commit>, 316 320 } 317 321 318 - fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 319 - bincode::serde::encode_to_vec(v, bincode::config::standard()) 320 - } 321 - 322 - pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 323 - let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 324 - if n != bytes.len() { 325 - return Err(DecodeError::ExtraGarbage); 326 - } 327 - Ok(t) 328 - } 329 - 330 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 322 + impl<R: AsyncRead + Unpin> NeedDisk<R> { 331 323 pub async fn finish_loading( 332 324 mut self, 333 325 mut store: DiskStore, 334 - ) -> Result<(Commit, DiskDriver<T>), DriveError> { 326 + ) -> Result<(Commit, DiskDriver), DriveError> { 335 327 // move store in and back out so we can manage lifetimes 336 328 // dump mem blocks into the store 337 329 store = tokio::task::spawn(async move { 338 330 let kvs = self 339 331 .mem_blocks 340 332 .into_iter() 341 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 333 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 342 334 343 335 store.put_many(kvs)?; 344 336 Ok::<_, DriveError>(store) 345 337 }) 346 338 .await??; 347 339 348 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 340 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 349 341 350 342 let store_worker = tokio::task::spawn_blocking(move || { 351 343 while let Some(chunk) = rx.blocking_recv() { 352 344 let kvs = chunk 353 345 .into_iter() 354 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 346 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 355 347 store.put_many(kvs)?; 356 348 } 357 349 Ok::<_, DriveError>(store) ··· 372 364 self.commit = Some(c); 373 365 continue; 374 366 } 367 + 368 + let data = Bytes::from(data); 369 + 375 370 // remaining possible types: node, record, other. optimistically process 376 371 // TODO: get the actual in-memory size to compute disk spill 377 372 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 378 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 373 + mem_size += maybe_processed.len(); 379 374 chunk.push((cid, maybe_processed)); 380 375 if mem_size >= self.max_size { 381 376 // soooooo if we're setting the db cache to max_size and then letting ··· 418 413 } 419 414 420 415 /// MST walker that reads from disk instead of an in-memory hashmap 421 - pub struct DiskDriver<T: Clone> { 422 - process: fn(Vec<u8>) -> T, 416 + pub struct DiskDriver { 417 + process: fn(Bytes) -> Bytes, 423 418 state: Option<BigState>, 424 419 } 425 420 426 421 // for doctests only 427 422 #[doc(hidden)] 428 - pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 429 - use crate::process::noop; 423 + pub fn _get_fake_disk_driver() -> DiskDriver { 430 424 DiskDriver { 431 425 process: noop, 432 426 state: None, 433 427 } 434 428 } 435 429 436 - impl<T: Processable + Send + 'static> DiskDriver<T> { 430 + impl DiskDriver { 437 431 /// Walk the MST returning up to `n` rkey + record pairs 438 432 /// 439 433 /// ```no_run 440 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 434 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 441 435 /// # #[tokio::main] 442 436 /// # async fn main() -> Result<(), DriveError> { 443 437 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 449 443 /// # Ok(()) 450 444 /// # } 451 445 /// ``` 452 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 446 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 453 447 let process = self.process; 454 448 455 449 // state should only *ever* be None transiently while inside here ··· 458 452 // the big pain here is that we don't want to leave self.state in an 459 453 // invalid state (None), so all the error paths have to make sure it 460 454 // comes out again. 461 - let (state, res) = tokio::task::spawn_blocking( 462 - move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 455 + let (state, res) = 456 + tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 463 457 let mut out = Vec::with_capacity(n); 464 458 465 459 for _ in 0..n { ··· 480 474 } 481 475 482 476 (state, Ok::<_, DriveError>(out)) 483 - }, 484 - ) 485 - .await?; // on tokio JoinError, we'll be left with invalid state :( 477 + }) 478 + .await?; // on tokio JoinError, we'll be left with invalid state :( 486 479 487 480 // *must* restore state before dealing with the actual result 488 481 self.state = Some(state); ··· 499 492 fn read_tx_blocking( 500 493 &mut self, 501 494 n: usize, 502 - tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 503 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 495 + tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 496 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 504 497 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 505 498 506 499 loop { 507 - let mut out: BlockChunk<T> = Vec::with_capacity(n); 500 + let mut out: BlockChunk = Vec::with_capacity(n); 508 501 509 502 for _ in 0..n { 510 503 // walk as far as we can until we run out of blocks or find a record ··· 546 539 /// benefit over just using `.next_chunk(n)`. 547 540 /// 548 541 /// ```no_run 549 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 542 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 550 543 /// # #[tokio::main] 551 544 /// # async fn main() -> Result<(), DriveError> { 552 545 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 565 558 mut self, 566 559 n: usize, 567 560 ) -> ( 568 - mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 561 + mpsc::Receiver<Result<BlockChunk, DriveError>>, 569 562 tokio::task::JoinHandle<Self>, 570 563 ) { 571 - let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 564 + let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 572 565 573 566 // sketch: this worker is going to be allowed to execute without a join handle 574 567 let chan_task = tokio::task::spawn_blocking(move || {
+16 -6
src/lib.rs
··· 27 27 28 28 match DriverBuilder::new() 29 29 .with_mem_limit_mb(10) 30 - .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 30 + .with_block_processor( 31 + |rec| rec.len().to_ne_bytes().to_vec().into() 32 + ) // block processing: just extract the raw record size 31 33 .load_car(reader) 32 34 .await? 33 35 { ··· 35 37 // if all blocks fit within memory 36 38 Driver::Memory(_commit, mut driver) => { 37 39 while let Some(chunk) = driver.next_chunk(256).await? { 38 - for (_rkey, size) in chunk { 40 + for (_rkey, bytes) in chunk { 41 + 42 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 43 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 44 + 39 45 total_size += size; 40 46 } 41 47 } ··· 49 55 let (_commit, mut driver) = paused.finish_loading(store).await?; 50 56 51 57 while let Some(chunk) = driver.next_chunk(256).await? { 52 - for (_rkey, size) in chunk { 58 + for (_rkey, bytes) in chunk { 59 + 60 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 61 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 62 + 53 63 total_size += size; 54 64 } 55 65 } ··· 76 86 77 87 pub mod disk; 78 88 pub mod drive; 79 - pub mod process; 80 89 81 90 pub use disk::{DiskBuilder, DiskError, DiskStore}; 82 - pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 91 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 83 92 pub use mst::Commit; 84 - pub use process::Processable; 93 + 94 + pub(crate) use hashbrown::HashMap;
+1 -1
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 - use ipld_core::cid::Cid; 6 + use cid::Cid; 7 7 use serde::Deserialize; 8 8 9 9 /// The top-level data object in a repository's tree is a signed commit.
-108
src/process.rs
··· 1 - /*! 2 - Record processor function output trait 3 - 4 - The return type must satisfy the `Processable` trait, which requires: 5 - 6 - - `Clone` because two rkeys can refer to the same record by CID, which may 7 - only appear once in the CAR file. 8 - - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 - 10 - One required function must be implemented, `get_size()`: this should return the 11 - approximate total off-stack size of the type. (the on-stack size will be added 12 - automatically via `std::mem::get_size`). 13 - 14 - Note that it is **not guaranteed** that the `process` function will run on a 15 - block before storing it in memory or on disk: it's not possible to know if a 16 - block is a record without actually walking the MST, so the best we can do is 17 - apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 - store the raw block bytes. 19 - 20 - Here's a silly processing function that just collects 'eyy's found in the raw 21 - record bytes 22 - 23 - ``` 24 - # use repo_stream::Processable; 25 - # use serde::{Serialize, Deserialize}; 26 - #[derive(Debug, Clone, Serialize, Deserialize)] 27 - struct Eyy(usize, String); 28 - 29 - impl Processable for Eyy { 30 - fn get_size(&self) -> usize { 31 - // don't need to compute the usize, it's on the stack 32 - self.1.capacity() // in-mem size from the string's capacity, in bytes 33 - } 34 - } 35 - 36 - fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 - let mut out = Vec::new(); 38 - let to_find = "eyy".as_bytes(); 39 - for i in 0..(raw.len() - 3) { 40 - if &raw[i..(i+3)] == to_find { 41 - out.push(Eyy(i, "eyy".to_string())); 42 - } 43 - } 44 - out 45 - } 46 - ``` 47 - 48 - The memory sizing stuff is a little sketch but probably at least approximately 49 - works. 50 - */ 51 - 52 - use serde::{Serialize, de::DeserializeOwned}; 53 - 54 - /// Output trait for record processing 55 - pub trait Processable: Clone + Serialize + DeserializeOwned { 56 - /// Any additional in-memory size taken by the processed type 57 - /// 58 - /// Do not include stack size (`std::mem::size_of`) 59 - fn get_size(&self) -> usize; 60 - } 61 - 62 - /// Processor that just returns the raw blocks 63 - #[inline] 64 - pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 - block 66 - } 67 - 68 - impl Processable for u8 { 69 - fn get_size(&self) -> usize { 70 - 0 71 - } 72 - } 73 - 74 - impl Processable for usize { 75 - fn get_size(&self) -> usize { 76 - 0 // no additional space taken, just its stack size (newtype is free) 77 - } 78 - } 79 - 80 - impl Processable for String { 81 - fn get_size(&self) -> usize { 82 - self.capacity() 83 - } 84 - } 85 - 86 - impl<Item: Sized + Processable> Processable for Vec<Item> { 87 - fn get_size(&self) -> usize { 88 - let slot_size = std::mem::size_of::<Item>(); 89 - let direct_size = slot_size * self.capacity(); 90 - let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 - direct_size + items_referenced_size 92 - } 93 - } 94 - 95 - impl<Item: Processable> Processable for Option<Item> { 96 - fn get_size(&self) -> usize { 97 - self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 - } 99 - } 100 - 101 - impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 - fn get_size(&self) -> usize { 103 - match self { 104 - Ok(item) => item.get_size(), 105 - Err(err) => err.get_size(), 106 - } 107 - } 108 - }
+18 -20
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 + use crate::HashMap; 3 4 use crate::disk::DiskStore; 4 - use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 + use crate::drive::MaybeProcessedBlock; 5 6 use crate::mst::Node; 6 - use crate::process::Processable; 7 - use ipld_core::cid::Cid; 7 + use bytes::Bytes; 8 + use cid::Cid; 8 9 use sha2::{Digest, Sha256}; 9 - use std::collections::HashMap; 10 10 use std::convert::Infallible; 11 11 12 12 /// Errors that can happen while walking ··· 20 20 MstError(#[from] MstError), 21 21 #[error("storage error: {0}")] 22 22 StorageError(#[from] fjall::Error), 23 - #[error("Decode error: {0}")] 24 - DecodeError(#[from] DecodeError), 25 23 } 26 24 27 25 /// Errors from invalid Rkeys ··· 45 43 46 44 /// Walker outputs 47 45 #[derive(Debug)] 48 - pub enum Step<T> { 46 + pub enum Step { 49 47 /// We needed this CID but it's not in the block store 50 48 Missing(Cid), 51 49 /// Reached the end of the MST! yay! 52 50 Finish, 53 51 /// A record was found! 54 - Found { rkey: String, data: T }, 52 + Found { rkey: String, data: Bytes }, 55 53 } 56 54 57 55 #[derive(Debug, Clone, PartialEq)] ··· 176 174 } 177 175 178 176 /// Advance through nodes until we find a record or can't go further 179 - pub fn step<T: Processable>( 177 + pub fn step( 180 178 &mut self, 181 - blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 182 - process: impl Fn(Vec<u8>) -> T, 183 - ) -> Result<Step<T>, WalkError> { 179 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 180 + process: impl Fn(Bytes) -> Bytes, 181 + ) -> Result<Step, WalkError> { 184 182 loop { 185 183 let Some(need) = self.stack.last_mut() else { 186 184 log::trace!("tried to walk but we're actually done."); ··· 216 214 }; 217 215 let rkey = rkey.clone(); 218 216 let data = match data { 219 - MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 217 + MaybeProcessedBlock::Raw(data) => process(data.clone()), 220 218 MaybeProcessedBlock::Processed(t) => t.clone(), 221 219 }; 222 220 ··· 237 235 } 238 236 239 237 /// blocking!!!!!! 240 - pub fn disk_step<T: Processable>( 238 + pub fn disk_step( 241 239 &mut self, 242 240 reader: &mut DiskStore, 243 - process: impl Fn(Vec<u8>) -> T, 244 - ) -> Result<Step<T>, WalkError> { 241 + process: impl Fn(Bytes) -> Bytes, 242 + ) -> Result<Step, WalkError> { 245 243 loop { 246 244 let Some(need) = self.stack.last_mut() else { 247 245 log::trace!("tried to walk but we're actually done."); ··· 252 250 &mut Need::Node { depth, cid } => { 253 251 let cid_bytes = cid.to_bytes(); 254 252 log::trace!("need node {cid:?}"); 255 - let Some(block_bytes) = reader.get(&cid_bytes)? else { 253 + let Some(block_slice) = reader.get(&cid_bytes)? else { 256 254 log::trace!("node not found, resting"); 257 255 return Ok(Step::Missing(cid)); 258 256 }; 259 257 260 - let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 258 + let block = MaybeProcessedBlock::from_bytes(block_slice.into()); // TODO shouldn't fjalls slice already be bytes 261 259 262 260 let MaybeProcessedBlock::Raw(data) = block else { 263 261 return Err(WalkError::BadCommitFingerprint); ··· 274 272 Need::Record { rkey, cid } => { 275 273 log::trace!("need record {cid:?}"); 276 274 let cid_bytes = cid.to_bytes(); 277 - let Some(data_bytes) = reader.get(&cid_bytes)? else { 275 + let Some(data_slice) = reader.get(&cid_bytes)? else { 278 276 log::trace!("record block not found, resting"); 279 277 return Ok(Step::Missing(*cid)); 280 278 }; 281 - let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 279 + let data = MaybeProcessedBlock::from_bytes(data_slice.into()); 282 280 let rkey = rkey.clone(); 283 281 let data = match data { 284 282 MaybeProcessedBlock::Raw(data) => process(data),
+12 -4
tests/non-huge-cars.rs
··· 12 12 expected_sum: usize, 13 13 expect_profile: bool, 14 14 ) { 15 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 - .await 17 - .unwrap() 15 + let mut driver = match Driver::load_car( 16 + bytes, 17 + |block| block.len().to_ne_bytes().to_vec().into(), 18 + 10, /* MiB */ 19 + ) 20 + .await 21 + .unwrap() 18 22 { 19 23 Driver::Memory(_commit, mem_driver) => mem_driver, 20 24 Driver::Disk(_) => panic!("too big"), ··· 26 30 let mut prev_rkey = "".to_string(); 27 31 28 32 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 29 - for (rkey, size) in pairs { 33 + for (rkey, bytes) in pairs { 30 34 records += 1; 35 + 36 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 37 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 38 + 31 39 sum += size; 32 40 if rkey == "app.bsky.actor.profile/self" { 33 41 found_bsky_profile = true;