Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

*: show amount of dids this instance of allegedly has seen so far

geesawra 4b57ed4e 6df45173

+59 -7
+5 -2
src/datastore.rs
··· 8 8 InsertOperation(String), 9 9 Backfill(String), 10 10 QueryOps(String), 11 + SeenDidsAmount(String), 11 12 } 12 13 13 14 impl std::error::Error for Error {} ··· 15 16 impl std::fmt::Display for Error { 16 17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 17 18 match self { 18 - Error::GetLatest(s) => write!(f, "get_latest error: {}", s), 19 - Error::InsertOperation(s) => write!(f, "insert_operation error: {}", s), 19 + Error::GetLatest(s) => write!(f, "get_latest: {}", s), 20 + Error::InsertOperation(s) => write!(f, "insert_operation: {}", s), 20 21 Error::Backfill(s) => write!(f, "backfill error: {}", s), 21 22 Error::QueryOps(s) => write!(f, "query ops: {}", s), 23 + Error::SeenDidsAmount(s) => write!(f, "seen_dids_amount: {}", s), 22 24 } 23 25 } 24 26 } ··· 40 42 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 41 43 ) -> Result<(), Error>; 42 44 async fn query_ops(&self, did: String) -> Result<Vec<Op>, Error>; 45 + async fn seen_dids_amount(&self) -> Result<u64, Error>; 43 46 } 44 47 45 48 #[derive(Clone)]
+27 -4
src/mirror.rs
··· 15 15 use reqwest::{Client, Url}; 16 16 use serde::Serialize; 17 17 use std::{net::SocketAddr, path::PathBuf, time::Duration}; 18 + use tokio_util::sync; 18 19 19 20 #[derive(Clone)] 20 21 struct State { ··· 31 32 struct SyncInfo { 32 33 backend: String, 33 34 latest_at: CachedValue<Dt, GetLatestAt>, 35 + seen_dids: CachedValue<u64, GetLatestSeenDids>, 34 36 upstream_status: CachedValue<PlcStatus, CheckUpstream>, 35 37 } 36 38 37 39 #[handler] 38 - fn hello( 40 + async fn hello( 39 41 Data(State { 40 42 sync_info, 41 43 upstream, ··· 46 48 ) -> String { 47 49 // let mode = if sync_info.is_some() { "mirror" } else { "wrap" }; 48 50 let pre_info = if sync_info.is_some() { 51 + let seen_dids = sync_info 52 + .as_ref() 53 + .unwrap() 54 + .seen_dids 55 + .clone() 56 + .get() 57 + .await 58 + .unwrap(); 49 59 format!( 50 60 r#" 51 61 This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and ··· 58 68 59 69 Configured storage backend: 60 70 61 - {}"#, 62 - sync_info.as_ref().unwrap().backend 71 + {} 72 + 73 + This allegedly instance has seen {} different DIDs."#, 74 + sync_info.as_ref().unwrap().backend, 75 + seen_dids, 63 76 ) 64 77 } else { 65 78 format!( ··· 199 212 } 200 213 201 214 #[derive(Clone)] 215 + struct GetLatestSeenDids(DatastoreEnum); 216 + impl Fetcher<u64> for GetLatestSeenDids { 217 + async fn fetch(&self) -> Result<u64, Box<dyn std::error::Error>> { 218 + Ok(self.0.seen_dids_amount().await?) 219 + } 220 + } 221 + 222 + #[derive(Clone)] 202 223 struct CheckUpstream(Url, Client); 203 224 impl Fetcher<PlcStatus> for CheckUpstream { 204 225 async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> { ··· 223 244 if let Some(SyncInfo { 224 245 latest_at, 225 246 upstream_status, 247 + seen_dids: _, 226 248 backend, 227 249 }) = sync_info 228 250 { ··· 445 467 }; 446 468 447 469 SyncInfo { 448 - latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)), 470 + latest_at: CachedValue::new(GetLatestAt(db.clone()), Duration::from_secs(2)), 471 + seen_dids: CachedValue::new(GetLatestSeenDids(db), Duration::from_secs(2)), 449 472 upstream_status: CachedValue::new( 450 473 CheckUpstream(upstream.clone(), client.clone()), 451 474 Duration::from_secs(6),
+16
src/plc_pg.rs
··· 264 264 }) 265 265 .collect()) 266 266 } 267 + 268 + async fn seen_dids_amount(&self) -> Result<u64, datastore::Error> { 269 + let client = self.client.lock().await; 270 + let ops_stmt = client 271 + .prepare(r#"SELECT count(*) AS dids_count FROM dids"#) 272 + .await 273 + .map_err(|e| datastore::Error::QueryOps(e.to_string()))?; 274 + 275 + let res = client 276 + .query_one(&ops_stmt, &[]) 277 + .await 278 + .map_err(|e| datastore::Error::QueryOps(e.to_string()))?; 279 + 280 + let dids_count: i64 = res.get("dids_count"); 281 + Ok(dids_count as u64) 282 + } 267 283 } 268 284 269 285 /// Dump rows into an empty operations table quickly
+11 -1
src/plc_rocksdb.rs
··· 1 1 use async_compression::tokio::write::{GzipDecoder, GzipEncoder}; 2 - use futures::stream; 2 + use futures::{TryFutureExt, stream}; 3 3 use rocksdb::Options; 4 4 use std::{path::PathBuf, sync::Arc, time::Instant}; 5 5 use tokio::{ ··· 274 274 .then(async |e| e.to_op().await) 275 275 .collect() 276 276 .await) 277 + } 278 + 279 + async fn seen_dids_amount(&self) -> Result<u64, Error> { 280 + let db = self.db.lock().await; 281 + let next_usable = self 282 + .latest_usable_did_id(&db) 283 + .await 284 + .map_err(|e| Error::SeenDidsAmount(e.to_string()))?; 285 + 286 + Ok(next_usable - 1) 277 287 } 278 288 } 279 289