Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 260 lines 8.5 kB view raw
1use crate::{CLIENT, Dt, ExportPage, Op}; 2use async_compression::tokio::bufread::GzipDecoder; 3use async_compression::tokio::write::GzipEncoder; 4use core::pin::pin; 5use reqwest::Url; 6use std::future::Future; 7use std::ops::{Bound, RangeBounds}; 8use std::path::PathBuf; 9use tokio::{ 10 fs::File, 11 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader}, 12 sync::mpsc, 13}; 14use tokio_stream::wrappers::LinesStream; 15use tokio_util::compat::FuturesAsyncReadCompatExt; 16 17const WEEK_IN_SECONDS: i64 = 7 * 86_400; 18 19#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] 20pub struct Week(i64); 21 22impl Week { 23 pub const fn from_n(n: i64) -> Self { 24 Self(n) 25 } 26 pub fn range(r: impl RangeBounds<Week>) -> Vec<Self> { 27 let first = match r.start_bound() { 28 Bound::Included(week) => *week, 29 Bound::Excluded(week) => week.next(), 30 Bound::Unbounded => panic!("week range must have a defined start bound"), 31 }; 32 let last = match r.end_bound() { 33 Bound::Included(week) => *week, 34 Bound::Excluded(week) => week.prev(), 35 Bound::Unbounded => Self(Self::nullification_cutoff()).prev(), 36 }; 37 let mut out = Vec::new(); 38 let mut current = first; 39 while current <= last { 40 out.push(current); 41 current = current.next(); 42 } 43 out 44 } 45 pub fn n_ago(&self) -> i64 { 46 let now = chrono::Utc::now().timestamp(); 47 (now - self.0) / WEEK_IN_SECONDS 48 } 49 pub fn n_until(&self, other: Week) -> i64 { 50 let Self(until) = other; 51 (until - self.0) / WEEK_IN_SECONDS 52 } 53 pub fn next(&self) -> Week { 54 Self(self.0 + WEEK_IN_SECONDS) 55 } 56 pub fn prev(&self) -> Week { 57 Self(self.0 - WEEK_IN_SECONDS) 58 } 59 /// whether the plc log for this week outside the 72h nullification window 60 /// 61 /// plus one hour for safety (week must have ended > 73 hours ago) 62 pub fn is_immutable(&self) -> bool { 63 self.next().0 <= Self::nullification_cutoff() 64 } 65 fn nullification_cutoff() -> i64 { 66 const HOUR_IN_SECONDS: i64 = 3600; 67 let now = chrono::Utc::now().timestamp(); 68 now - (73 * HOUR_IN_SECONDS) 69 } 70} 71 72impl From<Dt> for Week { 73 fn from(dt: Dt) -> Self { 74 let ts = dt.timestamp(); 75 let truncated = (ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 76 Week(truncated) 77 } 78} 79 80impl From<Week> for Dt { 81 fn from(week: Week) -> Dt { 82 let Week(ts) = week; 83 Dt::from_timestamp(ts, 0).expect("the week to be in valid range") 84 } 85} 86 87pub trait BundleSource: Clone { 88 fn reader_for( 89 &self, 90 week: Week, 91 ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send; 92} 93 94#[derive(Debug, Clone)] 95pub struct FolderSource(pub PathBuf); 96impl BundleSource for FolderSource { 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 98 let FolderSource(dir) = self; 99 let path = dir.join(format!("{}.jsonl.gz", week.0)); 100 tracing::debug!("opening folder source: {path:?}"); 101 let file = File::open(path) 102 .await 103 .inspect_err(|e| tracing::error!("failed to open file: {e}"))?; 104 let decoder = GzipDecoder::new(BufReader::new(file)); 105 Ok(decoder) 106 } 107} 108 109#[derive(Debug, Clone)] 110pub struct HttpSource(pub Url); 111impl BundleSource for HttpSource { 112 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 113 use futures::TryStreamExt; 114 let HttpSource(base) = self; 115 let url = base.join(&format!("{}.jsonl.gz", week.0))?; 116 let stream = CLIENT 117 .get(url) 118 .send() 119 .await? 120 .error_for_status()? 121 .bytes_stream() 122 .map_err(futures::io::Error::other) 123 .into_async_read() 124 .compat(); 125 let decoder = GzipDecoder::new(BufReader::new(stream)); 126 Ok(decoder) 127 } 128} 129 130pub async fn pages_to_weeks( 131 mut rx: mpsc::Receiver<ExportPage>, 132 dir: PathBuf, 133 clobber: bool, 134) -> anyhow::Result<()> { 135 pub use std::time::Instant; 136 137 // ...there is certainly a nicer way to write this 138 let mut current_week: Option<Week> = None; 139 let dummy_file = File::create(dir.join("_dummy")).await?; 140 let mut encoder = GzipEncoder::new(dummy_file); 141 142 let mut total_ops = 0; 143 let total_t0 = Instant::now(); 144 let mut week_ops = 0; 145 let mut week_t0 = total_t0; 146 147 while let Some(page) = rx.recv().await { 148 for op in page.ops { 149 let op_week = op.created_at.into(); 150 if current_week.map(|w| w != op_week).unwrap_or(true) { 151 encoder.shutdown().await?; 152 let now = Instant::now(); 153 154 tracing::info!( 155 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 156 current_week.map(|w| -w.n_ago()).unwrap_or(0), 157 current_week.unwrap_or(Week(0)).0, 158 (week_ops as f64) / (now - week_t0).as_secs_f64(), 159 total_ops / 1000, 160 (total_ops as f64) / (now - total_t0).as_secs_f64(), 161 ); 162 let path = dir.join(format!("{}.jsonl.gz", op_week.0)); 163 let file = if clobber { 164 File::create(path).await? 165 } else { 166 File::create_new(path).await? 167 }; 168 encoder = GzipEncoder::with_quality(file, async_compression::Level::Best); 169 current_week = Some(op_week); 170 week_ops = 0; 171 week_t0 = now; 172 } 173 tracing::trace!("writing: {op:?}"); 174 encoder 175 .write_all(serde_json::to_string(&op)?.as_bytes()) 176 .await?; 177 total_ops += 1; 178 week_ops += 1; 179 } 180 } 181 182 // don't forget the final file 183 encoder.shutdown().await?; 184 let now = Instant::now(); 185 tracing::info!( 186 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 187 current_week.map(|w| -w.n_ago()).unwrap_or(0), 188 current_week.unwrap_or(Week(0)).0, 189 (week_ops as f64) / (now - week_t0).as_secs_f64(), 190 total_ops / 1000, 191 (total_ops as f64) / (now - total_t0).as_secs_f64(), 192 ); 193 194 Ok(()) 195} 196 197pub async fn week_to_pages( 198 source: impl BundleSource, 199 week: Week, 200 dest: mpsc::Sender<ExportPage>, 201) -> anyhow::Result<()> { 202 use futures::TryStreamExt; 203 let mut retry_backoff = std::time::Duration::from_secs(2); 204 205 loop { 206 let reader = match source.reader_for(week).await { 207 Ok(r) => r, 208 Err(e) => { 209 tracing::warn!( 210 "week_to_pages reader_for failed {e}, retrying in {}s", 211 retry_backoff.as_secs() 212 ); 213 tokio::time::sleep(retry_backoff).await; 214 retry_backoff = (retry_backoff * 2).min(std::time::Duration::from_secs(300)); 215 continue; 216 } 217 }; 218 219 let mut chunks = pin!(LinesStream::new(BufReader::new(reader).lines()).try_chunks(10000)); 220 let mut success = true; 221 222 while let Some(chunk) = match chunks.as_mut().try_next().await { 223 Ok(Some(c)) => Some(c), 224 Ok(None) => None, 225 Err(e) => { 226 tracing::warn!( 227 "failed to get next chunk: {e}, retrying week in {}s", 228 retry_backoff.as_secs() 229 ); 230 tokio::time::sleep(retry_backoff).await; 231 retry_backoff = (retry_backoff * 2).min(std::time::Duration::from_secs(300)); 232 success = false; 233 None 234 } 235 } { 236 let ops: Vec<Op> = chunk 237 .into_iter() 238 .filter_map(|s| { 239 serde_json::from_str::<Op>(&s) 240 .inspect_err(|e| tracing::warn!("failed to parse op: {e} ({s})")) 241 .ok() 242 }) 243 .collect(); 244 245 if ops.is_empty() { 246 continue; 247 } 248 249 let page = ExportPage { ops }; 250 if let Err(e) = dest.send(page).await { 251 tracing::error!("failed to send page (receiver closed): {e}"); 252 return Err(e.into()); 253 } 254 } 255 256 if success { 257 return Ok(()); 258 } 259 } 260}