Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at debug 64 lines 2.1 kB view raw
1use crate::{BundleSource, Dt, ExportPage, Week, week_to_pages}; 2use std::sync::Arc; 3use std::time::Instant; 4use tokio::{ 5 sync::{Mutex, mpsc}, 6 task::JoinSet, 7}; 8 9const FIRST_WEEK: Week = Week::from_n(1668643200); 10 11pub async fn backfill( 12 source: impl BundleSource + Send + 'static, 13 dest: mpsc::Sender<ExportPage>, 14 source_workers: usize, 15 until: Option<Dt>, 16) -> anyhow::Result<&'static str> { 17 // queue up the week bundles that should be available 18 let weeks = Arc::new(Mutex::new( 19 until 20 .map(|u| Week::range(FIRST_WEEK..u.into())) 21 .unwrap_or(Week::range(FIRST_WEEK..)), 22 )); 23 weeks.lock().await.reverse(); 24 25 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new(); 26 27 let t_step = Instant::now(); 28 log::info!( 29 "fetching backfill for {} weeks with {source_workers} workers...", 30 weeks.lock().await.len() 31 ); 32 33 // spin up the fetchers to work in parallel 34 for w in 0..source_workers { 35 let weeks = weeks.clone(); 36 let dest = dest.clone(); 37 let source = source.clone(); 38 workers.spawn(async move { 39 while let Some(week) = weeks.lock().await.pop() { 40 let when = Into::<Dt>::into(week).to_rfc3339(); 41 log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago()); 42 week_to_pages(source.clone(), week, dest.clone()) 43 .await 44 .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?; 45 } 46 log::info!("done with the weeks ig"); 47 Ok(()) 48 }); 49 } 50 51 // TODO: handle missing/failed weeks 52 53 // wait for the big backfill to finish 54 while let Some(res) = workers.join_next().await { 55 res.inspect_err(|e| log::error!("problem joining source workers: {e}"))? 56 .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?; 57 } 58 log::info!( 59 "finished fetching backfill in {:?}. senders remaining: {}", 60 t_step.elapsed(), 61 dest.strong_count() 62 ); 63 Ok("backfill") 64}