Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

allow audit to fix ops using upstream

dawn bb4a7328 af1ceb98

+126 -39
+1 -1
src/bin/allegedly.rs
··· 126 126 } 127 127 Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?, 128 128 Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?, 129 - Commands::Audit { args, .. } => audit::run(args).await?, 129 + Commands::Audit { args, .. } => audit::run(globals, args).await?, 130 130 Commands::Tail { after } => { 131 131 let mut url = globals.upstream; 132 132 url.set_path("/export");
+11 -6
src/bin/audit.rs
··· 1 1 use allegedly::{ 2 2 FjallDb, audit_fjall, 3 - bin::{InstrumentationArgs, bin_init}, 4 - drop_invalid_ops_fjall, file_to_invalid_ops, invalid_ops_to_stdout, logo, 3 + bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 + file_to_invalid_ops, fix_ops_fjall, invalid_ops_to_stdout, logo, 5 5 }; 6 6 use clap::Parser; 7 7 use std::path::PathBuf; ··· 12 12 /// path to a local fjall database directory 13 13 #[arg(long, env = "ALLEGEDLY_FJALL")] 14 14 fjall: Option<PathBuf>, 15 - /// path to a file containing invalid ops to fix 15 + /// path to a file containing invalid ops to fix using upstream 16 16 #[arg(long, env = "ALLEGEDLY_FIX")] 17 17 fix: Option<PathBuf>, 18 + /// drop invalid ops instead of trying to fix them from upstream 19 + #[arg(long, env = "ALLEGEDLY_DROP")] 20 + drop: bool, 18 21 } 19 22 20 - pub async fn run(Args { fjall, fix }: Args) -> anyhow::Result<()> { 23 + pub async fn run(globals: GlobalArgs, Args { fjall, fix, drop }: Args) -> anyhow::Result<()> { 21 24 let mut tasks = JoinSet::new(); 22 25 23 26 if let Some(fjall) = fjall { ··· 26 29 27 30 if let Some(fix) = fix { 28 31 tasks.spawn(file_to_invalid_ops(fix, invalid_ops_tx)); 29 - tasks.spawn(drop_invalid_ops_fjall(db, invalid_ops_rx)); 32 + tasks.spawn(fix_ops_fjall(db, globals.upstream, drop, invalid_ops_rx)); 30 33 } else { 31 34 tasks.spawn(audit_fjall(db, invalid_ops_tx)); 32 35 tasks.spawn(invalid_ops_to_stdout(invalid_ops_rx)); ··· 61 64 #[derive(Debug, Parser)] 62 65 struct CliArgs { 63 66 #[command(flatten)] 67 + globals: GlobalArgs, 68 + #[command(flatten)] 64 69 instrumentation: InstrumentationArgs, 65 70 #[command(flatten)] 66 71 args: Args, ··· 72 77 let args = CliArgs::parse(); 73 78 bin_init(args.instrumentation.enable_opentelemetry); 74 79 log::info!("{}", logo("audit")); 75 - run(args.args).await?; 80 + run(args.globals, args.args).await?; 76 81 Ok(()) 77 82 }
+19 -18
src/lib.rs
··· 20 20 pub use cached_value::{CachedValue, Fetcher}; 21 21 pub use client::{CLIENT, UA}; 22 22 pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall}; 23 - pub use plc_fjall::{FjallDb, audit as audit_fjall, backfill_to_fjall, pages_to_fjall, drop_invalid_ops as drop_invalid_ops_fjall}; 23 + pub use plc_fjall::{ 24 + FjallDb, audit as audit_fjall, backfill_to_fjall, fix_ops as fix_ops_fjall, pages_to_fjall, 25 + }; 24 26 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 25 27 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 26 28 pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; ··· 138 140 Ok("pages_to_stdout") 139 141 } 140 142 143 + #[derive(Debug, Clone, Serialize, Deserialize)] 144 + pub struct InvalidOp { 145 + pub did: String, 146 + pub at: Dt, 147 + pub cid: String, 148 + } 149 + 141 150 pub async fn invalid_ops_to_stdout( 142 - mut rx: mpsc::Receiver<(String, Dt, String)>, 151 + mut rx: mpsc::Receiver<InvalidOp>, 143 152 ) -> anyhow::Result<&'static str> { 144 - while let Some((did, at, cid)) = rx.recv().await { 145 - let val = serde_json::json!({ 146 - "did": did, 147 - "at": at, 148 - "cid": cid, 149 - }); 150 - println!("{val}"); 153 + while let Some(op) = rx.recv().await { 154 + use std::io::{Write, stdout}; 155 + let mut stdout = stdout().lock(); 156 + serde_json::to_writer(&mut stdout, &op)?; 157 + stdout.write_all(b"\n")?; 151 158 } 152 159 Ok("invalid_ops_to_stdout") 153 160 } 154 161 155 162 pub async fn file_to_invalid_ops( 156 163 path: impl AsRef<std::path::Path>, 157 - tx: mpsc::Sender<(String, Dt, String)>, 164 + tx: mpsc::Sender<InvalidOp>, 158 165 ) -> anyhow::Result<&'static str> { 159 166 let file = tokio::fs::File::open(path).await?; 160 167 161 168 use tokio::io::AsyncBufReadExt; 162 169 let mut lines = tokio::io::BufReader::new(file).lines(); 163 170 while let Some(line) = lines.next_line().await? { 164 - #[derive(serde::Deserialize)] 165 - struct Op { 166 - did: String, 167 - at: Dt, 168 - cid: String, 169 - } 170 - let op: Op = serde_json::from_str(&line)?; 171 - tx.send((op.did, op.at, op.cid)).await?; 171 + let op: InvalidOp = serde_json::from_str(&line)?; 172 + tx.send(op).await?; 172 173 } 173 174 174 175 Ok("invalid_ops_to_stdout")
+95 -14
src/plc_fjall.rs
··· 1 1 use crate::{ 2 - BundleSource, Dt, ExportPage, Op as CommonOp, PageBoundaryState, Week, 2 + BundleSource, Dt, ExportPage, InvalidOp, Op as CommonOp, PageBoundaryState, Week, 3 3 crypto::{AssuranceResults, DidKey, Signature, assure_valid_sig}, 4 4 }; 5 5 use anyhow::Context; ··· 1148 1148 Ok(()) 1149 1149 } 1150 1150 1151 - pub fn audit( 1152 - &self, 1153 - invalid_ops_tx: mpsc::Sender<(String, Dt, String)>, 1154 - ) -> anyhow::Result<(usize, usize)> { 1151 + pub fn audit(&self, invalid_ops_tx: mpsc::Sender<InvalidOp>) -> anyhow::Result<(usize, usize)> { 1155 1152 use std::sync::mpsc; 1156 1153 1157 1154 let ops = self.inner.by_did.len()?; ··· 1178 1175 while let Ok((did_prefix, ops)) = rx.recv() { 1179 1176 let did = decode_did(&did_prefix[..did_prefix.len() - 1]); 1180 1177 for (ts, cid, op) in &ops { 1178 + let send_invalid = || { 1179 + let _ = invalid_ops_tx.blocking_send(InvalidOp { 1180 + did: did.clone(), 1181 + at: ts.clone(), 1182 + cid: cid.to_string(), 1183 + }); 1184 + }; 1181 1185 checked += 1; 1182 1186 let prev_op = op.operation.prev.as_ref().and_then(|expected| { 1183 1187 ops.iter().find(|(_, c, _)| c == expected) ··· 1186 1190 if !prev_cid_ok { 1187 1191 log::error!("audit: op {did} {cid} prev cid mismatch or missing predecessor, is db corrupted?"); 1188 1192 failed += 1; 1189 - let _ = invalid_ops_tx.blocking_send((did.clone(), ts.clone(), cid.to_string())); 1193 + send_invalid(); 1190 1194 continue; 1191 1195 } 1192 1196 let prev_stored = prev_op.map(|(_, _, p)| &p.operation); ··· 1201 1205 .join("\n "); 1202 1206 log::warn!("audit: invalid op {} {}:\n {msg}", did, cid); 1203 1207 failed += 1; 1204 - let _ = invalid_ops_tx.blocking_send((did.clone(), ts.clone(), cid.to_string())); 1208 + send_invalid(); 1205 1209 } 1206 1210 } 1207 1211 Err(e) => { 1208 1212 log::warn!("audit: invalid op {} {}: {e}", did, cid); 1209 1213 failed += 1; 1210 - let _ = invalid_ops_tx.blocking_send((did.clone(), ts.clone(), cid.to_string())); 1214 + send_invalid(); 1211 1215 } 1212 1216 } 1213 1217 } ··· 1450 1454 1451 1455 pub async fn audit( 1452 1456 db: FjallDb, 1453 - invalid_ops_tx: mpsc::Sender<(String, Dt, String)>, 1457 + invalid_ops_tx: mpsc::Sender<InvalidOp>, 1454 1458 ) -> anyhow::Result<&'static str> { 1455 1459 log::info!("starting fjall audit..."); 1456 1460 let t0 = std::time::Instant::now(); ··· 1465 1469 Ok("audit_fjall") 1466 1470 } 1467 1471 1468 - pub async fn drop_invalid_ops( 1472 + pub async fn fix_ops( 1469 1473 db: FjallDb, 1470 - mut invalid_ops_rx: mpsc::Receiver<(String, Dt, String)>, 1474 + upstream: reqwest::Url, 1475 + only_drop: bool, 1476 + mut invalid_ops_rx: mpsc::Receiver<InvalidOp>, 1471 1477 ) -> anyhow::Result<&'static str> { 1472 - while let Some((did, at, cid)) = invalid_ops_rx.recv().await { 1473 - db.drop_op(&did, &at, &cid)?; 1478 + log::info!("starting fjall fix ops..."); 1479 + let mut fixed_dids = std::collections::HashSet::new(); 1480 + let mut count = 0; 1481 + 1482 + let latest_at = db 1483 + .get_latest()? 1484 + .ok_or_else(|| anyhow::anyhow!("db not backfilled? expected at least one op"))?; 1485 + 1486 + while let Some(op) = invalid_ops_rx.recv().await { 1487 + let InvalidOp { did, at, cid, .. } = op; 1488 + 1489 + if only_drop { 1490 + db.drop_op(&did, &at, &cid)?; 1491 + db.persist(PersistMode::Buffer)?; 1492 + count += 1; 1493 + continue; 1494 + } 1495 + 1496 + if fixed_dids.contains(&did) { 1497 + continue; 1498 + } 1499 + 1500 + log::trace!("fetching upstream ops to fix did: {did}"); 1501 + let mut url = upstream.clone(); 1502 + url.set_path(&format!("/{did}/log/audit")); 1503 + 1504 + let resp = crate::CLIENT.get(url).send().await?; 1505 + 1506 + use reqwest::StatusCode; 1507 + let ops: Vec<CommonOp> = match resp.status() { 1508 + StatusCode::OK => match resp.json().await { 1509 + Ok(ops) => ops, 1510 + Err(e) => { 1511 + log::warn!("failed to parse upstream ops for {did}: {e}"); 1512 + continue; 1513 + } 1514 + }, 1515 + StatusCode::NOT_FOUND => { 1516 + log::trace!("did not found upstream: {did}"); 1517 + Vec::new() // this essentially means drop the whole did 1518 + } 1519 + s => { 1520 + log::warn!("failed to fetch upstream for {did}: {s}"); 1521 + continue; 1522 + } 1523 + }; 1524 + 1525 + log::trace!("fetched {} ops for {did}", ops.len()); 1526 + 1527 + // we drop all ops first just to be safe 1528 + let existing = db.ops_for_did(&did)?; 1529 + for op in existing { 1530 + let op = op?; 1531 + db.drop_op(&did, &op.created_at, &op.cid)?; 1532 + } 1533 + 1534 + // then insert the fresh ops 1535 + for op in ops { 1536 + // skip newer ops, since we will fill them in later anyway 1537 + // if we don't skip these we might miss some ops in between 1538 + // the latest_at we started with vs the one we ended up with 1539 + if op.created_at > latest_at { 1540 + log::trace!( 1541 + "skipping op {} for {did} because it is newer than latest_at {latest_at}", 1542 + op.cid 1543 + ); 1544 + continue; 1545 + } 1546 + 1547 + count += db.insert_op::<true>(&op)?; 1548 + } 1549 + 1550 + db.persist(PersistMode::Buffer)?; 1551 + fixed_dids.insert(did); 1474 1552 } 1475 - Ok("drop_invalid_ops") 1553 + 1554 + log::info!("fixed {count} ops"); 1555 + 1556 + Ok("fix_ops_fjall") 1476 1557 } 1477 1558 1478 1559 #[cfg(test)]