Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

*: wrap Datastore trait, make it viral

geesawra 1c879068 934d915a

+174 -49
+13
Cargo.lock
··· 35 35 "bytevec", 36 36 "chrono", 37 37 "clap", 38 + "enum_dispatch", 38 39 "futures", 39 40 "governor", 40 41 "http-body-util", ··· 637 638 checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" 638 639 dependencies = [ 639 640 "cfg-if", 641 + ] 642 + 643 + [[package]] 644 + name = "enum_dispatch" 645 + version = "0.3.13" 646 + source = "registry+https://github.com/rust-lang/crates.io-index" 647 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 648 + dependencies = [ 649 + "once_cell", 650 + "proc-macro2", 651 + "quote", 652 + "syn", 640 653 ] 641 654 642 655 [[package]]
+1
Cargo.toml
··· 12 12 bytevec = "0.2.0" 13 13 chrono = { version = "0.4.42", features = ["serde"] } 14 14 clap = { version = "4.5.47", features = ["derive", "env"] } 15 + enum_dispatch = "0.3.13" 15 16 futures = "0.3.31" 16 17 governor = "0.10.1" 17 18 http-body-util = "0.1.3"
+8 -2
src/backfill.rs
··· 13 13 dest: mpsc::Sender<ExportPage>, 14 14 source_workers: usize, 15 15 until: Option<Dt>, 16 + starting_from: Option<i64>, 16 17 ) -> anyhow::Result<&'static str> { 18 + let starting_from = match starting_from { 19 + Some(sf) => Week::from_n(sf), 20 + None => FIRST_WEEK, 21 + }; 22 + 17 23 // queue up the week bundles that should be available 18 24 let weeks = Arc::new(Mutex::new( 19 25 until 20 - .map(|u| Week::range(FIRST_WEEK..u.into())) 21 - .unwrap_or(Week::range(FIRST_WEEK..)), 26 + .map(|u| Week::range(starting_from..u.into())) 27 + .unwrap_or(Week::range(starting_from..)), 22 28 )); 23 29 weeks.lock().await.reverse(); 24 30
+23 -10
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, RocksDatastore, backfill, backfill_to_pg, 3 - backfill_to_rocksdb, bin::GlobalArgs, bin_init, datastore::Datastore, full_pages, 4 - pages_to_stdout, poll_upstream, 2 + Db, Dt, ExportPage, FolderSource, HttpSource, RocksDatastore, backfill, bin::GlobalArgs, 3 + bin_init, datastore::Datastore, full_pages, pages_to_stdout, poll_upstream, 5 4 }; 6 5 use anyhow::anyhow; 7 6 use clap::Parser; ··· 51 50 /// Cannot be used alongside any postgres-related flag. 52 51 #[arg(long, action)] 53 52 rocksdb_db_path: Option<PathBuf>, 53 + /// Start at the week pointed by this UNIX timestamp 54 + #[arg(long)] 55 + starting_from: Option<i64>, 54 56 /// Stop at the week ending before this date 55 57 #[arg(long)] 56 58 until: Option<Dt>, ··· 72 74 to_postgres, 73 75 postgres_cert, 74 76 postgres_reset, 77 + starting_from, 75 78 until, 76 79 catch_up, 77 80 rocksdb_db_path, ··· 128 131 bulk_tx, 129 132 source_workers.unwrap_or(1), 130 133 until, 134 + starting_from, 131 135 )); 132 136 } else { 133 137 tasks.spawn(backfill( ··· 135 139 bulk_tx, 136 140 source_workers.unwrap_or(4), 137 141 until, 142 + starting_from, 138 143 )); 139 144 } 140 145 ··· 155 160 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 156 161 log::trace!("connected to postgres"); 157 162 158 - tasks.spawn(backfill_to_pg( 159 - db.clone(), 160 - postgres_reset, 161 - bulk_out, 162 - found_last_tx, 163 - )); 163 + let db_backfill = db.clone(); 164 + tasks.spawn(async move { 165 + let mut db_conn = db_backfill.datastore().await?; 166 + match db_conn.backfill(bulk_out, found_last_tx).await { 167 + Ok(_) => Ok("backfill"), 168 + Err(e) => Err(anyhow!(e)), 169 + } 170 + }); 164 171 if catch_up { 165 172 tasks.spawn(async move { 166 173 let mut db_conn = db.datastore().await?; ··· 175 182 let mut db = RocksDatastore::new(rocksdb_path)?; 176 183 log::trace!("opened!"); 177 184 178 - tasks.spawn(backfill_to_rocksdb(db.clone(), bulk_out, found_last_tx)); 185 + let mut db_backfill = db.clone(); 186 + tasks.spawn(async move { 187 + match db_backfill.backfill(bulk_out, found_last_tx).await { 188 + Ok(_) => Ok("backfill"), 189 + Err(e) => Err(anyhow!(e)), 190 + } 191 + }); 179 192 if catch_up { 180 193 tasks.spawn(async move { 181 194 match db.observe_pages(full_out).await {
+34 -11
src/bin/mirror.rs
··· 1 1 use allegedly::{ 2 - Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, datastore::Datastore, 2 + DatastoreEnum, Db, ExperimentalConf, ListenConf, RocksDatastore, 3 + bin::GlobalArgs, 4 + bin_init, 5 + datastore::{Datastore, SendDatastore}, 3 6 poll_upstream, serve, 4 7 }; 5 8 use anyhow::anyhow; ··· 13 16 /// the wrapped did-method-plc server 14 17 #[arg(long, env = "ALLEGEDLY_WRAP")] 15 18 wrap: Url, 19 + /// Path to the database for RocksDB. 20 + /// Will be created if it doesn't exist. 21 + /// Cannot be used alongside any postgres-related flag. 22 + #[arg(long, action)] 23 + rocksdb_db_path: Option<PathBuf>, 16 24 /// the wrapped did-method-plc server's database (write access required) 17 25 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 18 26 wrap_pg: Option<Url>, ··· 68 76 }: GlobalArgs, 69 77 Args { 70 78 wrap, 79 + rocksdb_db_path, 71 80 wrap_pg, 72 81 wrap_pg_cert, 73 82 bind, ··· 106 115 107 116 let mut tasks = JoinSet::new(); 108 117 118 + let datastore: Option<DatastoreEnum> = if let Some(pg_url) = wrap_pg { 119 + let db = Db::new(pg_url.clone().as_str(), wrap_pg_cert.clone()).await?; 120 + let ds = db.datastore().await?; 121 + Some(DatastoreEnum::Pg(ds)) 122 + } else if let Some(rocksdb_path) = rocksdb_db_path { 123 + let db = RocksDatastore::new(rocksdb_path)?; 124 + Some(DatastoreEnum::Rocks(db)) 125 + } else { 126 + None 127 + }; 128 + 109 129 let db = if sync { 110 - let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 111 - "a wrapped reference postgres must be provided to sync" 112 - ))?; 113 - let db = Db::new(wrap_pg.clone().as_str(), wrap_pg_cert.clone()).await?; 130 + let ds = match datastore { 131 + Some(ds) => ds, 132 + None => { 133 + return Err(anyhow::anyhow!( 134 + "a wrapped reference postgres must be provided to sync" 135 + )); 136 + } 137 + }; 114 138 115 139 // TODO: allow starting up with polling backfill from beginning? 116 140 log::debug!("getting the latest op from the db..."); 117 - let latest = db 118 - .get_latest() 141 + let latest = ds 142 + .get_latest_timestamp() 119 143 .await? 120 144 .expect("there to be at least one op in the db. did you backfill?"); 121 145 ··· 127 151 128 152 tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 129 153 154 + let mut ds_observe = ds.clone(); 130 155 tasks.spawn(async move { 131 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 132 - let mut db_conn = db.datastore().await?; 133 - match db_conn.observe_pages(recv_page).await { 156 + match ds_observe.observe_pages(recv_page).await { 134 157 Ok(_) => Ok("observe_pages"), 135 158 Err(e) => Err(anyhow!(e)), 136 159 } 137 160 }); 138 - Some(db) 161 + Some(ds) 139 162 } else { 140 163 None 141 164 };
+21 -4
src/datastore.rs
··· 1 - use tokio::sync::mpsc; 1 + use tokio::sync::{mpsc, oneshot}; 2 2 3 - use crate::{Dt, ExportPage, Op}; 3 + use crate::{Dt, ExportPage, Op, plc_pg, plc_rocksdb}; 4 4 5 5 #[derive(Debug)] 6 6 pub enum Error { 7 7 GetLatest(String), 8 8 InsertOperation(String), 9 + Backfill(String), 9 10 } 10 11 11 12 impl std::error::Error for Error {} 12 13 13 14 impl std::fmt::Display for Error { 14 15 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 15 - todo!() 16 + match self { 17 + Error::GetLatest(s) => write!(f, "get_latest error: {}", s), 18 + Error::InsertOperation(s) => write!(f, "insert_operation error: {}", s), 19 + Error::Backfill(s) => write!(f, "backfill error: {}", s), 20 + } 16 21 } 17 22 } 18 23 ··· 21 26 pub did: u64, 22 27 } 23 28 24 - // #[allow(async_fn_in_trait)] 29 + #[enum_dispatch::enum_dispatch] 25 30 #[trait_variant::make(SendDatastore: Send)] 26 31 pub trait Datastore { 27 32 async fn get_latest_timestamp(&self) -> Result<Option<Dt>, Error>; 28 33 async fn insert_operation(&mut self, op: Op) -> Result<InsertResult, Error>; 29 34 async fn observe_pages(&mut self, pages: mpsc::Receiver<ExportPage>) -> Result<(), Error>; 35 + async fn backfill( 36 + &mut self, 37 + pages: mpsc::Receiver<ExportPage>, 38 + notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 39 + ) -> Result<(), Error>; 40 + } 41 + 42 + #[derive(Clone)] 43 + #[enum_dispatch::enum_dispatch(Datastore, SendDatastore)] 44 + pub enum DatastoreEnum { 45 + Pg(plc_pg::DbDatastore), 46 + Rocks(plc_rocksdb::RocksDatastore), 30 47 }
+2 -1
src/lib.rs
··· 17 17 pub use backfill::backfill; 18 18 pub use cached_value::{CachedValue, Fetcher}; 19 19 pub use client::{CLIENT, UA}; 20 + pub use datastore::DatastoreEnum; 20 21 pub use mirror::{ExperimentalConf, ListenConf, serve}; 21 - pub use plc_pg::{Db, backfill_to_pg}; 22 + pub use plc_pg::Db; 22 23 pub use plc_rocksdb::{RocksDatastore, backfill_to_rocksdb}; 23 24 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 24 25 pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
+5 -4
src/mirror.rs
··· 1 1 use crate::{ 2 - CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo, 2 + CachedValue, CreatePlcOpLimiter, DatastoreEnum, Db, Dt, Fetcher, GovernorMiddleware, 3 + IpLimiters, UA, datastore::Datastore, logo, 3 4 }; 4 5 use futures::TryStreamExt; 5 6 use governor::Quota; ··· 181 182 } 182 183 183 184 #[derive(Clone)] 184 - struct GetLatestAt(Db); 185 + struct GetLatestAt(DatastoreEnum); 185 186 impl Fetcher<Dt> for GetLatestAt { 186 187 async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 187 - let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 188 + let now = self.0.get_latest_timestamp().await?.ok_or(anyhow::anyhow!( 188 189 "expected to find at least one thing in the db" 189 190 ))?; 190 191 Ok(now) ··· 388 389 plc: Url, 389 390 listen: ListenConf, 390 391 experimental: ExperimentalConf, 391 - db: Option<Db>, 392 + db: Option<DatastoreEnum>, 392 393 ) -> anyhow::Result<&'static str> { 393 394 log::info!("starting server..."); 394 395
+32 -17
src/plc_pg.rs
··· 2 2 Dt, ExportPage, PageBoundaryState, 3 3 datastore::{self, InsertResult, SendDatastore}, 4 4 }; 5 + use futures::{TryFutureExt, lock::Mutex}; 5 6 use native_tls::{Certificate, TlsConnector}; 6 7 use postgres_native_tls::MakeTlsConnector; 7 - use std::path::PathBuf; 8 8 use std::pin::pin; 9 9 use std::time::Instant; 10 + use std::{path::PathBuf, sync::Arc}; 10 11 use tokio::{ 11 12 sync::{mpsc, oneshot}, 12 13 task::{JoinHandle, spawn}, ··· 113 114 Ok(dt) 114 115 } 115 116 116 - pub async fn datastore(&self) -> Result<impl SendDatastore, PgError> { 117 - let (client, task) = self.connect().await?; 117 + pub async fn datastore(&self) -> Result<DbDatastore, PgError> { 118 + let (client, _) = self.connect().await?; 118 119 119 - Ok(DbDatastore { client, task }) 120 + Ok(DbDatastore { 121 + client: Arc::new(Mutex::new(client)), 122 + }) 120 123 } 121 124 } 122 125 126 + #[derive(Clone)] 123 127 pub struct DbDatastore { 124 - client: Client, 125 - task: JoinHandle<Result<(), PgError>>, 128 + client: Arc<Mutex<Client>>, 126 129 } 127 130 128 131 impl SendDatastore for DbDatastore { 129 132 async fn get_latest_timestamp(&self) -> Result<Option<Dt>, crate::datastore::Error> { 130 133 match self 131 134 .client 135 + .lock() 136 + .await 132 137 .query_opt( 133 138 r#"SELECT "createdAt" 134 139 FROM operations ··· 147 152 &mut self, 148 153 op: crate::Op, 149 154 ) -> Result<datastore::InsertResult, crate::datastore::Error> { 150 - let ops_stmt = self 151 - .client 155 + let mut client = self.client.lock().await; 156 + let ops_stmt = client 152 157 .prepare( 153 158 r#"INSERT INTO operations (did, operation, cid, nullified, "createdAt") 154 159 VALUES ($1, $2, $3, $4, $5) ··· 156 161 ) 157 162 .await 158 163 .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 159 - let did_stmt = self 160 - .client 164 + let did_stmt = client 161 165 .prepare(r#"INSERT INTO dids (did) VALUES ($1) ON CONFLICT do nothing"#) 162 166 .await 163 167 .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 164 168 165 - let tx = self 166 - .client 169 + let tx = client 167 170 .transaction() 168 171 .await 169 172 .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; ··· 220 223 221 224 Ok(()) 222 225 } 226 + 227 + async fn backfill( 228 + &mut self, 229 + pages: mpsc::Receiver<ExportPage>, 230 + notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 231 + ) -> Result<(), datastore::Error> { 232 + let mut client = self.client.lock().await; 233 + match backfill_to_pg(false, pages, notify_last_at, &mut client) 234 + .map_err(|e| datastore::Error::Backfill(e.to_string())) 235 + .await 236 + { 237 + Ok(_) => Ok(()), 238 + Err(e) => Err(e), 239 + } 240 + } 223 241 } 224 242 225 243 /// Dump rows into an empty operations table quickly ··· 235 253 /// panics: if the operations or dids tables are not empty, unless reset is true 236 254 /// 237 255 /// recommended postgres setting: `max_wal_size=4GB` (or more) 238 - pub async fn backfill_to_pg( 239 - db: Db, 256 + async fn backfill_to_pg( 240 257 reset: bool, 241 258 mut pages: mpsc::Receiver<ExportPage>, 242 259 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 260 + client: &mut Client, 243 261 ) -> anyhow::Result<&'static str> { 244 - let (mut client, task) = db.connect().await?; 245 - 246 262 let t0 = Instant::now(); 247 263 let tx = client.transaction().await?; 248 264 tx.execute("SET LOCAL synchronous_commit = off", &[]) ··· 356 372 log::trace!("set tables LOGGED: {:?}", t_step.elapsed()); 357 373 358 374 tx.commit().await?; 359 - drop(task); 360 375 log::info!("total backfill time: {:?}", t0.elapsed()); 361 376 362 377 Ok("backfill_to_pg")
+35
src/plc_rocksdb.rs
··· 21 21 pub fn new(path: PathBuf) -> Result<Self, rocksdb::Error> { 22 22 let mut opts = Options::default(); 23 23 opts.create_if_missing(true); 24 + opts.set_max_open_files(1000); 24 25 opts.create_missing_column_families(true); 25 26 26 27 let rdb = rocksdb::DB::open_cf(&opts, path, vec!["dids", "ops"])?; ··· 115 116 let op: Op = serde_json::from_slice(&v).unwrap(); 116 117 op 117 118 })) 119 + } 120 + 121 + pub fn as_datastore(self) -> impl SendDatastore { 122 + self 118 123 } 119 124 } 120 125 ··· 207 212 t0.elapsed() 208 213 ); 209 214 215 + Ok(()) 216 + } 217 + 218 + async fn backfill( 219 + &mut self, 220 + mut pages: mpsc::Receiver<ExportPage>, 221 + notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 222 + ) -> Result<(), Error> { 223 + let mut last_at = None; 224 + while let Some(page) = pages.recv().await { 225 + for op in &page.ops { 226 + let rdb = self.db.lock().await; 227 + self.insert_op(&rdb, op) 228 + .await 229 + .map_err(|e| Error::Backfill(e.to_string()))?; 230 + } 231 + if notify_last_at.is_some() 232 + && let Some(s) = PageBoundaryState::new(&page) 233 + { 234 + last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 235 + } 236 + } 237 + log::debug!("finished receiving bulk pages"); 238 + 239 + if let Some(notify) = notify_last_at { 240 + log::trace!("notifying last_at: {last_at:?}"); 241 + if notify.send(last_at).is_err() { 242 + log::error!("receiver for last_at dropped, can't notify"); 243 + }; 244 + } 210 245 Ok(()) 211 246 } 212 247 }