forked from
microcosm.blue/Allegedly
Server tools to backfill, tail, mirror, and verify PLC logs
1use serde::{Deserialize, Serialize};
2
3use tokio::sync::{mpsc, oneshot};
4
5mod backfill;
6mod cached_value;
7mod client;
8mod crypto;
9pub mod doc;
10mod mirror;
11mod plc_fjall;
12mod plc_pg;
13mod poll;
14mod ratelimit;
15mod weekly;
16
17pub mod bin;
18
19pub use backfill::backfill;
20pub use cached_value::{CachedValue, Fetcher};
21pub use client::{CLIENT, UA};
22pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall};
23pub use plc_fjall::{FjallDb, audit as audit_fjall, backfill_to_fjall, pages_to_fjall, drop_invalid_ops as drop_invalid_ops_fjall};
24pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
25pub use poll::{PageBoundaryState, get_page, poll_upstream};
26pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
27pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
28
/// Timestamp type used by the items in this file: a `chrono` date-time in UTC.
29pub type Dt = chrono::DateTime<chrono::Utc>;
30
31/// One page of PLC export
32///
33/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
34#[derive(Debug)]
35pub struct ExportPage {
36 pub ops: Vec<Op>,
37}
38
39impl ExportPage {
40 pub fn is_empty(&self) -> bool {
41 self.ops.is_empty()
42 }
43}
44
45/// A fully-deserialized plc operation
46///
47/// including the plc's wrapping with timestmap and nullified state
48#[derive(Debug, Clone, Deserialize, Serialize)]
49#[serde(rename_all = "camelCase")]
50pub struct Op {
51 pub did: String,
52 pub cid: String,
53 pub created_at: Dt,
54 pub nullified: bool,
55 pub operation: Box<serde_json::value::RawValue>,
56}
57
58#[cfg(test)]
59impl PartialEq for Op {
60 fn eq(&self, other: &Self) -> bool {
61 self.did == other.did
62 && self.cid == other.cid
63 && self.created_at == other.created_at
64 && self.nullified == other.nullified
65 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap()
66 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap()
67 }
68}
69
70/// Database primary key for an op
71#[derive(Debug, PartialEq)]
72pub struct OpKey {
73 pub did: String,
74 pub cid: String,
75}
76
77impl From<&Op> for OpKey {
78 fn from(Op { did, cid, .. }: &Op) -> Self {
79 Self {
80 did: did.to_string(),
81 cid: cid.to_string(),
82 }
83 }
84}
85
86/// page forwarder who drops its channels on receipt of a small page
87///
88/// PLC will return up to 1000 ops on a page, and returns full pages until it
89/// has caught up, so this is a (hacky?) way to stop polling once we're up.
90pub async fn full_pages(
91 mut rx: mpsc::Receiver<ExportPage>,
92 tx: mpsc::Sender<ExportPage>,
93) -> anyhow::Result<&'static str> {
94 while let Some(page) = rx.recv().await {
95 let n = page.ops.len();
96 if n < 900 {
97 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
98 let Some(age) = last_age else {
99 log::info!("full_pages done, empty final page");
100 return Ok("full pages (hmm)");
101 };
102 if age <= chrono::TimeDelta::hours(6) {
103 log::info!("full_pages done, final page of {n} ops");
104 } else {
105 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
106 }
107 return Ok("full pages (cool)");
108 }
109 log::trace!("full_pages: continuing with page of {n} ops");
110 tx.send(page).await?;
111 }
112 Err(anyhow::anyhow!(
113 "full_pages ran out of source material, sender closed"
114 ))
115}
116
117pub async fn pages_to_stdout(
118 mut rx: mpsc::Receiver<ExportPage>,
119 notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
120) -> anyhow::Result<&'static str> {
121 let mut last_at = None;
122 while let Some(page) = rx.recv().await {
123 for op in &page.ops {
124 println!("{}", serde_json::to_string(op)?);
125 }
126 if notify_last_at.is_some()
127 && let Some(s) = PageBoundaryState::new(&page)
128 {
129 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
130 }
131 }
132 if let Some(notify) = notify_last_at {
133 log::trace!("notifying last_at: {last_at:?}");
134 if notify.send(last_at).is_err() {
135 log::error!("receiver for last_at dropped, can't notify");
136 };
137 }
138 Ok("pages_to_stdout")
139}
140
141pub async fn invalid_ops_to_stdout(
142 mut rx: mpsc::Receiver<(String, Dt, String)>,
143) -> anyhow::Result<&'static str> {
144 while let Some((did, at, cid)) = rx.recv().await {
145 let val = serde_json::json!({
146 "did": did,
147 "at": at,
148 "cid": cid,
149 });
150 println!("{val}");
151 }
152 Ok("invalid_ops_to_stdout")
153}
154
155pub async fn file_to_invalid_ops(
156 path: impl AsRef<std::path::Path>,
157 tx: mpsc::Sender<(String, Dt, String)>,
158) -> anyhow::Result<&'static str> {
159 let file = tokio::fs::File::open(path).await?;
160
161 use tokio::io::AsyncBufReadExt;
162 let mut lines = tokio::io::BufReader::new(file).lines();
163 while let Some(line) = lines.next_line().await? {
164 #[derive(serde::Deserialize)]
165 struct Op {
166 did: String,
167 at: Dt,
168 cid: String,
169 }
170 let op: Op = serde_json::from_str(&line)?;
171 tx.send((op.did, op.at, op.cid)).await?;
172 }
173
174 Ok("invalid_ops_to_stdout")
175}
176
/// Builds the multi-line text banner for a tool.
///
/// `name` is interpolated into the banner via the `{name}` placeholder, and
/// the crate version (`CARGO_PKG_VERSION`, read at compile time through
/// `env!`) fills the `(v{})` placeholder. Returns the formatted `String`.
///
/// NOTE(review): the banner is a raw string literal, so every space and
/// backslash in it is emitted as-is. The spacing looks collapsed in this
/// view — confirm against the real file before reformatting it.
177pub fn logo(name: &str) -> String {
178 format!(
179 r"
180
181 \ | | | |
182 _ \ | | -_) _` | -_) _` | | | | ({name})
183 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{})
184 ____| __/
185",
186 env!("CARGO_PKG_VERSION"),
187 )
188}