Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

store raw blocks

redb blows up the disk a bit unsurprisingly

phil f1ba7cb5 4e8617e1

+105 -140
+20 -52
src/disk_drive.rs
··· 4 4 5 5 use crate::disk_walk::{Step, Trip, Walker}; 6 6 use crate::mst::Commit; 7 - use crate::mst::Node; 8 7 9 8 use ipld_core::cid::Cid; 10 - use serde::{Deserialize, Serialize, de::DeserializeOwned}; 9 + use serde::{Serialize, de::DeserializeOwned}; 11 10 12 11 /// Errors that can happen while consuming and emitting blocks and records 13 12 #[derive(Debug, thiserror::Error)] ··· 26 25 Tripped(#[from] Trip), 27 26 } 28 27 29 - #[derive(Debug, Clone, Serialize, Deserialize)] 30 - pub enum MaybeProcessedBlock<T: Clone + Serialize> { 31 - /// A block that's *probably* a Node (but we can't know yet) 32 - /// 33 - /// It *can be* a record that suspiciously looks a lot like a node, so we 34 - /// cannot eagerly turn it into a Node. We only know for sure what it is 35 - /// when we actually walk down the MST 36 - Raw(Vec<u8>), 37 - /// A processed record from a block that was definitely not a Node 38 - /// 39 - /// If we _never_ needed this block, then we may have wasted a bit of effort 40 - /// trying to process it. Oh well. 41 - /// 42 - /// Processing has to be fallible because the CAR can have totally-unused 43 - /// blocks, which can just be garbage. since we're eagerly trying to process 44 - /// record blocks without knowing for sure that they *are* records, we 45 - /// discard any definitely-not-nodes that fail processing and keep their 46 - /// error in the buffer for them. if we later try to retreive them as a 47 - /// record, then we can surface the error. 48 - /// 49 - /// The error type is `String` because we don't really want to put 50 - /// any constraints like `Serialize` on the error type, and `Error` 51 - /// at least requires `Display`. It's a compromise. 52 - ProcessedOk(T), 53 - Unprocessable(String), 54 - } 55 - 56 28 pub trait BlockStore<MPB: Serialize + DeserializeOwned> { 57 - fn put(&self, key: Cid, value: MPB); // unwraps for now 29 + fn put_batch(&self, blocks: Vec<(Cid, MPB)>); // unwraps for now 58 30 fn get(&self, key: Cid) -> Option<MPB>; 59 31 } 60 32 ··· 66 38 SE: Error + 'static, 67 39 S: Stream<Item = CarBlock<SE>>, 68 40 T: Clone + Serialize + DeserializeOwned, 69 - BS: BlockStore<MaybeProcessedBlock<T>>, 41 + BS: BlockStore<Vec<u8>>, 70 42 P: Fn(&[u8]) -> Result<T, PE>, 71 43 PE: Error, 72 44 { 73 45 #[allow(dead_code)] 74 - block_stream: S, 46 + block_stream: Option<S>, 75 47 block_store: BS, 76 48 walker: Walker, 77 49 process: P, ··· 82 54 SE: Error + 'static, 83 55 S: Stream<Item = CarBlock<SE>> + Unpin, 84 56 T: Clone + Serialize + DeserializeOwned, 85 - BS: BlockStore<MaybeProcessedBlock<T>>, 57 + BS: BlockStore<Vec<u8>>, 86 58 P: Fn(&[u8]) -> Result<T, PE>, 87 59 PE: Error, 88 60 { ··· 105 77 /// memory usage. 106 78 pub async fn init( 107 79 root: Cid, 108 - mut block_stream: S, 80 + block_stream: S, 109 81 block_store: BS, 110 82 process: P, 111 83 ) -> Result<(Commit, Self), DriveError> { 112 84 let mut commit = None; 113 85 114 86 log::warn!("init: load blocks"); 87 + 88 + let mut chunked = block_stream.try_chunks(4096); 115 89 116 90 // go ahead and put all blocks in the block store 117 - while let Some((cid, data)) = block_stream 91 + while let Some(chunk) = chunked 118 92 .try_next() 119 93 .await 120 94 .map_err(|e| DriveError::CarBlockError(e.into()))? 121 95 { 122 - if cid == root { 123 - let c: Commit = serde_ipld_dagcbor::from_slice(&data) 124 - .map_err(|e| DriveError::BadCommit(e.into()))?; 125 - commit = Some(c); 126 - } else { 127 - block_store.put( 128 - cid, 129 - if Node::could_be(&data) { 130 - MaybeProcessedBlock::Raw(data) 131 - } else { 132 - match process(&data) { 133 - Ok(t) => MaybeProcessedBlock::ProcessedOk(t), 134 - Err(e) => MaybeProcessedBlock::Unprocessable(e.to_string()), 135 - } 136 - }, 137 - ); 96 + let mut to_insert = Vec::with_capacity(chunk.len()); 97 + for (cid, data) in chunk { 98 + if cid == root { 99 + let c: Commit = serde_ipld_dagcbor::from_slice(&data) 100 + .map_err(|e| DriveError::BadCommit(e.into()))?; 101 + commit = Some(c); 102 + } else { 103 + to_insert.push((cid, data)); 104 + } 138 105 } 106 + block_store.put_batch(to_insert) 139 107 } 140 108 141 109 log::warn!("init: got commit?"); ··· 148 116 log::warn!("init: wrapping up"); 149 117 150 118 let me = Self { 151 - block_stream, 119 + block_stream: None, 152 120 block_store, 153 121 walker, 154 122 process,
+22 -14
src/disk_redb.rs
··· 1 1 use crate::disk_drive::BlockStore; 2 2 use ipld_core::cid::Cid; 3 - use redb::{Database, Error, ReadableTable, TableDefinition, WriteTransaction}; 3 + use redb::{Database, Durability, Error, ReadableDatabase, TableDefinition}; 4 4 use serde::{Serialize, de::DeserializeOwned}; 5 5 use std::path::Path; 6 6 ··· 9 9 pub struct RedbStore { 10 10 #[allow(dead_code)] 11 11 db: Database, 12 - tx: Option<WriteTransaction>, 13 12 } 14 13 15 14 impl RedbStore { ··· 17 16 log::warn!("redb new"); 18 17 let db = Database::create(path)?; 19 18 log::warn!("db created"); 20 - let mut tx = db.begin_write()?; 21 - tx.set_durability(redb::Durability::None).unwrap(); 22 - log::warn!("transaction begun"); 23 - Ok(Self { db, tx: Some(tx) }) 19 + Ok(Self { db }) 24 20 } 25 21 } 26 22 27 23 impl Drop for RedbStore { 28 24 fn drop(&mut self) { 29 - let tx = self.tx.take(); 30 - tx.unwrap().abort().unwrap(); 25 + let mut tx = self.db.begin_write().unwrap(); 26 + tx.set_durability(Durability::None).unwrap(); 27 + tx.delete_table(TABLE).unwrap(); 28 + tx.commit().unwrap(); 31 29 } 32 30 } 33 31 34 32 impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for RedbStore { 35 - fn put(&self, c: Cid, t: MPB) { 36 - let key_bytes = c.to_bytes(); 37 - let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 33 + fn put_batch(&self, blocks: Vec<(Cid, MPB)>) { 34 + let mut tx = self.db.begin_write().unwrap(); 35 + tx.set_durability(Durability::None).unwrap(); 36 + 38 37 { 39 - let mut table = self.tx.as_ref().unwrap().open_table(TABLE).unwrap(); 40 - table.insert(&*key_bytes, &*val_bytes).unwrap(); 38 + let mut table = tx.open_table(TABLE).unwrap(); 39 + for (cid, t) in blocks { 40 + let key_bytes = cid.to_bytes(); 41 + let val_bytes = 42 + bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 43 + table.insert(&*key_bytes, &*val_bytes).unwrap(); 44 + } 41 45 } 46 + 47 + tx.commit().unwrap(); 42 48 } 49 + 43 50 fn get(&self, c: Cid) -> Option<MPB> { 44 51 let key_bytes = c.to_bytes(); 45 - let table = self.tx.as_ref().unwrap().open_table(TABLE).unwrap(); 52 + let tx = self.db.begin_read().unwrap(); 53 + let table = tx.open_table(TABLE).unwrap(); 46 54 let maybe_val_bytes = table.get(&*key_bytes).unwrap()?; 47 55 let (t, n): (MPB, usize) = 48 56 bincode::serde::decode_from_slice(maybe_val_bytes.value(), bincode::config::standard())
+57 -57
src/disk_sqlite.rs
··· 1 - use crate::disk_drive::BlockStore; 2 - use ipld_core::cid::Cid; 3 - use rusqlite::{Connection, OptionalExtension, Result}; 4 - use serde::{Serialize, de::DeserializeOwned}; 5 - use std::path::Path; 1 + // use crate::disk_drive::BlockStore; 2 + // use ipld_core::cid::Cid; 3 + // use rusqlite::{Connection, OptionalExtension, Result}; 4 + // use serde::{Serialize, de::DeserializeOwned}; 5 + // use std::path::Path; 6 6 7 - pub struct SqliteStore { 8 - conn: Connection, 9 - } 7 + // pub struct SqliteStore { 8 + // conn: Connection, 9 + // } 10 10 11 - impl SqliteStore { 12 - pub fn new(path: impl AsRef<Path>) -> Result<Self> { 13 - let conn = Connection::open(path)?; 14 - conn.pragma_update(None, "journal_mode", "WAL")?; 15 - conn.pragma_update(None, "synchronous", "OFF")?; 16 - conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?; 17 - conn.execute( 18 - "CREATE TABLE blocks ( 19 - key BLOB PRIMARY KEY NOT NULL, 20 - val BLOB NOT NULL 21 - ) WITHOUT ROWID", 22 - (), 23 - )?; 11 + // impl SqliteStore { 12 + // pub fn new(path: impl AsRef<Path>) -> Result<Self> { 13 + // let conn = Connection::open(path)?; 14 + // conn.pragma_update(None, "journal_mode", "WAL")?; 15 + // conn.pragma_update(None, "synchronous", "OFF")?; 16 + // conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?; 17 + // conn.execute( 18 + // "CREATE TABLE blocks ( 19 + // key BLOB PRIMARY KEY NOT NULL, 20 + // val BLOB NOT NULL 21 + // ) WITHOUT ROWID", 22 + // (), 23 + // )?; 24 24 25 - Ok(Self { conn }) 26 - } 27 - } 25 + // Ok(Self { conn }) 26 + // } 27 + // } 28 28 29 - impl Drop for SqliteStore { 30 - fn drop(&mut self) { 31 - self.conn.execute("DROP TABLE blocks", ()).unwrap(); 32 - } 33 - } 29 + // impl Drop for SqliteStore { 30 + // fn drop(&mut self) { 31 + // self.conn.execute("DROP TABLE blocks", ()).unwrap(); 32 + // } 33 + // } 34 34 35 - impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for SqliteStore { 36 - fn put(&self, c: Cid, t: MPB) { 37 - let key_bytes = c.to_bytes(); 38 - let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 35 + // impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for SqliteStore { 36 + // fn put(&self, c: Cid, t: MPB) { 37 + // let key_bytes = c.to_bytes(); 38 + // let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 39 39 40 - self.conn 41 - .execute( 42 - "INSERT INTO blocks (key, val) VALUES (?1, ?2)", 43 - (&key_bytes, &val_bytes), 44 - ) 45 - .unwrap(); 46 - } 47 - fn get(&self, c: Cid) -> Option<MPB> { 48 - let key_bytes = c.to_bytes(); 40 + // self.conn 41 + // .execute( 42 + // "INSERT INTO blocks (key, val) VALUES (?1, ?2)", 43 + // (&key_bytes, &val_bytes), 44 + // ) 45 + // .unwrap(); 46 + // } 47 + // fn get(&self, c: Cid) -> Option<MPB> { 48 + // let key_bytes = c.to_bytes(); 49 49 50 - let val_bytes: Vec<u8> = self 51 - .conn 52 - .query_one( 53 - "SELECT val FROM blocks WHERE key = ?1", 54 - (&key_bytes,), 55 - |row| row.get(0), 56 - ) 57 - .optional() 58 - .unwrap()?; 50 + // let val_bytes: Vec<u8> = self 51 + // .conn 52 + // .query_one( 53 + // "SELECT val FROM blocks WHERE key = ?1", 54 + // (&key_bytes,), 55 + // |row| row.get(0), 56 + // ) 57 + // .optional() 58 + // .unwrap()?; 59 59 60 - let (t, n): (MPB, usize) = 61 - bincode::serde::decode_from_slice(&val_bytes, bincode::config::standard()).unwrap(); 62 - assert_eq!(val_bytes.len(), n); 63 - Some(t) 64 - } 65 - } 60 + // let (t, n): (MPB, usize) = 61 + // bincode::serde::decode_from_slice(&val_bytes, bincode::config::standard()).unwrap(); 62 + // assert_eq!(val_bytes.len(), n); 63 + // Some(t) 64 + // } 65 + // }
+6 -17
src/disk_walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::disk_drive::{BlockStore, MaybeProcessedBlock}; 3 + use crate::disk_drive::BlockStore; 4 4 use crate::mst::Node; 5 5 6 6 use ipld_core::cid::Cid; ··· 101 101 /// Advance through nodes until we find a record or can't go further 102 102 pub fn step<T: Clone + Serialize + DeserializeOwned, E: Error>( 103 103 &mut self, 104 - block_store: &mut impl BlockStore<MaybeProcessedBlock<T>>, 104 + block_store: &mut impl BlockStore<Vec<u8>>, 105 105 process: impl Fn(&[u8]) -> Result<T, E>, 106 106 ) -> Result<Step<T>, Trip> { 107 107 loop { ··· 113 113 match &mut need { 114 114 Need::Node(cid) => { 115 115 log::trace!("need node {cid:?}"); 116 - let Some(mpb) = block_store.get(*cid) else { 116 + let Some(block) = block_store.get(*cid) else { 117 117 log::trace!("node not found, resting"); 118 118 return Ok(Step::Rest(*cid)); 119 119 }; 120 120 121 - let MaybeProcessedBlock::<T>::Raw(block) = mpb else { 122 - return Err(Trip::BadCommit("failed commit fingerprint".into())); 123 - }; 124 121 let node = serde_ipld_dagcbor::from_slice::<Node>(&block) 125 122 .map_err(|e| Trip::BadCommit(e.into()))?; 126 123 ··· 132 129 } 133 130 Need::Record { rkey, cid } => { 134 131 log::trace!("need record {cid:?}"); 135 - let Some(mpb) = block_store.get(*cid) else { 132 + let Some(block) = block_store.get(*cid) else { 136 133 log::trace!("record block not found, resting"); 137 134 return Ok(Step::Rest(*cid)); 138 135 }; 139 136 let rkey = rkey.clone(); 140 - let data = match mpb { 141 - MaybeProcessedBlock::Raw(data) => match process(&data) { 142 - Ok(t) => Ok(t), 143 - Err(e) => Err(Trip::ProcessFailed(e.to_string())), 144 - }, 145 - MaybeProcessedBlock::ProcessedOk(t) => Ok(t.clone()), 146 - MaybeProcessedBlock::Unprocessable(s) => { 147 - return Err(Trip::ProcessFailed(s.clone())); 148 - } 149 - }; 137 + 138 + let data = process(&block).map_err(|e| Trip::ProcessFailed(e.to_string())); 150 139 151 140 // found node, make sure we remember 152 141 self.stack.pop();