//! Forked from microcosm.blue/Allegedly
//!
//! Server tools to backfill, tail, mirror, and verify PLC logs.
1use crate::{CLIENT, Dt, ExportPage, Op};
2use async_compression::tokio::bufread::GzipDecoder;
3use async_compression::tokio::write::GzipEncoder;
4use core::pin::pin;
5use reqwest::Url;
6use std::future::Future;
7use std::ops::{Bound, RangeBounds};
8use std::path::PathBuf;
9use tokio::{
10 fs::File,
11 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader},
12 sync::mpsc,
13};
14use tokio_stream::wrappers::LinesStream;
15use tokio_util::compat::FuturesAsyncReadCompatExt;
16
// Number of seconds in one week; all week arithmetic below is in this unit.
const WEEK_IN_SECONDS: i64 = 7 * 86_400;

/// A week of the PLC log, stored as a unix timestamp (seconds).
///
/// NOTE(review): values built via `From<Dt>` are truncated to a multiple of
/// `WEEK_IN_SECONDS`, but `from_n` accepts any i64 — presumably callers pass
/// week-aligned timestamps; confirm.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Week(i64);
21
22impl Week {
23 pub const fn from_n(n: i64) -> Self {
24 Self(n)
25 }
26 pub fn range(r: impl RangeBounds<Week>) -> Vec<Self> {
27 let first = match r.start_bound() {
28 Bound::Included(week) => *week,
29 Bound::Excluded(week) => week.next(),
30 Bound::Unbounded => panic!("week range must have a defined start bound"),
31 };
32 let last = match r.end_bound() {
33 Bound::Included(week) => *week,
34 Bound::Excluded(week) => week.prev(),
35 Bound::Unbounded => Self(Self::nullification_cutoff()).prev(),
36 };
37 let mut out = Vec::new();
38 let mut current = first;
39 while current <= last {
40 out.push(current);
41 current = current.next();
42 }
43 out
44 }
45 pub fn n_ago(&self) -> i64 {
46 let now = chrono::Utc::now().timestamp();
47 (now - self.0) / WEEK_IN_SECONDS
48 }
49 pub fn n_until(&self, other: Week) -> i64 {
50 let Self(until) = other;
51 (until - self.0) / WEEK_IN_SECONDS
52 }
53 pub fn next(&self) -> Week {
54 Self(self.0 + WEEK_IN_SECONDS)
55 }
56 pub fn prev(&self) -> Week {
57 Self(self.0 - WEEK_IN_SECONDS)
58 }
59 /// whether the plc log for this week outside the 72h nullification window
60 ///
61 /// plus one hour for safety (week must have ended > 73 hours ago)
62 pub fn is_immutable(&self) -> bool {
63 self.next().0 <= Self::nullification_cutoff()
64 }
65 fn nullification_cutoff() -> i64 {
66 const HOUR_IN_SECONDS: i64 = 3600;
67 let now = chrono::Utc::now().timestamp();
68 now - (73 * HOUR_IN_SECONDS)
69 }
70}
71
72impl From<Dt> for Week {
73 fn from(dt: Dt) -> Self {
74 let ts = dt.timestamp();
75 let truncated = (ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS;
76 Week(truncated)
77 }
78}
79
80impl From<Week> for Dt {
81 fn from(week: Week) -> Dt {
82 let Week(ts) = week;
83 Dt::from_timestamp(ts, 0).expect("the week to be in valid range")
84 }
85}
86
/// A source of weekly PLC-log bundles (files named `<week-start-ts>.jsonl.gz`).
pub trait BundleSource: Clone {
    /// Open an async reader over the *decompressed* bundle contents for `week`.
    fn reader_for(
        &self,
        week: Week,
    ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send;
}
93
94#[derive(Debug, Clone)]
95pub struct FolderSource(pub PathBuf);
96impl BundleSource for FolderSource {
97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
98 let FolderSource(dir) = self;
99 let path = dir.join(format!("{}.jsonl.gz", week.0));
100 tracing::debug!("opening folder source: {path:?}");
101 let file = File::open(path)
102 .await
103 .inspect_err(|e| tracing::error!("failed to open file: {e}"))?;
104 let decoder = GzipDecoder::new(BufReader::new(file));
105 Ok(decoder)
106 }
107}
108
109#[derive(Debug, Clone)]
110pub struct HttpSource(pub Url);
111impl BundleSource for HttpSource {
112 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
113 use futures::TryStreamExt;
114 let HttpSource(base) = self;
115 let url = base.join(&format!("{}.jsonl.gz", week.0))?;
116 let stream = CLIENT
117 .get(url)
118 .send()
119 .await?
120 .error_for_status()?
121 .bytes_stream()
122 .map_err(futures::io::Error::other)
123 .into_async_read()
124 .compat();
125 let decoder = GzipDecoder::new(BufReader::new(stream));
126 Ok(decoder)
127 }
128}
129
130pub async fn pages_to_weeks(
131 mut rx: mpsc::Receiver<ExportPage>,
132 dir: PathBuf,
133 clobber: bool,
134) -> anyhow::Result<()> {
135 pub use std::time::Instant;
136
137 // ...there is certainly a nicer way to write this
138 let mut current_week: Option<Week> = None;
139 let dummy_file = File::create(dir.join("_dummy")).await?;
140 let mut encoder = GzipEncoder::new(dummy_file);
141
142 let mut total_ops = 0;
143 let total_t0 = Instant::now();
144 let mut week_ops = 0;
145 let mut week_t0 = total_t0;
146
147 while let Some(page) = rx.recv().await {
148 for op in page.ops {
149 let op_week = op.created_at.into();
150 if current_week.map(|w| w != op_week).unwrap_or(true) {
151 encoder.shutdown().await?;
152 let now = Instant::now();
153
154 tracing::info!(
155 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)",
156 current_week.map(|w| -w.n_ago()).unwrap_or(0),
157 current_week.unwrap_or(Week(0)).0,
158 (week_ops as f64) / (now - week_t0).as_secs_f64(),
159 total_ops / 1000,
160 (total_ops as f64) / (now - total_t0).as_secs_f64(),
161 );
162 let path = dir.join(format!("{}.jsonl.gz", op_week.0));
163 let file = if clobber {
164 File::create(path).await?
165 } else {
166 File::create_new(path).await?
167 };
168 encoder = GzipEncoder::with_quality(file, async_compression::Level::Best);
169 current_week = Some(op_week);
170 week_ops = 0;
171 week_t0 = now;
172 }
173 tracing::trace!("writing: {op:?}");
174 encoder
175 .write_all(serde_json::to_string(&op)?.as_bytes())
176 .await?;
177 total_ops += 1;
178 week_ops += 1;
179 }
180 }
181
182 // don't forget the final file
183 encoder.shutdown().await?;
184 let now = Instant::now();
185 tracing::info!(
186 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)",
187 current_week.map(|w| -w.n_ago()).unwrap_or(0),
188 current_week.unwrap_or(Week(0)).0,
189 (week_ops as f64) / (now - week_t0).as_secs_f64(),
190 total_ops / 1000,
191 (total_ops as f64) / (now - total_t0).as_secs_f64(),
192 );
193
194 Ok(())
195}
196
/// Stream one week's bundle from `source`, parse each JSONL line as an `Op`,
/// and forward the ops to `dest` in pages of up to 10_000.
///
/// On any read error the *whole week* is re-fetched from scratch with capped
/// exponential backoff (2s doubling up to 300s).
///
/// NOTE(review): after a mid-week failure, pages already sent before the error
/// are sent again on retry — confirm downstream tolerates duplicate delivery.
pub async fn week_to_pages(
    source: impl BundleSource,
    week: Week,
    dest: mpsc::Sender<ExportPage>,
) -> anyhow::Result<()> {
    use futures::TryStreamExt;
    // doubles on every failure; capped at 300s below
    let mut retry_backoff = std::time::Duration::from_secs(2);

    loop {
        // (re)open the bundle reader; failures here retry forever with backoff
        let reader = match source.reader_for(week).await {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(
                    "week_to_pages reader_for failed {e}, retrying in {}s",
                    retry_backoff.as_secs()
                );
                tokio::time::sleep(retry_backoff).await;
                retry_backoff = (retry_backoff * 2).min(std::time::Duration::from_secs(300));
                continue;
            }
        };

        // batch decompressed lines into chunks of up to 10k per page
        let mut chunks = pin!(LinesStream::new(BufReader::new(reader).lines()).try_chunks(10000));
        let mut success = true;

        // a chunk error ends this pass (yields None) and flags the week for retry
        while let Some(chunk) = match chunks.as_mut().try_next().await {
            Ok(Some(c)) => Some(c),
            Ok(None) => None,
            Err(e) => {
                tracing::warn!(
                    "failed to get next chunk: {e}, retrying week in {}s",
                    retry_backoff.as_secs()
                );
                tokio::time::sleep(retry_backoff).await;
                retry_backoff = (retry_backoff * 2).min(std::time::Duration::from_secs(300));
                success = false;
                None
            }
        } {
            // unparseable lines are logged and skipped, not fatal
            let ops: Vec<Op> = chunk
                .into_iter()
                .filter_map(|s| {
                    serde_json::from_str::<Op>(&s)
                        .inspect_err(|e| tracing::warn!("failed to parse op: {e} ({s})"))
                        .ok()
                })
                .collect();

            if ops.is_empty() {
                continue;
            }

            let page = ExportPage { ops };
            // a closed receiver is unrecoverable: bail out entirely
            if let Err(e) = dest.send(page).await {
                tracing::error!("failed to send page (receiver closed): {e}");
                return Err(e.into());
            }
        }

        // clean end-of-stream with no errors: the week is done
        if success {
            return Ok(());
        }
    }
}
260}