Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 6dbca75644dfb4662692c8ffae5aad64f6dfd98d 188 lines 5.6 kB view raw
1use serde::{Deserialize, Serialize}; 2 3use tokio::sync::{mpsc, oneshot}; 4 5mod backfill; 6mod cached_value; 7mod client; 8mod crypto; 9pub mod doc; 10mod mirror; 11mod plc_fjall; 12mod plc_pg; 13mod poll; 14mod ratelimit; 15mod weekly; 16 17pub mod bin; 18 19pub use backfill::backfill; 20pub use cached_value::{CachedValue, Fetcher}; 21pub use client::{CLIENT, UA}; 22pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall}; 23pub use plc_fjall::{FjallDb, audit as audit_fjall, backfill_to_fjall, pages_to_fjall, drop_invalid_ops as drop_invalid_ops_fjall}; 24pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 25pub use poll::{PageBoundaryState, get_page, poll_upstream}; 26pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 27pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 28 29pub type Dt = chrono::DateTime<chrono::Utc>; 30 31/// One page of PLC export 32/// 33/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 34#[derive(Debug)] 35pub struct ExportPage { 36 pub ops: Vec<Op>, 37} 38 39impl ExportPage { 40 pub fn is_empty(&self) -> bool { 41 self.ops.is_empty() 42 } 43} 44 45/// A fully-deserialized plc operation 46/// 47/// including the plc's wrapping with timestmap and nullified state 48#[derive(Debug, Clone, Deserialize, Serialize)] 49#[serde(rename_all = "camelCase")] 50pub struct Op { 51 pub did: String, 52 pub cid: String, 53 pub created_at: Dt, 54 pub nullified: bool, 55 pub operation: Box<serde_json::value::RawValue>, 56} 57 58#[cfg(test)] 59impl PartialEq for Op { 60 fn eq(&self, other: &Self) -> bool { 61 self.did == other.did 62 && self.cid == other.cid 63 && self.created_at == other.created_at 64 && self.nullified == other.nullified 65 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 66 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 67 } 68} 69 70/// Database primary key for an op 71#[derive(Debug, PartialEq)] 72pub struct OpKey { 73 pub did: String, 74 pub cid: String, 75} 76 77impl From<&Op> for OpKey { 78 fn from(Op { did, cid, .. }: &Op) -> Self { 79 Self { 80 did: did.to_string(), 81 cid: cid.to_string(), 82 } 83 } 84} 85 86/// page forwarder who drops its channels on receipt of a small page 87/// 88/// PLC will return up to 1000 ops on a page, and returns full pages until it 89/// has caught up, so this is a (hacky?) way to stop polling once we're up. 90pub async fn full_pages( 91 mut rx: mpsc::Receiver<ExportPage>, 92 tx: mpsc::Sender<ExportPage>, 93) -> anyhow::Result<&'static str> { 94 while let Some(page) = rx.recv().await { 95 let n = page.ops.len(); 96 if n < 900 { 97 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 98 let Some(age) = last_age else { 99 log::info!("full_pages done, empty final page"); 100 return Ok("full pages (hmm)"); 101 }; 102 if age <= chrono::TimeDelta::hours(6) { 103 log::info!("full_pages done, final page of {n} ops"); 104 } else { 105 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 106 } 107 return Ok("full pages (cool)"); 108 } 109 log::trace!("full_pages: continuing with page of {n} ops"); 110 tx.send(page).await?; 111 } 112 Err(anyhow::anyhow!( 113 "full_pages ran out of source material, sender closed" 114 )) 115} 116 117pub async fn pages_to_stdout( 118 mut rx: mpsc::Receiver<ExportPage>, 119 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 120) -> anyhow::Result<&'static str> { 121 let mut last_at = None; 122 while let Some(page) = rx.recv().await { 123 for op in &page.ops { 124 println!("{}", serde_json::to_string(op)?); 125 } 126 if notify_last_at.is_some() 127 && let Some(s) = PageBoundaryState::new(&page) 128 { 129 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 130 } 131 } 132 if let Some(notify) = notify_last_at { 133 log::trace!("notifying last_at: {last_at:?}"); 134 if notify.send(last_at).is_err() { 135 log::error!("receiver for last_at dropped, can't notify"); 136 }; 137 } 138 Ok("pages_to_stdout") 139} 140 141pub async fn invalid_ops_to_stdout( 142 mut rx: mpsc::Receiver<(String, Dt, String)>, 143) -> anyhow::Result<&'static str> { 144 while let Some((did, at, cid)) = rx.recv().await { 145 let val = serde_json::json!({ 146 "did": did, 147 "at": at, 148 "cid": cid, 149 }); 150 println!("{val}"); 151 } 152 Ok("invalid_ops_to_stdout") 153} 154 155pub async fn file_to_invalid_ops( 156 path: impl AsRef<std::path::Path>, 157 tx: mpsc::Sender<(String, Dt, String)>, 158) -> anyhow::Result<&'static str> { 159 let file = tokio::fs::File::open(path).await?; 160 161 use tokio::io::AsyncBufReadExt; 162 let mut lines = tokio::io::BufReader::new(file).lines(); 163 while let Some(line) = lines.next_line().await? { 164 #[derive(serde::Deserialize)] 165 struct Op { 166 did: String, 167 at: Dt, 168 cid: String, 169 } 170 let op: Op = serde_json::from_str(&line)?; 171 tx.send((op.did, op.at, op.cid)).await?; 172 } 173 174 Ok("invalid_ops_to_stdout") 175} 176 177pub fn logo(name: &str) -> String { 178 format!( 179 r" 180 181 \ | | | | 182 _ \ | | -_) _` | -_) _` | | | | ({name}) 183 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 184 ____| __/ 185", 186 env!("CARGO_PKG_VERSION"), 187 ) 188}