Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 207 lines 6.4 kB view raw
1use allegedly::{ 2 Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 3 bin::{GlobalArgs, bin_init}, 4 full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 5}; 6use clap::Parser; 7use reqwest::Url; 8use std::{path::PathBuf, time::Duration}; 9use tokio::{ 10 sync::{mpsc, oneshot}, 11 task::JoinSet, 12}; 13 14pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/"; 15 16#[derive(Debug, clap::Args)] 17pub struct Args { 18 /// Remote URL prefix to fetch bundles from 19 #[arg(long)] 20 #[clap(default_value = DEFAULT_HTTP)] 21 http: Url, 22 /// Local folder to fetch bundles from (overrides `http`) 23 #[arg(long)] 24 dir: Option<PathBuf>, 25 /// Don't do weekly bulk-loading at all. 26 /// 27 /// overrides `http` and `dir`, makes catch_up redundant 28 #[arg(long, action)] 29 no_bulk: bool, 30 /// Parallel bundle fetchers 31 /// 32 /// Default: 4 for http fetches, 1 for local folder 33 #[arg(long)] 34 source_workers: Option<usize>, 35 /// Bulk load into did-method-plc-compatible postgres instead of stdout 36 /// 37 /// Pass a postgres connection url like "postgresql://localhost:5432" 38 #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")] 39 to_postgres: Option<Url>, 40 /// Cert for postgres (if needed) 41 #[arg(long)] 42 postgres_cert: Option<PathBuf>, 43 /// Delete all operations from the postgres db before starting 44 /// 45 /// only used if `--to-postgres` is present 46 #[arg(long, action)] 47 postgres_reset: bool, 48 /// Stop at the week ending before this date 49 #[arg(long)] 50 until: Option<Dt>, 51 /// After the weekly imports, poll upstream until we're caught up 52 #[arg(long, action)] 53 catch_up: bool, 54} 55 56pub async fn run( 57 GlobalArgs { 58 upstream, 59 upstream_throttle_ms, 60 }: GlobalArgs, 61 Args { 62 http, 63 dir, 64 no_bulk, 65 source_workers, 66 to_postgres, 67 postgres_cert, 68 postgres_reset, 69 until, 70 catch_up, 71 }: Args, 72) -> anyhow::Result<()> { 73 let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new(); 74 75 let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages 76 77 // a bulk sink can notify us as soon as the very last op's time is known 78 // so we can start catching up while the sink might restore indexes and such 79 let (found_last_tx, found_last_out) = if catch_up { 80 let (tx, rx) = oneshot::channel(); 81 (Some(tx), Some(rx)) 82 } else { 83 (None, None) 84 }; 85 86 let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages 87 let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter 88 89 // set up sources 90 if no_bulk { 91 // simple mode, just poll upstream from teh beginning 92 if http != DEFAULT_HTTP.parse()? { 93 log::warn!("ignoring non-default bulk http setting since --no-bulk was set"); 94 } 95 if let Some(d) = dir { 96 log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set."); 97 } 98 if let Some(u) = until { 99 log::warn!( 100 "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)" 101 ); 102 } 103 let mut upstream = upstream; 104 upstream.set_path("/export"); 105 let throttle = Duration::from_millis(upstream_throttle_ms); 106 tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 107 tasks.spawn(full_pages(poll_out, full_tx)); 108 tasks.spawn(pages_to_stdout(full_out, None)); 109 } else { 110 // fun mode 111 112 // set up bulk sources 113 if let Some(dir) = dir { 114 if http != DEFAULT_HTTP.parse()? { 115 anyhow::bail!( 116 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})" 117 ); 118 } 119 tasks.spawn(backfill( 120 FolderSource(dir), 121 bulk_tx, 122 source_workers.unwrap_or(1), 123 until, 124 )); 125 } else { 126 tasks.spawn(backfill( 127 HttpSource(http), 128 bulk_tx, 129 source_workers.unwrap_or(4), 130 until, 131 )); 132 } 133 134 // and the catch-up source... 135 if let Some(last) = found_last_out { 136 let throttle = Duration::from_millis(upstream_throttle_ms); 137 tasks.spawn(async move { 138 let mut upstream = upstream; 139 upstream.set_path("/export"); 140 141 poll_upstream(last.await?, upstream, throttle, poll_tx).await 142 }); 143 } 144 145 // set up sinks 146 if let Some(pg_url) = to_postgres { 147 log::trace!("connecting to postgres..."); 148 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 149 log::trace!("connected to postgres"); 150 151 tasks.spawn(backfill_to_pg( 152 db.clone(), 153 postgres_reset, 154 bulk_out, 155 found_last_tx, 156 )); 157 if catch_up { 158 tasks.spawn(pages_to_pg(db, full_out)); 159 } 160 } else { 161 tasks.spawn(pages_to_stdout(bulk_out, found_last_tx)); 162 if catch_up { 163 tasks.spawn(pages_to_stdout(full_out, None)); 164 } 165 } 166 } 167 168 while let Some(next) = tasks.join_next().await { 169 match next { 170 Err(e) if e.is_panic() => { 171 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 172 return Err(e.into()); 173 } 174 Err(e) => { 175 log::error!("a joinset task failed to join: {e}"); 176 return Err(e.into()); 177 } 178 Ok(Err(e)) => { 179 log::error!("a joinset task completed with error: {e}"); 180 return Err(e); 181 } 182 Ok(Ok(name)) => { 183 log::trace!("a task completed: {name:?}. {} left", tasks.len()); 184 } 185 } 186 } 187 188 Ok(()) 189} 190 191#[derive(Debug, Parser)] 192struct CliArgs { 193 #[command(flatten)] 194 globals: GlobalArgs, 195 #[command(flatten)] 196 args: Args, 197} 198 199#[allow(dead_code)] 200#[tokio::main] 201async fn main() -> anyhow::Result<()> { 202 let args = CliArgs::parse(); 203 bin_init(false); 204 log::info!("{}", logo("backfill")); 205 run(args.globals, args.args).await?; 206 Ok(()) 207}