Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor big commands into own files

phil 5caf8d72 4afd30c3

+341 -252
+21 -252
src/bin/allegedly.rs
··· 1 - use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill, 3 - backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve, 4 - }; 1 + use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 5 2 use clap::{CommandFactory, Parser, Subcommand}; 6 - use reqwest::Url; 7 - use std::{net::SocketAddr, path::PathBuf, time::Instant}; 8 - use tokio::sync::{mpsc, oneshot}; 3 + use std::{path::PathBuf, time::Instant}; 4 + use tokio::sync::mpsc; 5 + 6 + mod backfill; 7 + mod mirror; 9 8 10 9 #[derive(Debug, Parser)] 11 10 struct Cli { 12 - /// Upstream PLC server 13 - #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 14 - #[clap(default_value = "https://plc.directory")] 15 - upstream: Url, 11 + #[command(flatten)] 12 + globals: GlobalArgs, 13 + 16 14 #[command(subcommand)] 17 15 command: Commands, 18 16 } ··· 21 19 enum Commands { 22 20 /// Use weekly bundled ops to get a complete directory mirror FAST 23 21 Backfill { 24 - /// Remote URL prefix to fetch bundles from 25 - #[arg(long)] 26 - #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")] 27 - http: Url, 28 - /// Local folder to fetch bundles from (overrides `http`) 29 - #[arg(long)] 30 - dir: Option<PathBuf>, 31 - /// Parallel bundle fetchers 32 - /// 33 - /// Default: 4 for http fetches, 1 for local folder 34 - #[arg(long)] 35 - source_workers: Option<usize>, 36 - /// Bulk load into did-method-plc-compatible postgres instead of stdout 37 - /// 38 - /// Pass a postgres connection url like "postgresql://localhost:5432" 39 - #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")] 40 - to_postgres: Option<Url>, 41 - /// Cert for postgres (if needed) 42 - #[arg(long)] 43 - postgres_cert: Option<PathBuf>, 44 - /// Delete all operations from the postgres db before starting 45 - /// 46 - /// only used if `--to-postgres` is present 47 - #[arg(long, action)] 48 - postgres_reset: bool, 49 - /// Stop at the week ending before this date 50 - #[arg(long)] 51 - until: Option<Dt>, 52 - /// After the weekly imports, poll upstream until we're caught up 53 - #[arg(long, action)] 54 - catch_up: bool, 22 + #[command(flatten)] 23 + args: backfill::Args, 55 24 }, 56 25 /// Scrape a PLC server, collecting ops into weekly bundles 57 26 /// ··· 76 45 }, 77 46 /// Wrap a did-method-plc server, syncing upstream and blocking op submits 78 47 Mirror { 79 - /// the wrapped did-method-plc server 80 - #[arg(long, env = "ALLEGEDLY_WRAP")] 81 - wrap: Url, 82 - /// the wrapped did-method-plc server's database (write access required) 83 - #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 84 - wrap_pg: Url, 85 - /// path to tls cert for the wrapped postgres db, if needed 86 - #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 87 - wrap_pg_cert: Option<PathBuf>, 88 - /// wrapping server listen address 89 - #[arg(short, long, env = "ALLEGEDLY_BIND")] 90 - #[clap(default_value = "127.0.0.1:8000")] 91 - bind: SocketAddr, 92 - /// obtain a certificate from letsencrypt 93 - /// 94 - /// for now this will force listening on all interfaces at :80 and :443 95 - /// (:80 will serve an "https required" error, *will not* redirect) 96 - #[arg( 97 - long, 98 - conflicts_with("bind"), 99 - requires("acme_cache_path"), 100 - env = "ALLEGEDLY_ACME_DOMAIN" 101 - )] 102 - acme_domain: Vec<String>, 103 - /// which local directory to keep the letsencrypt certs in 104 - #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")] 105 - acme_cache_path: Option<PathBuf>, 106 - /// which public acme directory to use 107 - /// 108 - /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory" 109 - #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 110 - #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 111 - acme_directory_url: Url, 48 + #[command(flatten)] 49 + args: mirror::Args, 112 50 }, 113 51 /// Poll an upstream PLC server and log new ops to stdout 114 52 Tail { ··· 118 56 }, 119 57 } 120 58 121 - async fn pages_to_stdout( 122 - mut rx: mpsc::Receiver<ExportPage>, 123 - notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 124 - ) -> anyhow::Result<()> { 125 - let mut last_at = None; 126 - while let Some(page) = rx.recv().await { 127 - for op in &page.ops { 128 - println!("{}", serde_json::to_string(op)?); 129 - } 130 - if notify_last_at.is_some() 131 - && let Some(s) = PageBoundaryState::new(&page) 132 - { 133 - last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 134 - } 135 - } 136 - if let Some(notify) = notify_last_at { 137 - log::trace!("notifying last_at: {last_at:?}"); 138 - if notify.send(last_at).is_err() { 139 - log::error!("receiver for last_at dropped, can't notify"); 140 - }; 141 - } 142 - Ok(()) 143 - } 144 - 145 - /// page forwarder who drops its channels on receipt of a small page 146 - /// 147 - /// PLC will return up to 1000 ops on a page, and returns full pages until it 148 - /// has caught up, so this is a (hacky?) way to stop polling once we're up. 149 - fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> { 150 - let (tx, fwd) = mpsc::channel(1); 151 - tokio::task::spawn(async move { 152 - while let Some(page) = rx.recv().await 153 - && page.ops.len() > 900 154 - { 155 - tx.send(page).await.expect("to be able to forward a page"); 156 - } 157 - }); 158 - fwd 159 - } 160 - 161 59 #[tokio::main] 162 - async fn main() { 60 + async fn main() -> anyhow::Result<()> { 163 61 let args = Cli::parse(); 164 62 let matches = Cli::command().get_matches(); 165 63 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 166 64 bin_init(name); 167 65 66 + let globals = args.globals.clone(); 67 + 168 68 let t0 = Instant::now(); 169 69 match args.command { 170 - Commands::Backfill { 171 - http, 172 - dir, 173 - source_workers, 174 - to_postgres, 175 - postgres_cert, 176 - postgres_reset, 177 - until, 178 - catch_up, 179 - } => { 180 - let (tx, rx) = mpsc::channel(32); // these are big pages 181 - tokio::task::spawn(async move { 182 - if let Some(dir) = dir { 183 - log::info!("Reading weekly bundles from local folder {dir:?}"); 184 - backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until) 185 - .await 186 - .inspect_err(|e| log::error!("backfill from folder problem: {e}")) 187 - .expect("to source bundles from a folder"); 188 - } else { 189 - log::info!("Fetching weekly bundles from from {http}"); 190 - backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until) 191 - .await 192 - .expect("to source bundles from http"); 193 - } 194 - }); 195 - 196 - // postgres writer will notify us as soon as the very last op's time is known 197 - // so we can start catching up while pg is restoring indexes and stuff 198 - let (notify_last_at, rx_last) = if catch_up { 199 - let (tx, rx) = oneshot::channel(); 200 - (Some(tx), Some(rx)) 201 - } else { 202 - (None, None) 203 - }; 204 - 205 - let to_postgres_url_bulk = to_postgres.clone(); 206 - let pg_cert = postgres_cert.clone(); 207 - let bulk_out_write = tokio::task::spawn(async move { 208 - if let Some(ref url) = to_postgres_url_bulk { 209 - let db = Db::new(url.as_str(), pg_cert) 210 - .await 211 - .expect("to get db for bulk out write"); 212 - backfill_to_pg(db, postgres_reset, rx, notify_last_at) 213 - .await 214 - .expect("to backfill to pg"); 215 - } else { 216 - pages_to_stdout(rx, notify_last_at) 217 - .await 218 - .expect("to backfill to stdout"); 219 - } 220 - }); 221 - 222 - if let Some(rx_last) = rx_last { 223 - let mut upstream = args.upstream; 224 - upstream.set_path("/export"); 225 - // wait until the time for `after` is known 226 - let last_at = rx_last.await.expect("to get the last log's createdAt"); 227 - log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff"); 228 - let (tx, rx) = mpsc::channel(256); // these are small pages 229 - tokio::task::spawn(async move { 230 - poll_upstream(last_at, upstream, tx) 231 - .await 232 - .expect("polling upstream to work") 233 - }); 234 - bulk_out_write.await.expect("to wait for bulk_out_write"); 235 - log::info!("writing catch-up pages"); 236 - let full_pages = full_pages(rx); 237 - if let Some(url) = to_postgres { 238 - let db = Db::new(url.as_str(), postgres_cert) 239 - .await 240 - .expect("to connect pg for catchup"); 241 - pages_to_pg(db, full_pages) 242 - .await 243 - .expect("to write catch-up pages to pg"); 244 - } else { 245 - pages_to_stdout(full_pages, None) 246 - .await 247 - .expect("to write catch-up pages to stdout"); 248 - } 249 - } 250 - } 70 + Commands::Backfill { args } => backfill::run(globals, args).await?, 251 71 Commands::Bundle { 252 72 dest, 253 73 after, 254 74 clobber, 255 75 } => { 256 - let mut url = args.upstream; 76 + let mut url = globals.upstream; 257 77 url.set_path("/export"); 258 78 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 259 79 tokio::task::spawn(async move { ··· 267 87 .await 268 88 .expect("to write bundles to output files"); 269 89 } 270 - Commands::Mirror { 271 - wrap, 272 - wrap_pg, 273 - wrap_pg_cert, 274 - bind, 275 - acme_domain, 276 - acme_cache_path, 277 - acme_directory_url, 278 - } => { 279 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert) 280 - .await 281 - .expect("to connect to pg for mirroring"); 282 - let latest = db 283 - .get_latest() 284 - .await 285 - .expect("to query for last createdAt") 286 - .expect("there to be at least one op in the db. did you backfill?"); 287 - 288 - let (tx, rx) = mpsc::channel(2); 289 - // upstream poller 290 - let mut url = args.upstream.clone(); 291 - tokio::task::spawn(async move { 292 - log::info!("starting poll reader..."); 293 - url.set_path("/export"); 294 - tokio::task::spawn(async move { 295 - poll_upstream(Some(latest), url, tx) 296 - .await 297 - .expect("to poll upstream for mirror sync") 298 - }); 299 - }); 300 - // db writer 301 - let poll_db = db.clone(); 302 - tokio::task::spawn(async move { 303 - log::info!("starting db writer..."); 304 - pages_to_pg(poll_db, rx) 305 - .await 306 - .expect("to write to pg for mirror"); 307 - }); 308 - 309 - let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 310 - (_, false, Some(cache_path)) => ListenConf::Acme { 311 - domains: acme_domain, 312 - cache_path, 313 - directory_url: acme_directory_url.to_string(), 314 - }, 315 - (bind, true, None) => ListenConf::Bind(bind), 316 - (_, _, _) => unreachable!(), 317 - }; 318 - 319 - serve(&args.upstream, wrap, listen_conf) 320 - .await 321 - .expect("to be able to serve the mirror proxy app"); 322 - } 90 + Commands::Mirror { args } => mirror::run(globals, args).await?, 323 91 Commands::Tail { after } => { 324 - let mut url = args.upstream; 92 + let mut url = globals.upstream; 325 93 url.set_path("/export"); 326 94 let start_at = after.or_else(|| Some(chrono::Utc::now())); 327 95 let (tx, rx) = mpsc::channel(1); ··· 336 104 } 337 105 } 338 106 log::info!("whew, {:?}. goodbye!", t0.elapsed()); 107 + Ok(()) 339 108 }
+146
src/bin/backfill.rs
··· 1 + use allegedly::{ 2 + Db, Dt, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, bin_init, 3 + full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 4 + }; 5 + use clap::Parser; 6 + use reqwest::Url; 7 + use std::path::PathBuf; 8 + use tokio::sync::{mpsc, oneshot}; 9 + 10 + #[derive(Debug, clap::Args)] 11 + pub struct Args { 12 + /// Remote URL prefix to fetch bundles from 13 + #[arg(long)] 14 + #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")] 15 + http: Url, 16 + /// Local folder to fetch bundles from (overrides `http`) 17 + #[arg(long)] 18 + dir: Option<PathBuf>, 19 + /// Parallel bundle fetchers 20 + /// 21 + /// Default: 4 for http fetches, 1 for local folder 22 + #[arg(long)] 23 + source_workers: Option<usize>, 24 + /// Bulk load into did-method-plc-compatible postgres instead of stdout 25 + /// 26 + /// Pass a postgres connection url like "postgresql://localhost:5432" 27 + #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")] 28 + to_postgres: Option<Url>, 29 + /// Cert for postgres (if needed) 30 + #[arg(long)] 31 + postgres_cert: Option<PathBuf>, 32 + /// Delete all operations from the postgres db before starting 33 + /// 34 + /// only used if `--to-postgres` is present 35 + #[arg(long, action)] 36 + postgres_reset: bool, 37 + /// Stop at the week ending before this date 38 + #[arg(long)] 39 + until: Option<Dt>, 40 + /// After the weekly imports, poll upstream until we're caught up 41 + #[arg(long, action)] 42 + catch_up: bool, 43 + } 44 + 45 + pub async fn run( 46 + GlobalArgs { upstream }: GlobalArgs, 47 + Args { 48 + http, 49 + dir, 50 + source_workers, 51 + to_postgres, 52 + postgres_cert, 53 + postgres_reset, 54 + until, 55 + catch_up, 56 + }: Args, 57 + ) -> anyhow::Result<()> { 58 + let (tx, rx) = mpsc::channel(32); // these are big pages 59 + tokio::task::spawn(async move { 60 + if let Some(dir) = dir { 61 + log::info!("Reading weekly bundles from local folder {dir:?}"); 62 + backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until) 63 + .await 64 + .inspect_err(|e| log::error!("backfill from folder problem: {e}")) 65 + .expect("to source bundles from a folder"); 66 + } else { 67 + log::info!("Fetching weekly bundles from from {http}"); 68 + backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until) 69 + .await 70 + .expect("to source bundles from http"); 71 + } 72 + }); 73 + 74 + // postgres writer will notify us as soon as the very last op's time is known 75 + // so we can start catching up while pg is restoring indexes and stuff 76 + let (notify_last_at, rx_last) = if catch_up { 77 + let (tx, rx) = oneshot::channel(); 78 + (Some(tx), Some(rx)) 79 + } else { 80 + (None, None) 81 + }; 82 + 83 + let to_postgres_url_bulk = to_postgres.clone(); 84 + let pg_cert = postgres_cert.clone(); 85 + let bulk_out_write = tokio::task::spawn(async move { 86 + if let Some(ref url) = to_postgres_url_bulk { 87 + let db = Db::new(url.as_str(), pg_cert) 88 + .await 89 + .expect("to get db for bulk out write"); 90 + backfill_to_pg(db, postgres_reset, rx, notify_last_at) 91 + .await 92 + .expect("to backfill to pg"); 93 + } else { 94 + pages_to_stdout(rx, notify_last_at) 95 + .await 96 + .expect("to backfill to stdout"); 97 + } 98 + }); 99 + 100 + if let Some(rx_last) = rx_last { 101 + let mut upstream = upstream; 102 + upstream.set_path("/export"); 103 + // wait until the time for `after` is known 104 + let last_at = rx_last.await.expect("to get the last log's createdAt"); 105 + log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff"); 106 + let (tx, rx) = mpsc::channel(256); // these are small pages 107 + tokio::task::spawn(async move { 108 + poll_upstream(last_at, upstream, tx) 109 + .await 110 + .expect("polling upstream to work") 111 + }); 112 + bulk_out_write.await.expect("to wait for bulk_out_write"); 113 + log::info!("writing catch-up pages"); 114 + let full_pages = full_pages(rx); 115 + if let Some(url) = to_postgres { 116 + let db = Db::new(url.as_str(), postgres_cert) 117 + .await 118 + .expect("to connect pg for catchup"); 119 + pages_to_pg(db, full_pages) 120 + .await 121 + .expect("to write catch-up pages to pg"); 122 + } else { 123 + pages_to_stdout(full_pages, None) 124 + .await 125 + .expect("to write catch-up pages to stdout"); 126 + } 127 + } 128 + Ok(()) 129 + } 130 + 131 + #[derive(Debug, Parser)] 132 + struct CliArgs { 133 + #[command(flatten)] 134 + globals: GlobalArgs, 135 + #[command(flatten)] 136 + args: Args, 137 + } 138 + 139 + #[allow(dead_code)] 140 + #[tokio::main] 141 + async fn main() -> anyhow::Result<()> { 142 + let args = CliArgs::parse(); 143 + bin_init("backfill"); 144 + run(args.globals, args.args).await?; 145 + Ok(()) 146 + }
+117
src/bin/mirror.rs
··· 1 + use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve}; 2 + use clap::Parser; 3 + use reqwest::Url; 4 + use std::{net::SocketAddr, path::PathBuf}; 5 + use tokio::sync::mpsc; 6 + 7 + #[derive(Debug, clap::Args)] 8 + pub struct Args { 9 + /// the wrapped did-method-plc server 10 + #[arg(long, env = "ALLEGEDLY_WRAP")] 11 + wrap: Url, 12 + /// the wrapped did-method-plc server's database (write access required) 13 + #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 14 + wrap_pg: Url, 15 + /// path to tls cert for the wrapped postgres db, if needed 16 + #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 17 + wrap_pg_cert: Option<PathBuf>, 18 + /// wrapping server listen address 19 + #[arg(short, long, env = "ALLEGEDLY_BIND")] 20 + #[clap(default_value = "127.0.0.1:8000")] 21 + bind: SocketAddr, 22 + /// obtain a certificate from letsencrypt 23 + /// 24 + /// for now this will force listening on all interfaces at :80 and :443 25 + /// (:80 will serve an "https required" error, *will not* redirect) 26 + #[arg( 27 + long, 28 + conflicts_with("bind"), 29 + requires("acme_cache_path"), 30 + env = "ALLEGEDLY_ACME_DOMAIN" 31 + )] 32 + acme_domain: Vec<String>, 33 + /// which local directory to keep the letsencrypt certs in 34 + #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")] 35 + acme_cache_path: Option<PathBuf>, 36 + /// which public acme directory to use 37 + /// 38 + /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory" 39 + #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 40 + #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 41 + acme_directory_url: Url, 42 + } 43 + 44 + pub async fn run( 45 + GlobalArgs { upstream }: GlobalArgs, 46 + Args { 47 + wrap, 48 + wrap_pg, 49 + wrap_pg_cert, 50 + bind, 51 + acme_domain, 52 + acme_cache_path, 53 + acme_directory_url, 54 + }: Args, 55 + ) -> anyhow::Result<()> { 56 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert) 57 + .await 58 + .expect("to connect to pg for mirroring"); 59 + let latest = db 60 + .get_latest() 61 + .await 62 + .expect("to query for last createdAt") 63 + .expect("there to be at least one op in the db. did you backfill?"); 64 + 65 + let (tx, rx) = mpsc::channel(2); 66 + // upstream poller 67 + let mut url = upstream.clone(); 68 + tokio::task::spawn(async move { 69 + log::info!("starting poll reader..."); 70 + url.set_path("/export"); 71 + tokio::task::spawn(async move { 72 + poll_upstream(Some(latest), url, tx) 73 + .await 74 + .expect("to poll upstream for mirror sync") 75 + }); 76 + }); 77 + // db writer 78 + let poll_db = db.clone(); 79 + tokio::task::spawn(async move { 80 + log::info!("starting db writer..."); 81 + pages_to_pg(poll_db, rx) 82 + .await 83 + .expect("to write to pg for mirror"); 84 + }); 85 + 86 + let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 87 + (_, false, Some(cache_path)) => ListenConf::Acme { 88 + domains: acme_domain, 89 + cache_path, 90 + directory_url: acme_directory_url.to_string(), 91 + }, 92 + (bind, true, None) => ListenConf::Bind(bind), 93 + (_, _, _) => unreachable!(), 94 + }; 95 + 96 + serve(&upstream, wrap, listen_conf) 97 + .await 98 + .expect("to be able to serve the mirror proxy app"); 99 + Ok(()) 100 + } 101 + 102 + #[derive(Debug, Parser)] 103 + struct CliArgs { 104 + #[command(flatten)] 105 + globals: GlobalArgs, 106 + #[command(flatten)] 107 + args: Args, 108 + } 109 + 110 + #[allow(dead_code)] 111 + #[tokio::main] 112 + async fn main() -> anyhow::Result<()> { 113 + let args = CliArgs::parse(); 114 + bin_init("mirror"); 115 + run(args.globals, args.args).await?; 116 + Ok(()) 117 + }
+14
src/bin/mod.rs
··· 1 + use reqwest::Url; 2 + 3 + #[derive(Debug, Clone, clap::Args)] 4 + pub struct GlobalArgs { 5 + /// Upstream PLC server 6 + #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 7 + #[clap(default_value = "https://plc.directory")] 8 + pub upstream: Url, 9 + } 10 + 11 + #[allow(dead_code)] 12 + fn main() { 13 + panic!("this is not actually a module") 14 + }
+43
src/lib.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 + use tokio::sync::{mpsc, oneshot}; 2 3 3 4 mod backfill; 4 5 mod client; ··· 7 8 mod poll; 8 9 mod ratelimit; 9 10 mod weekly; 11 + 12 + pub mod bin; 10 13 11 14 pub use backfill::backfill; 12 15 pub use client::{CLIENT, UA}; ··· 71 74 cid: cid.to_string(), 72 75 } 73 76 } 77 + } 78 + 79 + /// page forwarder who drops its channels on receipt of a small page 80 + /// 81 + /// PLC will return up to 1000 ops on a page, and returns full pages until it 82 + /// has caught up, so this is a (hacky?) way to stop polling once we're up. 83 + pub fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> { 84 + let (tx, fwd) = mpsc::channel(1); 85 + tokio::task::spawn(async move { 86 + while let Some(page) = rx.recv().await 87 + && page.ops.len() > 900 88 + { 89 + tx.send(page).await.expect("to be able to forward a page"); 90 + } 91 + }); 92 + fwd 93 + } 94 + 95 + pub async fn pages_to_stdout( 96 + mut rx: mpsc::Receiver<ExportPage>, 97 + notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 98 + ) -> anyhow::Result<()> { 99 + let mut last_at = None; 100 + while let Some(page) = rx.recv().await { 101 + for op in &page.ops { 102 + println!("{}", serde_json::to_string(op)?); 103 + } 104 + if notify_last_at.is_some() 105 + && let Some(s) = PageBoundaryState::new(&page) 106 + { 107 + last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 108 + } 109 + } 110 + if let Some(notify) = notify_last_at { 111 + log::trace!("notifying last_at: {last_at:?}"); 112 + if notify.send(last_at).is_err() { 113 + log::error!("receiver for last_at dropped, can't notify"); 114 + }; 115 + } 116 + Ok(()) 74 117 } 75 118 76 119 pub fn logo(name: &str) -> String {