Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at debug 158 lines 4.6 kB view raw
1use serde::{Deserialize, Serialize}; 2use tokio::sync::{mpsc, oneshot}; 3 4mod backfill; 5mod client; 6mod mirror; 7mod plc_pg; 8mod poll; 9mod ratelimit; 10mod weekly; 11 12pub mod bin; 13 14pub use backfill::backfill; 15pub use client::{CLIENT, UA}; 16pub use mirror::{ListenConf, serve}; 17pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 18pub use poll::{PageBoundaryState, get_page, poll_upstream}; 19pub use ratelimit::GovernorMiddleware; 20pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 21 22pub type Dt = chrono::DateTime<chrono::Utc>; 23 24/// One page of PLC export 25/// 26/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 27#[derive(Debug)] 28pub struct ExportPage { 29 pub ops: Vec<Op>, 30} 31 32impl ExportPage { 33 pub fn is_empty(&self) -> bool { 34 self.ops.is_empty() 35 } 36} 37 38/// A fully-deserialized plc operation 39/// 40/// including the plc's wrapping with timestmap and nullified state 41#[derive(Debug, Clone, Deserialize, Serialize)] 42#[serde(rename_all = "camelCase")] 43pub struct Op { 44 pub did: String, 45 pub cid: String, 46 pub created_at: Dt, 47 pub nullified: bool, 48 pub operation: Box<serde_json::value::RawValue>, 49} 50 51#[cfg(test)] 52impl PartialEq for Op { 53 fn eq(&self, other: &Self) -> bool { 54 self.did == other.did 55 && self.cid == other.cid 56 && self.created_at == other.created_at 57 && self.nullified == other.nullified 58 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 59 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 60 } 61} 62 63/// Database primary key for an op 64#[derive(Debug, PartialEq)] 65pub struct OpKey { 66 pub did: String, 67 pub cid: String, 68} 69 70impl From<&Op> for OpKey { 71 fn from(Op { did, cid, .. }: &Op) -> Self { 72 Self { 73 did: did.to_string(), 74 cid: cid.to_string(), 75 } 76 } 77} 78 79/// page forwarder who drops its channels on receipt of a small page 80/// 81/// PLC will return up to 1000 ops on a page, and returns full pages until it 82/// has caught up, so this is a (hacky?) way to stop polling once we're up. 83pub async fn full_pages( 84 mut rx: mpsc::Receiver<ExportPage>, 85 tx: mpsc::Sender<ExportPage>, 86) -> anyhow::Result<&'static str> { 87 while let Some(page) = rx.recv().await { 88 let n = page.ops.len(); 89 if n < 900 { 90 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 91 let Some(age) = last_age else { 92 log::info!("full_pages done, empty final page"); 93 return Ok("full pages (hmm)"); 94 }; 95 if age <= chrono::TimeDelta::hours(6) { 96 log::info!("full_pages done, final page of {n} ops"); 97 } else { 98 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 99 } 100 return Ok("full pages (cool)"); 101 } 102 log::trace!("full_pages: continuing with page of {n} ops"); 103 tx.send(page).await?; 104 } 105 Err(anyhow::anyhow!( 106 "full_pages ran out of source material, sender closed" 107 )) 108} 109 110pub async fn pages_to_stdout( 111 mut rx: mpsc::Receiver<ExportPage>, 112 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 113) -> anyhow::Result<&'static str> { 114 let mut last_at = None; 115 while let Some(page) = rx.recv().await { 116 for op in &page.ops { 117 println!("{}", serde_json::to_string(op)?); 118 } 119 if notify_last_at.is_some() 120 && let Some(s) = PageBoundaryState::new(&page) 121 { 122 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 123 } 124 } 125 if let Some(notify) = notify_last_at { 126 log::trace!("notifying last_at: {last_at:?}"); 127 if notify.send(last_at).is_err() { 128 log::error!("receiver for last_at dropped, can't notify"); 129 }; 130 } 131 Ok("pages_to_stdout") 132} 133 134pub fn logo(name: &str) -> String { 135 format!( 136 r" 137 138 \ | | | | 139 _ \ | | -_) _` | -_) _` | | | | ({name}) 140 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 141 ____| __/ 142", 143 env!("CARGO_PKG_VERSION"), 144 ) 145} 146 147pub fn bin_init(name: &str) { 148 if std::env::var_os("RUST_LOG").is_none() { 149 unsafe { std::env::set_var("RUST_LOG", "info") }; 150 } 151 let filter = tracing_subscriber::EnvFilter::from_default_env(); 152 tracing_subscriber::fmt() 153 .with_writer(std::io::stderr) 154 .with_env_filter(filter) 155 .init(); 156 157 log::info!("{}", logo(name)); 158}