Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

redb big write tx

fast but this might be buffering in mem? need to verify.

phil 4e8617e1 474e29e8

+15 -10
+15 -10
src/disk_redb.rs
··· 1 1 use crate::disk_drive::BlockStore; 2 2 use ipld_core::cid::Cid; 3 - use redb::{Database, Error, ReadableDatabase, TableDefinition}; 3 + use redb::{Database, Error, ReadableTable, TableDefinition, WriteTransaction}; 4 4 use serde::{Serialize, de::DeserializeOwned}; 5 5 use std::path::Path; 6 6 7 7 const TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blocks"); 8 8 9 9 pub struct RedbStore { 10 + #[allow(dead_code)] 10 11 db: Database, 12 + tx: Option<WriteTransaction>, 11 13 } 12 14 13 15 impl RedbStore { ··· 15 17 log::warn!("redb new"); 16 18 let db = Database::create(path)?; 17 19 log::warn!("db created"); 18 - Ok(Self { db }) 20 + let mut tx = db.begin_write()?; 21 + tx.set_durability(redb::Durability::None).unwrap(); 22 + log::warn!("transaction begun"); 23 + Ok(Self { db, tx: Some(tx) }) 19 24 } 20 25 } 21 26 22 - // TODO: clean up on drop 27 + impl Drop for RedbStore { 28 + fn drop(&mut self) { 29 + let tx = self.tx.take(); 30 + tx.unwrap().abort().unwrap(); 31 + } 32 + } 23 33 24 34 impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for RedbStore { 25 35 fn put(&self, c: Cid, t: MPB) { 26 36 let key_bytes = c.to_bytes(); 27 37 let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 28 - 29 - let mut tx = self.db.begin_write().unwrap(); 30 - tx.set_durability(redb::Durability::None).unwrap(); 31 38 { 32 - let mut table = tx.open_table(TABLE).unwrap(); 39 + let mut table = self.tx.as_ref().unwrap().open_table(TABLE).unwrap(); 33 40 table.insert(&*key_bytes, &*val_bytes).unwrap(); 34 41 } 35 - tx.commit().unwrap(); 36 42 } 37 43 fn get(&self, c: Cid) -> Option<MPB> { 38 44 let key_bytes = c.to_bytes(); 39 - let tx = self.db.begin_read().unwrap(); 40 - let table = tx.open_table(TABLE).unwrap(); 45 + let table = self.tx.as_ref().unwrap().open_table(TABLE).unwrap(); 41 46 let maybe_val_bytes = table.get(&*key_bytes).unwrap()?; 42 47 let (t, n): (MPB, usize) = 43 48 bincode::serde::decode_from_slice(maybe_val_bytes.value(), bincode::config::standard())