Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

*: store dids and ops in different column families

Instead of keying ops by (did, cid), key them by (did_id, cid), so that the
full DID string is not repeated in every op key and space is not wasted.

Store DIDs in their own column family for ease of iteration.

geesawra 2debffb6 d9b0ecb9

+87 -19
+87 -19
src/plc_rocksdb.rs
··· 1 - use std::{path::PathBuf, sync::Arc, time::Instant}; 1 + use rocksdb::{ColumnFamilyDescriptor, Options}; 2 + use std::{fmt::Display, ops::Add, path::PathBuf, sync::Arc, time::Instant}; 2 3 use tokio::sync::{Mutex, mpsc, oneshot}; 3 4 4 5 use crate::{ ··· 13 14 14 15 impl RocksDatastore { 15 16 pub fn new(path: PathBuf) -> Result<Self, rocksdb::Error> { 16 - let rdb = rocksdb::DB::open_default(path)?; 17 + let mut opts = Options::default(); 18 + opts.create_if_missing(true); 19 + opts.create_missing_column_families(true); 20 + 21 + let rdb = rocksdb::DB::open_cf(&opts, path, vec!["dids", "ops"])?; 17 22 18 23 Ok(RocksDatastore { 19 24 db: Arc::new(Mutex::new(rdb)), 20 25 }) 21 26 } 22 27 23 - async fn latest_timestamp(&self) -> Result<Option<Dt>, rocksdb::Error> { 24 - let db = self.db.lock().await; 28 + async fn latest_timestamp(&self, db: &rocksdb::DB) -> Result<Option<Dt>, rocksdb::Error> { 25 29 let res = db.get(LATEST_TIMESTAMP_KEY)?; 26 30 27 31 match res { ··· 33 37 } 34 38 } 35 39 36 - async fn set_latest_timestamp(&self, ts: Dt) -> Result<(), rocksdb::Error> { 37 - let db = self.db.lock().await; 40 + async fn set_latest_timestamp(&self, db: &rocksdb::DB, ts: Dt) -> Result<(), rocksdb::Error> { 38 41 db.put(LATEST_TIMESTAMP_KEY, ts.timestamp_micros().to_string()) 39 42 .map(|_| Ok(()))? 
40 43 } 41 44 42 - async fn insert_op(&self, op: &Op) -> Result<(), rocksdb::Error> { 43 - { 44 - let db = self.db.lock().await; 45 - db.put(op_key(&op), serde_json::to_vec(&op).unwrap())?; 45 + async fn insert_op(&self, db: &rocksdb::DB, op: &Op) -> Result<(), rocksdb::Error> { 46 + let cf = db.cf_handle("ops").unwrap(); 47 + let id = self.store_did(db, op.did.clone()).await?; 48 + db.put_cf(&cf, op_key(&op, id), serde_json::to_vec(&op).unwrap())?; 49 + self.set_latest_timestamp(&db, op.created_at).await?; 50 + 51 + Ok(()) 52 + } 53 + 54 + async fn store_did(&self, db: &rocksdb::DB, did: String) -> Result<u64, rocksdb::Error> { 55 + let cf = db.cf_handle("dids").unwrap(); 56 + match db.get_cf(&cf, did.clone())? { 57 + Some(s) => { 58 + let l = String::from_utf8(s).unwrap(); 59 + Ok(l.parse::<u64>().unwrap()) 60 + } 61 + None => { 62 + let latest_usable = self.latest_usable_did_id(&db).await?; 63 + db.put_cf(&cf, did, latest_usable.to_string())?; 64 + self.write_latest_did_id(&db, latest_usable).await?; 65 + Ok(latest_usable) 66 + } 46 67 } 68 + } 47 69 48 - self.set_latest_timestamp(op.created_at).await?; 70 + async fn latest_usable_did_id(&self, db: &rocksdb::DB) -> Result<u64, rocksdb::Error> { 71 + let latest_did = db.get_pinned(LATEST_USED_DID_ID)?; 72 + Ok(match latest_did { 73 + Some(l) => { 74 + let l = String::from_utf8(l.to_vec()).unwrap(); 75 + (l.parse::<u64>().unwrap()) + 1 76 + } 77 + None => 0, 78 + }) 79 + } 49 80 50 - Ok(()) 81 + async fn write_latest_did_id(&self, db: &rocksdb::DB, l: u64) -> Result<(), rocksdb::Error> { 82 + Ok(db.put(LATEST_USED_DID_ID, l.to_string())?) 
83 + } 84 + 85 + fn all_dids(&self, db: &rocksdb::DB) -> Result<impl Iterator<Item = String>, rocksdb::Error> { 86 + let cf = db.cf_handle("dids").unwrap(); 87 + 88 + let pi = db.prefix_iterator_cf(&cf, "did"); 89 + 90 + Ok(pi.into_iter().map(|e| { 91 + let (key, v) = e.unwrap(); 92 + log::info!("id: {}", String::from_utf8(v.to_vec()).unwrap()); 93 + let did = String::from_utf8(key.to_vec()).unwrap(); 94 + did 95 + })) 96 + } 97 + 98 + fn all_ops(&self, db: &rocksdb::DB) -> Result<impl Iterator<Item = Op>, rocksdb::Error> { 99 + let cf = db.cf_handle("ops").unwrap(); 100 + 101 + let pi = db.prefix_iterator_cf(&cf, ""); 102 + 103 + Ok(pi.into_iter().map(|e| { 104 + let (key, v) = e.unwrap(); 105 + log::info!("id: {}", String::from_utf8(key.to_vec()).unwrap()); 106 + let op: Op = serde_json::from_slice(&v).unwrap(); 107 + op 108 + })) 51 109 } 52 110 } 53 111 54 - fn op_key(op: &Op) -> String { 55 - format!("{}_{}", op.did, op.cid) 112 + fn op_key(op: &Op, id: u64) -> String { 113 + format!("{}_{}", id, op.cid) 56 114 } 57 115 58 116 const LATEST_TIMESTAMP_KEY: &str = "latest_timestamp"; 59 - const OPERATIONS_KEY: &str = "operations"; 60 - const DIDS_KEY: &str = "dids"; 117 + const LATEST_USED_DID_ID: &str = "latest_used_did_id"; 61 118 62 119 impl SendDatastore for RocksDatastore { 63 120 async fn get_latest_timestamp(&self) -> Result<Option<Dt>, Error> { 64 - self.latest_timestamp() 121 + let db = self.db.lock().await; 122 + 123 + self.latest_timestamp(&db) 65 124 .await 66 125 .map_err(|e| Error::GetLatest(e.to_string())) 67 126 } 68 127 69 128 async fn insert_operation(&mut self, op: Op) -> Result<InsertResult, Error> { 70 - self.insert_op(&op) 129 + let db = self.db.lock().await; 130 + 131 + self.insert_op(&db, &op) 71 132 .await 72 133 .map_err(|e| Error::InsertOperation(e.to_string())) 73 134 .map(|_| InsertResult { op: 1, did: 1 }) ··· 106 167 let mut last_at = None; 107 168 while let Some(page) = pages.recv().await { 108 169 for op in &page.ops { 109 - 
db.insert_op(op).await?; 170 + let rdb = db.db.lock().await; 171 + db.insert_op(&rdb, op).await?; 110 172 } 111 173 if notify_last_at.is_some() 112 174 && let Some(s) = PageBoundaryState::new(&page) ··· 115 177 } 116 178 } 117 179 log::debug!("finished receiving bulk pages"); 180 + 181 + let rdb = db.db.lock().await; 182 + let d = db.all_ops(&rdb).unwrap(); 183 + for did in d { 184 + log::info!("stored op: {:?}", did); 185 + } 118 186 119 187 if let Some(notify) = notify_last_at { 120 188 log::trace!("notifying last_at: {last_at:?}");