Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

weekly exporter in rust

phil 26a92ddb f6f4e723

+144 -4
+1
.gitignore
··· 1 1 /target 2 + weekly/
+1
Cargo.lock
··· 123 123 "futures-core", 124 124 "futures-io", 125 125 "pin-project-lite", 126 + "tokio", 126 127 ] 127 128 128 129 [[package]]
+1 -1
Cargo.toml
··· 6 6 7 7 [dependencies] 8 8 anyhow = "1.0.99" 9 - async-compression = { version = "0.4.30", features = ["futures-io", "gzip"] } 9 + async-compression = { version = "0.4.30", features = ["futures-io", "tokio", "gzip"] } 10 10 chrono = { version = "0.4.42", features = ["serde"] } 11 11 clap = { version = "4.5.47", features = ["derive", "env"] } 12 12 env_logger = "0.11.8"
+53
src/bin/bundle-weekly.rs
··· 1 + use allegedly::{bin_init, pages_to_weeks, poll_upstream}; 2 + use clap::Parser; 3 + use std::path::PathBuf; 4 + use url::Url; 5 + 6 + const PAGE_QUEUE_SIZE: usize = 128; 7 + 8 + #[derive(Parser)] 9 + struct Args { 10 + /// Upstream PLC server to poll 11 + /// 12 + /// default: https://plc.directory 13 + #[arg(long, env)] 14 + #[clap(default_value = "https://plc.directory")] 15 + upstream: Url, 16 + /// Directory to save gzipped weekly bundles 17 + /// 18 + /// default: ./weekly/ 19 + #[arg(long, env)] 20 + #[clap(default_value = "./weekly/")] 21 + dir: PathBuf, 22 + /// The week to start from 23 + /// 24 + /// Must be a week-truncated unix timestamp 25 + #[arg(long, env)] 26 + start_at: Option<u64>, // TODO!! 27 + } 28 + 29 + #[tokio::main] 30 + async fn main() -> anyhow::Result<()> { 31 + bin_init("weekly"); 32 + let args = Args::parse(); 33 + 34 + let mut url = args.upstream; 35 + url.set_path("/export"); 36 + 37 + log::trace!("ensure weekly output directory exists"); 38 + std::fs::create_dir_all(&args.dir)?; 39 + 40 + let (tx, rx) = flume::bounded(PAGE_QUEUE_SIZE); 41 + 42 + tokio::task::spawn(async move { 43 + if let Err(e) = poll_upstream(None /*todo*/, url, tx).await { 44 + log::error!("polling failed: {e}"); 45 + } else { 46 + log::warn!("poller finished ok (weird?)"); 47 + } 48 + }); 49 + 50 + pages_to_weeks(rx, args.dir).await?; 51 + 52 + Ok(()) 53 + }
+3 -1
src/lib.rs
··· 4 4 mod client; 5 5 mod plc_pg; 6 6 mod poll; 7 + mod weekly; 7 8 8 9 pub use backfill::week_to_pages; 9 10 pub use client::CLIENT; 10 11 pub use plc_pg::Db; 11 - pub use poll::poll_upstream; 12 + pub use poll::{get_page, poll_upstream}; 13 + pub use weekly::{Week, pages_to_weeks}; 12 14 13 15 pub type Dt = chrono::DateTime<chrono::Utc>; 14 16
+9 -2
src/poll.rs
··· 18 18 /// we assume that the order will at least be deterministic: this may be unsound 19 19 #[derive(Debug, PartialEq)] 20 20 pub struct LastOp { 21 - created_at: Dt, // any op greater is definitely not duplicated 21 + pub created_at: Dt, // any op greater is definitely not duplicated 22 22 pk: (String, String), // did, cid 23 23 } 24 24 ··· 117 117 page.only_after_last(pl); 118 118 } 119 119 if !page.is_empty() { 120 - dest.send_async(page).await?; 120 + match dest.try_send(page) { 121 + Ok(()) => {} 122 + Err(flume::TrySendError::Full(page)) => { 123 + log::warn!("export: destination channel full, awaiting..."); 124 + dest.send_async(page).await?; 125 + } 126 + e => e?, 127 + }; 121 128 } 122 129 123 130 prev_last = next_last.or(prev_last);
+76
src/weekly.rs
··· 1 + use crate::{Dt, ExportPage, Op}; 2 + use async_compression::tokio::write::GzipEncoder; 3 + use std::path::PathBuf; 4 + use tokio::{fs::File, io::AsyncWriteExt}; 5 + 6 + const WEEK_IN_SECONDS: i64 = 7 * 86400; 7 + 8 + #[derive(Debug, Clone, Copy, PartialEq)] 9 + pub struct Week(i64); 10 + 11 + impl From<Dt> for Week { 12 + fn from(dt: Dt) -> Self { 13 + let ts = dt.timestamp(); 14 + let truncated = (ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 15 + Week(truncated) 16 + } 17 + } 18 + 19 + impl From<Week> for Dt { 20 + fn from(week: Week) -> Dt { 21 + let Week(ts) = week; 22 + Dt::from_timestamp(ts, 0).expect("the week to be in valid range") 23 + } 24 + } 25 + 26 + pub async fn pages_to_weeks(rx: flume::Receiver<ExportPage>, dir: PathBuf) -> anyhow::Result<()> { 27 + pub use std::time::Instant; 28 + 29 + // ...there is certainly a nicer way to write this 30 + let mut current_week: Option<Week> = None; 31 + let dummy_file = File::create(dir.join("_dummy")).await?; 32 + let mut encoder = GzipEncoder::new(dummy_file); 33 + 34 + let mut total_ops = 0; 35 + let total_t0 = Instant::now(); 36 + let mut week_ops = 0; 37 + let mut week_t0 = total_t0; 38 + let mut week = 0; 39 + 40 + while let Ok(page) = rx.recv_async().await { 41 + for mut s in page.ops { 42 + let Ok(op) = serde_json::from_str::<Op>(&s) 43 + .inspect_err(|e| log::error!("failed to parse plc op, ignoring: {e}")) 44 + else { 45 + continue; 46 + }; 47 + let op_week = op.created_at.into(); 48 + if current_week.map(|w| w != op_week).unwrap_or(true) { 49 + encoder.shutdown().await?; 50 + let now = Instant::now(); 51 + 52 + log::info!( 53 + "done week {week:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 54 + current_week.unwrap_or(Week(0)).0, 55 + (week_ops as f64) / (now - week_t0).as_secs_f64(), 56 + total_ops / 1000, 57 + (total_ops as f64) / (now - total_t0).as_secs_f64(), 58 + ); 59 + 60 + let file = File::create(dir.join(format!("{}.jsonl.gz", op_week.0))).await?; 61 + encoder = GzipEncoder::with_quality(file, async_compression::Level::Best); 62 + current_week = Some(op_week); 63 + week_ops = 0; 64 + week_t0 = now; 65 + week += 1; 66 + } 67 + s.push('\n'); // hack 68 + log::trace!("writing: {s}"); 69 + encoder.write_all(s.as_bytes()).await?; 70 + total_ops += 1; 71 + week_ops += 1; 72 + } 73 + } 74 + 75 + Ok(()) 76 + }