Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

mirror: query local operations

geesawra f83f5f5f 8474087c

+129 -34
+3
src/datastore.rs
··· 7 7 GetLatest(String), 8 8 InsertOperation(String), 9 9 Backfill(String), 10 + QueryOps(String), 10 11 } 11 12 12 13 impl std::error::Error for Error {} ··· 17 18 Error::GetLatest(s) => write!(f, "get_latest error: {}", s), 18 19 Error::InsertOperation(s) => write!(f, "insert_operation error: {}", s), 19 20 Error::Backfill(s) => write!(f, "backfill error: {}", s), 21 + Error::QueryOps(s) => write!(f, "query ops: {}", s), 20 22 } 21 23 } 22 24 } ··· 37 39 pages: mpsc::Receiver<ExportPage>, 38 40 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 39 41 ) -> Result<(), Error>; 42 + async fn query_ops(&self, did: String) -> Result<Vec<Op>, Error>; 40 43 } 41 44 42 45 #[derive(Clone)]
+39 -4
src/mirror.rs
··· 1 1 use crate::{ 2 - CachedValue, CreatePlcOpLimiter, DatastoreEnum, Db, Dt, Fetcher, GovernorMiddleware, 3 - IpLimiters, UA, datastore::Datastore, logo, 2 + CachedValue, CreatePlcOpLimiter, DatastoreEnum, Dt, Fetcher, GovernorMiddleware, IpLimiters, 3 + Op, UA, datastore::Datastore, logo, 4 4 }; 5 5 use futures::TryStreamExt; 6 6 use governor::Quota; ··· 13 13 web::{Data, Json, Path}, 14 14 }; 15 15 use reqwest::{Client, Url}; 16 + use serde::Serialize; 16 17 use std::{net::SocketAddr, path::PathBuf, time::Duration}; 17 18 18 19 #[derive(Clone)] ··· 20 21 client: Client, 21 22 plc: Url, 22 23 upstream: Url, 24 + ds: Option<DatastoreEnum>, 23 25 sync_info: Option<SyncInfo>, 24 26 experimental: ExperimentalConf, 25 27 } ··· 271 273 Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 272 274 } 273 275 276 + #[derive(Serialize)] 277 + struct OpsResponse { 278 + operations: Vec<Op>, 279 + } 280 + 281 + #[handler] 282 + async fn ops(req: &Request, Data(state): Data<&State>) -> Result<Response> { 283 + let did = req.uri().path().strip_prefix("/").unwrap_or_default(); 284 + 285 + let ds = match state.ds.as_ref() { 286 + Some(ds) => ds, 287 + None => { 288 + return Ok("no data store configured :(" 289 + .with_status(reqwest::StatusCode::BAD_REQUEST) 290 + .into_response()); 291 + } 292 + }; 293 + 294 + match ds.query_ops(did.to_string()).await { 295 + Ok(res) => Ok(Json(OpsResponse { operations: res }).into_response()), 296 + Err(e) => Ok(e 297 + .to_string() 298 + .with_status(reqwest::StatusCode::BAD_REQUEST) 299 + .into_response()), 300 + } 301 + } 302 + 274 303 #[handler] 275 304 async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 276 305 let mut target = state.plc.clone(); ··· 399 428 ) -> anyhow::Result<&'static str> { 400 429 log::info!("starting server..."); 401 430 431 + let has_db = db.as_ref().is_some(); 432 + 402 433 // not using crate CLIENT: don't want the retries etc 403 434 let client = Client::builder() 404 435 .user_agent(UA) ··· 407 438 .expect("reqwest client to build"); 408 439 409 440 // when `db` is None, we're running in wrap mode. no db access, no upstream sync 410 - let sync_info = db.map(|db| { 441 + let sync_info = db.clone().map(|db| { 411 442 let backend = match db { 412 443 DatastoreEnum::Pg(_) => "postgres".to_owned(), 413 444 DatastoreEnum::Rocks(_) => "rocksdb".to_owned(), ··· 428 459 plc, 429 460 upstream: upstream.clone(), 430 461 sync_info, 462 + ds: db, 431 463 experimental: experimental.clone(), 432 464 }; 433 465 ··· 449 481 450 482 app = app.at("/did:plc:*", get(proxy).post(upstream_proxier)); 451 483 } else { 452 - app = app.at("/did:plc:*", get(proxy).post(nope)); 484 + match has_db { 485 + true => app = app.at("/did:plc:*", get(ops).post(nope)), 486 + false => app = app.at("/did:plc:*", get(proxy).post(nope)), 487 + }; 453 488 } 454 489 455 490 let app = app
+27 -1
src/plc_pg.rs
··· 1 1 use crate::{ 2 - Dt, ExportPage, PageBoundaryState, 2 + Dt, ExportPage, Op, PageBoundaryState, 3 3 datastore::{self, InsertResult, SendDatastore}, 4 4 }; 5 5 use futures::{TryFutureExt, lock::Mutex}; ··· 237 237 Ok(_) => Ok(()), 238 238 Err(e) => Err(e), 239 239 } 240 + } 241 + 242 + async fn query_ops(&self, did: String) -> Result<Vec<Op>, datastore::Error> { 243 + let client = self.client.lock().await; 244 + 245 + let ops_stmt = client 246 + .prepare(r#"SELECT * FROM operations WHERE did = $1"#) 247 + .await 248 + .map_err(|e| datastore::Error::QueryOps(e.to_string()))?; 249 + 250 + let res = client 251 + .query(&ops_stmt, &[&did]) 252 + .await 253 + .map_err(|e| datastore::Error::QueryOps(e.to_string()))?; 254 + 255 + // (did, operation, cid, nullified, "createdAt") 256 + Ok(res 257 + .into_iter() 258 + .map(|r| Op { 259 + did: r.get("did"), 260 + cid: r.get("cid"), 261 + created_at: r.get("createdAt"), 262 + nullified: r.get("nullified"), 263 + operation: serde_json::value::RawValue::from_string(r.get("operation")).unwrap(), 264 + }) 265 + .collect()) 240 266 } 241 267 } 242 268
+60 -29
src/plc_rocksdb.rs
··· 1 1 use async_compression::tokio::write::{GzipDecoder, GzipEncoder}; 2 - use rocksdb::{ColumnFamilyDescriptor, Options}; 2 + use futures::{TryFutureExt, stream}; 3 + use rocksdb::Options; 3 4 use serde::{Deserialize, Serialize}; 4 - use std::{fmt::Display, ops::Add, path::PathBuf, sync::Arc, time::Instant}; 5 + use std::{path::PathBuf, sync::Arc, time::Instant}; 5 6 use tokio::{ 6 - io::{AsyncWrite, AsyncWriteExt}, 7 + io::AsyncWriteExt, 7 8 sync::{Mutex, mpsc, oneshot}, 8 9 }; 10 + use tokio_stream::StreamExt; 9 11 10 12 use crate::{ 11 13 Dt, ExportPage, Op, PageBoundaryState, ··· 77 79 } 78 80 } 79 81 82 + async fn id_for_did( 83 + &self, 84 + db: &rocksdb::DB, 85 + did: String, 86 + ) -> Result<Option<u64>, rocksdb::Error> { 87 + let cf = db.cf_handle("dids").unwrap(); 88 + Ok(match db.get_cf(&cf, did.clone())? { 89 + Some(s) => { 90 + let l = String::from_utf8(s).unwrap(); 91 + Some(l.parse::<u64>().unwrap()) 92 + } 93 + None => None, 94 + }) 95 + } 96 + 80 97 async fn latest_usable_did_id(&self, db: &rocksdb::DB) -> Result<u64, rocksdb::Error> { 81 98 let latest_did = db.get_pinned(LATEST_USED_DID_ID)?; 82 99 Ok(match latest_did { ··· 92 109 Ok(db.put(LATEST_USED_DID_ID, l.to_string())?) 93 110 } 94 111 95 - fn all_dids(&self, db: &rocksdb::DB) -> Result<impl Iterator<Item = String>, rocksdb::Error> { 96 - let cf = db.cf_handle("dids").unwrap(); 97 - 98 - let pi = db.prefix_iterator_cf(&cf, "did"); 99 - 100 - Ok(pi.into_iter().map(|e| { 101 - let (key, v) = e.unwrap(); 102 - log::info!("id: {}", String::from_utf8(v.to_vec()).unwrap()); 103 - let did = String::from_utf8(key.to_vec()).unwrap(); 104 - did 105 - })) 106 - } 107 - 108 - fn all_ops(&self, db: &rocksdb::DB) -> Result<impl Iterator<Item = Op>, rocksdb::Error> { 109 - let cf = db.cf_handle("ops").unwrap(); 110 - 111 - let pi = db.prefix_iterator_cf(&cf, ""); 112 - 113 - Ok(pi.into_iter().map(|e| { 114 - let (key, v) = e.unwrap(); 115 - log::info!("id: {}", String::from_utf8(key.to_vec()).unwrap()); 116 - let op: Op = serde_json::from_slice(&v).unwrap(); 117 - op 118 - })) 119 - } 120 - 121 112 pub fn as_datastore(self) -> impl SendDatastore { 122 113 self 123 114 } ··· 243 234 }; 244 235 } 245 236 Ok(()) 237 + } 238 + 239 + async fn query_ops(&self, did: String) -> Result<Vec<Op>, Error> { 240 + // TODO(geesawra): check if did exists first! 241 + let db = self.db.lock().await; 242 + 243 + let id = match self 244 + .id_for_did(&db, did.clone()) 245 + .await 246 + .map_err(|e| Error::QueryOps(e.to_string()))? 247 + { 248 + Some(id) => id, 249 + None => return Err(Error::QueryOps(format!("{} not found", did))), 250 + }; 251 + 252 + let cf = db.cf_handle("ops").unwrap(); 253 + 254 + let key_prefix = format!("{}_", id); 255 + 256 + let mut va: Vec<RocksOp> = db 257 + .prefix_iterator_cf(&cf, key_prefix.clone()) 258 + .into_iter() 259 + .map_while(|e| { 260 + let (k, v) = e.unwrap(); 261 + let kstr = String::from_utf8(k.to_vec()).unwrap(); 262 + if !kstr.starts_with(&key_prefix) { 263 + return None; 264 + } 265 + 266 + let rop: RocksOp = serde_json::from_slice(&v).unwrap(); 267 + Some(rop) 268 + }) 269 + .collect(); 270 + 271 + va.sort_by(|a, b| a.created_at.cmp(&b.created_at)); 272 + 273 + Ok(stream::iter(va) 274 + .then(async |e| e.to_op().await) 275 + .collect() 276 + .await) 246 277 } 247 278 } 248 279