Server tools to backfill, tail, mirror, and verify PLC logs
49
fork

Configure Feed

Select the types of activity you want to include in your feed.

move weekly export to the cli

phil 7d311d09 e39abc5d

+57 -62
+7 -1
readme.md
··· 4 4 5 5 Allegedly can 6 6 7 - - Tail PLC ops to stdout 7 + - Tail PLC ops to stdout: `allegedly tail | jq` 8 + - Export PLC ops to weekly gzipped bundles: `allegdly bundle --dest ./some-folder` 9 + 10 + (add `--help` to any command for more info about it) 11 + 12 + also can: 13 + 8 14 - Copy ops to postgres for a mirror running the [reference typescript implementation](https://github.com/did-method-plc/did-method-plc)
-55
src/bin/bundle-weekly.rs
··· 1 - use allegedly::{Week, bin_init, pages_to_weeks, poll_upstream}; 2 - use clap::Parser; 3 - use std::path::PathBuf; 4 - use url::Url; 5 - 6 - const PAGE_QUEUE_SIZE: usize = 128; 7 - 8 - #[derive(Parser)] 9 - struct Args { 10 - /// Upstream PLC server to poll 11 - /// 12 - /// default: https://plc.directory 13 - #[arg(long, env)] 14 - #[clap(default_value = "https://plc.directory")] 15 - upstream: Url, 16 - /// Directory to save gzipped weekly bundles 17 - /// 18 - /// default: ./weekly/ 19 - #[arg(long, env)] 20 - #[clap(default_value = "./weekly/")] 21 - dir: PathBuf, 22 - /// The week to start from 23 - /// 24 - /// Must be a week-truncated unix timestamp 25 - #[arg(long, env)] 26 - start_at: Option<i64>, 27 - } 28 - 29 - #[tokio::main] 30 - async fn main() -> anyhow::Result<()> { 31 - bin_init("weekly"); 32 - let args = Args::parse(); 33 - 34 - let mut url = args.upstream; 35 - url.set_path("/export"); 36 - 37 - let after = args.start_at.map(|n| Week::from_n(n).into()); 38 - 39 - log::trace!("ensure weekly output directory exists"); 40 - std::fs::create_dir_all(&args.dir)?; 41 - 42 - let (tx, rx) = flume::bounded(PAGE_QUEUE_SIZE); 43 - 44 - tokio::task::spawn(async move { 45 - if let Err(e) = poll_upstream(after, url, tx).await { 46 - log::error!("polling failed: {e}"); 47 - } else { 48 - log::warn!("poller finished ok (weird?)"); 49 - } 50 - }); 51 - 52 - pages_to_weeks(rx, args.dir).await?; 53 - 54 - Ok(()) 55 - }
+37 -2
src/bin/main.rs
··· 1 - use allegedly::{Dt, bin_init, poll_upstream}; 1 + use allegedly::{Dt, bin_init, pages_to_weeks, poll_upstream}; 2 2 use clap::{Parser, Subcommand}; 3 + use std::path::PathBuf; 3 4 use url::Url; 4 5 5 6 #[derive(Debug, Parser)] ··· 14 15 15 16 #[derive(Debug, Subcommand)] 16 17 enum Commands { 18 + /// Scrape a PLC server, collecting ops into weekly bundles 19 + /// 20 + /// Bundles are gzipped files named `<WEEK>.jsonl.gz` where WEEK is a unix 21 + /// timestamp rounded down to a multiple of 604,800 (one week in seconds). 22 + /// 23 + /// Will stop by default at floor((now - 73hrs) / one week) * one week. PLC 24 + /// operations can be invalidated within 72 hrs, so stopping before that 25 + /// time ensures that the bundles are (hopefully) immutable. 26 + Bundle { 27 + /// Where to save the bundled files 28 + #[arg(short, long)] 29 + #[clap(default_value = "./weekly/")] 30 + dest: PathBuf, 31 + /// Start the export from this time. Should be a week boundary. 32 + #[arg(short, long)] 33 + #[clap(default_value = "2022-11-17T00:00:00Z")] 34 + after: Dt, 35 + /// Overwrite existing files, if present 36 + #[arg(long, action)] 37 + clobber: bool, 38 + }, 17 39 /// Poll an upstream PLC server and log new ops to stdout 18 40 Tail { 19 41 /// Begin tailing from a specific timestamp for replay or wait-until ··· 29 51 let args = Cli::parse(); 30 52 31 53 match args.command { 54 + Commands::Bundle { 55 + dest, 56 + after, 57 + clobber, 58 + } => { 59 + let mut url = args.upstream; 60 + url.set_path("/export"); 61 + let (tx, rx) = flume::bounded(32); // read ahead if gzip stalls for some reason 62 + tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() }); 63 + log::trace!("ensuring output directory exists"); 64 + std::fs::create_dir_all(&dest).unwrap(); 65 + pages_to_weeks(rx, dest, clobber).await.unwrap(); 66 + } 32 67 Commands::Tail { after } => { 33 68 let mut url = args.upstream; 34 69 url.set_path("/export"); 35 70 let start_at = after.or_else(|| Some(chrono::Utc::now())); 36 - let (tx, rx) = flume::bounded(0); // rendezvous 71 + let (tx, rx) = flume::bounded(0); // rendezvous, don't read ahead 37 72 tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() }); 38 73 loop { 39 74 for op in rx.recv_async().await.unwrap().ops {
+2 -1
src/poll.rs
··· 3 3 use thiserror::Error; 4 4 use url::Url; 5 5 6 - const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(500); 6 + // plc.directory ratelimit on /export is 500 per 5 mins 7 + const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 7 8 8 9 #[derive(Debug, Error)] 9 10 pub enum GetPageError {
+11 -3
src/weekly.rs
··· 34 34 } 35 35 } 36 36 37 - pub async fn pages_to_weeks(rx: flume::Receiver<ExportPage>, dir: PathBuf) -> anyhow::Result<()> { 37 + pub async fn pages_to_weeks( 38 + rx: flume::Receiver<ExportPage>, 39 + dir: PathBuf, 40 + clobber: bool, 41 + ) -> anyhow::Result<()> { 38 42 pub use std::time::Instant; 39 43 40 44 // ...there is certainly a nicer way to write this ··· 67 71 total_ops / 1000, 68 72 (total_ops as f64) / (now - total_t0).as_secs_f64(), 69 73 ); 70 - 71 - let file = File::create(dir.join(format!("{}.jsonl.gz", op_week.0))).await?; 74 + let path = dir.join(format!("{}.jsonl.gz", op_week.0)); 75 + let file = if clobber { 76 + File::create(path).await? 77 + } else { 78 + File::create_new(path).await? 79 + }; 72 80 encoder = GzipEncoder::with_quality(file, async_compression::Level::Best); 73 81 current_week = Some(op_week); 74 82 week_ops = 0;