Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 147 lines 4.4 kB view raw
1use serde::{Deserialize, Serialize}; 2use tokio::sync::{mpsc, oneshot}; 3 4mod backfill; 5mod cached_value; 6mod client; 7mod mirror; 8mod plc_pg; 9mod poll; 10mod ratelimit; 11mod weekly; 12 13pub mod bin; 14 15pub use backfill::backfill; 16pub use cached_value::{CachedValue, Fetcher}; 17pub use client::{CLIENT, UA}; 18pub use mirror::{ExperimentalConf, ListenConf, serve}; 19pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 20pub use poll::{PageBoundaryState, get_page, poll_upstream}; 21pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 22pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 23 24pub type Dt = chrono::DateTime<chrono::Utc>; 25 26/// One page of PLC export 27/// 28/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 29#[derive(Debug)] 30pub struct ExportPage { 31 pub ops: Vec<Op>, 32} 33 34impl ExportPage { 35 pub fn is_empty(&self) -> bool { 36 self.ops.is_empty() 37 } 38} 39 40/// A fully-deserialized plc operation 41/// 42/// including the plc's wrapping with timestmap and nullified state 43#[derive(Debug, Clone, Deserialize, Serialize)] 44#[serde(rename_all = "camelCase")] 45pub struct Op { 46 pub did: String, 47 pub cid: String, 48 pub created_at: Dt, 49 pub nullified: bool, 50 pub operation: Box<serde_json::value::RawValue>, 51} 52 53#[cfg(test)] 54impl PartialEq for Op { 55 fn eq(&self, other: &Self) -> bool { 56 self.did == other.did 57 && self.cid == other.cid 58 && self.created_at == other.created_at 59 && self.nullified == other.nullified 60 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 61 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 62 } 63} 64 65/// Database primary key for an op 66#[derive(Debug, PartialEq)] 67pub struct OpKey { 68 pub did: String, 69 pub cid: String, 70} 71 72impl From<&Op> for OpKey { 73 fn from(Op { did, cid, .. }: &Op) -> Self { 74 Self { 75 did: did.to_string(), 76 cid: cid.to_string(), 77 } 78 } 79} 80 81/// page forwarder who drops its channels on receipt of a small page 82/// 83/// PLC will return up to 1000 ops on a page, and returns full pages until it 84/// has caught up, so this is a (hacky?) way to stop polling once we're up. 85pub async fn full_pages( 86 mut rx: mpsc::Receiver<ExportPage>, 87 tx: mpsc::Sender<ExportPage>, 88) -> anyhow::Result<&'static str> { 89 while let Some(page) = rx.recv().await { 90 let n = page.ops.len(); 91 if n < 900 { 92 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 93 let Some(age) = last_age else { 94 log::info!("full_pages done, empty final page"); 95 return Ok("full pages (hmm)"); 96 }; 97 if age <= chrono::TimeDelta::hours(6) { 98 log::info!("full_pages done, final page of {n} ops"); 99 } else { 100 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 101 } 102 return Ok("full pages (cool)"); 103 } 104 log::trace!("full_pages: continuing with page of {n} ops"); 105 tx.send(page).await?; 106 } 107 Err(anyhow::anyhow!( 108 "full_pages ran out of source material, sender closed" 109 )) 110} 111 112pub async fn pages_to_stdout( 113 mut rx: mpsc::Receiver<ExportPage>, 114 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 115) -> anyhow::Result<&'static str> { 116 let mut last_at = None; 117 while let Some(page) = rx.recv().await { 118 for op in &page.ops { 119 println!("{}", serde_json::to_string(op)?); 120 } 121 if notify_last_at.is_some() 122 && let Some(s) = PageBoundaryState::new(&page) 123 { 124 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 125 } 126 } 127 if let Some(notify) = notify_last_at { 128 log::trace!("notifying last_at: {last_at:?}"); 129 if notify.send(last_at).is_err() { 130 log::error!("receiver for last_at dropped, can't notify"); 131 }; 132 } 133 Ok("pages_to_stdout") 134} 135 136pub fn logo(name: &str) -> String { 137 format!( 138 r" 139 140 \ | | | | 141 _ \ | | -_) _` | -_) _` | | | | ({name}) 142 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 143 ____| __/ 144", 145 env!("CARGO_PKG_VERSION"), 146 ) 147}