// forked from microcosm.blue/Allegedly

//! Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey, SeqOp, SeqPage};
2use reqwest::Url;
3use std::time::Duration;
4use thiserror::Error;
5use tokio::sync::mpsc;
6
/// Errors that can occur while fetching and parsing one PLC export page
///
/// All variants are transparent wrappers: the source error's message is
/// surfaced directly.
#[derive(Debug, Error)]
pub enum GetPageError {
    /// HTTP/transport failure from the reqwest client
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// Failure raised by a reqwest middleware layer
    #[error(transparent)]
    ReqwestMiddleware(#[from] reqwest_middleware::Error),
    /// JSON deserialization failure
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}
16
/// ops are primary-keyed by (did, cid)
/// plc orders by `created_at` but does not guarantee distinct times per op
/// we assume that the order will at least be deterministic: this may be unsound
#[derive(Debug, PartialEq)]
pub struct LastOp {
    /// Timestamp of the op
    pub created_at: Dt, // any op greater is definitely not duplicated
    /// Primary key; empty strings when constructed from a bare `Dt` (see
    /// the `From<Dt>` impl below)
    pk: (String, String), // did, cid
}
25
26impl From<Op> for LastOp {
27 fn from(op: Op) -> Self {
28 Self {
29 created_at: op.created_at,
30 pk: (op.did, op.cid),
31 }
32 }
33}
34
35impl From<&Op> for LastOp {
36 fn from(op: &Op) -> Self {
37 Self {
38 created_at: op.created_at,
39 pk: (op.did.clone(), op.cid.clone()),
40 }
41 }
42}
43
44// bit of a hack
45impl From<Dt> for LastOp {
46 fn from(dt: Dt) -> Self {
47 Self {
48 created_at: dt,
49 pk: ("".to_string(), "".to_string()),
50 }
51 }
52}
53
/// State for removing duplicates ops between PLC export page boundaries
#[derive(Debug, PartialEq)]
pub struct PageBoundaryState {
    /// The previous page's last timestamp
    ///
    /// Duplicate ops from /export only occur for the same exact timestamp
    pub last_at: Dt,
    /// The previous page's ops at its last timestamp
    ///
    /// Checked against the head of the next page to drop already-seen ops.
    keys_at: Vec<OpKey>, // expected to ~always be length one
}
64
65impl PageBoundaryState {
66 /// Initialize the boundary state with a PLC page
67 pub fn new(page: &ExportPage) -> Option<Self> {
68 // grab the very last op
69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
70
71 // set initial state
72 let mut me = Self {
73 last_at,
74 keys_at: vec![last_key],
75 };
76
77 // and make sure all keys at this time are captured from the back
78 me.capture_nth_last_at(page, last_at, 1);
79
80 Some(me)
81 }
82 /// Apply the deduplication and update state
83 ///
84 /// The beginning of the page will be modified to remove duplicates from the
85 /// previous page.
86 ///
87 /// The end of the page is inspected to update the deduplicator state for
88 /// the next page.
89 fn apply_to_next(&mut self, page: &mut ExportPage) {
90 // walk ops forward, kicking previously-seen ops until created_at advances
91 let to_remove: Vec<usize> = page
92 .ops
93 .iter()
94 .enumerate()
95 .take_while(|(_, op)| op.created_at == self.last_at)
96 .filter(|(_, op)| self.keys_at.contains(&(*op).into()))
97 .map(|(i, _)| i)
98 .collect();
99
100 // actually remove them. last to first so indices don't shift
101 for dup_idx in to_remove.into_iter().rev() {
102 page.ops.remove(dup_idx);
103 }
104
105 // grab the very last op
106 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
107 // there are no ops left? oop. bail.
108 // last_at and existing keys remain in tact
109 return;
110 };
111
112 // reset state (as long as time actually moved forward on this page)
113 if last_at > self.last_at {
114 self.last_at = last_at;
115 self.keys_at = vec![last_key];
116 } else {
117 // weird cases: either time didn't move (fine...) or went backwards (not fine)
118 assert_eq!(last_at, self.last_at, "time moved backwards on a page");
119 self.keys_at.push(last_key);
120 }
121 // and make sure all keys at this time are captured from the back
122 self.capture_nth_last_at(page, last_at, 1);
123 }
124
125 /// walk backwards from 2nd last and collect keys until created_at changes
126 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) {
127 page.ops
128 .iter()
129 .rev()
130 .skip(skips)
131 .take_while(|op| op.created_at == last_at)
132 .for_each(|op| {
133 self.keys_at.push(op.into());
134 });
135 }
136}
137
138/// Get one PLC export page
139///
140/// Extracts the final op so it can be used to fetch the following page
141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
142 use futures::TryStreamExt;
143 use tokio::io::{AsyncBufReadExt, BufReader};
144 use tokio_util::compat::FuturesAsyncReadCompatExt;
145
146 tracing::trace!("Getting page: {url}");
147
148 let res = CLIENT.get(url).send().await?.error_for_status()?;
149 let stream = Box::pin(
150 res.bytes_stream()
151 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
152 .into_async_read()
153 .compat(),
154 );
155
156 let mut lines = BufReader::new(stream).lines();
157 let mut ops = Vec::new();
158
159 loop {
160 match lines.next_line().await {
161 Ok(Some(line)) => {
162 let line = line.trim();
163 if line.is_empty() {
164 continue;
165 }
166 match serde_json::from_str::<Op>(line) {
167 Ok(op) => ops.push(op),
168 Err(e) => tracing::warn!("failed to parse op: {e} ({line})"),
169 }
170 }
171 Ok(None) => break,
172 Err(e) => {
173 tracing::warn!("transport error mid-page: {}; returning partial page", e);
174 break;
175 }
176 }
177 }
178
179 let last_op = ops.last().map(Into::into);
180
181 Ok((ExportPage { ops }, last_op))
182}
183
184/// Poll an upstream PLC server for new ops
185///
186/// Pages of operations are written to the `dest` channel.
187///
188/// ```no_run
189/// # #[tokio::main]
190/// # async fn main() {
191/// use allegedly::{ExportPage, Op, poll_upstream};
192///
193/// let after = Some(chrono::Utc::now());
194/// let upstream = "https://plc.wtf/export".parse().unwrap();
195/// let throttle = std::time::Duration::from_millis(300);
196///
197/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
198/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
199///
200/// while let Some(ExportPage { ops }) = rx.recv().await {
201/// println!("received {} plc ops", ops.len());
202///
203/// for Op { did, cid, operation, .. } in ops {
204/// // in this example we're alerting when changes are found for one
205/// // specific identity
206/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
207/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
208/// }
209/// }
210/// }
211/// # }
212/// ```
pub async fn poll_upstream(
    after: Option<Dt>,
    base: Url,
    throttle: Duration,
    dest: mpsc::Sender<ExportPage>,
) -> anyhow::Result<&'static str> {
    tracing::info!("starting upstream poller at {base} after {after:?}");
    // issue at most one request per `throttle` interval
    let mut tick = tokio::time::interval(throttle);
    // cursor: the last op (or bare starting timestamp) already consumed
    let mut prev_last: Option<LastOp> = after.map(Into::into);
    // cross-page dedup state; stays None until the first non-empty page
    let mut boundary_state: Option<PageBoundaryState> = None;
    loop {
        tick.tick().await;

        // ask for everything after the cursor timestamp. the server may
        // re-send ops sharing that exact timestamp; dedup handles that below
        let mut url = base.clone();
        if let Some(ref pl) = prev_last {
            url.query_pairs_mut()
                .append_pair("after", &pl.created_at.to_rfc3339());
        };

        let (mut page, next_last) = match get_page(url).await {
            Ok(res) => res,
            Err(e) => {
                // transient fetch failure: keep the cursor and retry next tick
                tracing::warn!("error polling upstream: {e}");
                continue;
            }
        };

        if let Some(ref mut state) = boundary_state {
            // strip ops duplicated from the previous page's tail
            state.apply_to_next(&mut page);
        } else {
            // first page seen: only capture boundary state, nothing to dedup
            boundary_state = PageBoundaryState::new(&page);
        }
        if !page.is_empty() {
            // prefer a non-blocking send; fall back to awaiting when full
            match dest.try_send(page) {
                Ok(()) => {}
                Err(mpsc::error::TrySendError::Full(page)) => {
                    tracing::warn!("export: destination channel full, awaiting...");
                    dest.send(page).await?;
                }
                e => e?,
            };
        }

        // advance the cursor only when this page actually contained ops
        prev_last = next_last.or(prev_last);
    }
}
259
260/// Fetch one page of seq-based export from `/export?after=<seq>`
261async fn get_seq_page(url: Url) -> Result<SeqPage, GetPageError> {
262 use futures::TryStreamExt;
263 use tokio::io::{AsyncBufReadExt, BufReader};
264 use tokio_util::compat::FuturesAsyncReadCompatExt;
265
266 tracing::trace!("getting seq page: {url}");
267
268 let res = CLIENT.get(url).send().await?.error_for_status()?;
269 let stream = Box::pin(
270 res.bytes_stream()
271 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
272 .into_async_read()
273 .compat(),
274 );
275
276 let mut lines = BufReader::new(stream).lines();
277 let mut ops = Vec::new();
278
279 loop {
280 match lines.next_line().await {
281 Ok(Some(line)) => {
282 let line = line.trim();
283 if line.is_empty() {
284 continue;
285 }
286 match serde_json::from_str::<SeqOp>(line) {
287 Ok(op) => ops.push(op),
288 Err(e) => tracing::warn!("failed to parse seq op: {e} ({line})"),
289 }
290 }
291 Ok(None) => break,
292 Err(e) => {
293 tracing::warn!(
294 "transport error mid-seq-page: {}; returning partial page",
295 e
296 );
297 break;
298 }
299 }
300 }
301
302 Ok(SeqPage { ops })
303}
304
305/// Poll an upstream PLC server using seq-number-based cursoring
306///
307/// Uses `/export?after=<seq>` — each op from the server carries a `seq` field
308/// which is a globally monotonic unsigned integer. Because seq is unique per op
309/// there is no need for page-boundary deduplication.
310///
311/// Pages are sent to `dest`. Returns when the channel closes.
312pub async fn poll_upstream_seq(
313 after: Option<u64>,
314 base: Url,
315 throttle: Duration,
316 dest: mpsc::Sender<SeqPage>,
317) -> anyhow::Result<&'static str> {
318 tracing::info!("starting seq upstream poller at {base} after {after:?}");
319 let mut tick = tokio::time::interval(throttle);
320 let mut last_seq: u64 = after.unwrap_or(0);
321
322 loop {
323 tick.tick().await;
324
325 let mut url = base.clone();
326 url.query_pairs_mut()
327 .append_pair("after", &last_seq.to_string());
328
329 let page = match get_seq_page(url).await {
330 Ok(p) => p,
331 Err(e) => {
332 tracing::warn!("error polling upstream (seq): {e}");
333 continue;
334 }
335 };
336
337 if let Some(last) = page.ops.last() {
338 last_seq = last.seq;
339 }
340
341 if !page.is_empty() {
342 tracing::debug!(
343 "seq poll: page with {} ops, seq {}..{}",
344 page.ops.len(),
345 page.ops.first().map(|op| op.seq).unwrap_or(0),
346 last_seq
347 );
348 match dest.try_send(page) {
349 Ok(()) => {}
350 Err(mpsc::error::TrySendError::Full(page)) => {
351 tracing::warn!("seq poll: destination channel full, awaiting...");
352 dest.send(page).await?;
353 }
354 e => e?,
355 };
356 }
357 }
358}
359
360/// Tail the upstream PLC `/export/stream` WebSocket endpoint
361///
362/// `cursor` is a seq number to resume from. The server only supports backfill
363/// of up to ~1 week (server-configurable), so this cannot replay from seq 0.
364/// Use `poll_upstream_seq` to catch up first, then hand off to this function.
365///
366/// Messages arrive as single-op `SeqPage`s sent to `dest`. Returns on
367/// disconnect so the caller can reconnect or fall back to polling.
368pub async fn tail_upstream_stream(
369 cursor: Option<u64>,
370 base: Url,
371 dest: mpsc::Sender<SeqPage>,
372) -> anyhow::Result<()> {
373 use futures::StreamExt;
374 use tokio_tungstenite::{connect_async, tungstenite::Message};
375
376 let mut url = base.clone();
377 // convert ws(s):// scheme if needed; some callers pass http(s)://
378 let ws_scheme = match url.scheme() {
379 "https" => "wss",
380 "http" => "ws",
381 _ => "ws",
382 }
383 .to_owned();
384 url.set_scheme(&ws_scheme)
385 .map_err(|_| anyhow::anyhow!("failed to set websocket scheme"))?;
386 if let Some(seq) = cursor {
387 url.query_pairs_mut()
388 .append_pair("cursor", &seq.to_string());
389 }
390
391 tracing::info!("connecting to stream: {url}");
392 let (mut ws, _) = connect_async(url.as_str()).await?;
393 tracing::info!("stream connected");
394
395 while let Some(msg) = ws.next().await {
396 let msg = msg?;
397 let text = match msg {
398 Message::Text(t) => t,
399 Message::Close(_) => {
400 tracing::info!("stream closed by server");
401 break;
402 }
403 _ => continue,
404 };
405
406 let op: SeqOp = match serde_json::from_str(&text) {
407 Ok(op) => op,
408 Err(e) => {
409 tracing::warn!("failed to parse stream event: {e} ({text})");
410 continue;
411 }
412 };
413
414 let page = SeqPage { ops: vec![op] };
415 if dest.send(page).await.is_err() {
416 tracing::info!("stream dest channel closed, stopping");
417 break;
418 }
419 }
420
421 Ok(())
422}
423
#[cfg(test)]
mod test {
    use super::*;

    // unix timestamps matching the fixtures' createdAt strings below
    const FIVES_TS: i64 = 1431648000; // 2015-05-15T00:00:00Z
    const NEXT_TS: i64 = 1431648001; // 2015-05-15T00:00:01Z

    /// Fixture: an op at FIVES_TS with key (did, cid)
    fn valid_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "did",
            "cid": "cid",
            "createdAt": "2015-05-15T00:00:00Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    /// Fixture: an op one second later with key (didnext, cidnext)
    fn next_op() -> Op {
        serde_json::from_value(serde_json::json!({
            "did": "didnext",
            "cid": "cidnext",
            "createdAt": "2015-05-15T00:00:01Z",
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    /// Boundary state seeded from a single-op page containing `valid_op()`
    fn base_state() -> PageBoundaryState {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        PageBoundaryState::new(&page).expect("to have a base page boundary state")
    }

    #[test]
    fn test_boundary_new_empty() {
        // an empty page carries no boundary information
        let page = ExportPage { ops: vec![] };
        let state = PageBoundaryState::new(&page);
        assert!(state.is_none());
    }

    #[test]
    fn test_boundary_new_one_op() {
        let page = ExportPage {
            ops: vec![valid_op()],
        };
        let state = PageBoundaryState::new(&page).unwrap();
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cid".to_string(),
                did: "did".to_string(),
            }]
        );
    }

    #[test]
    fn test_add_new_empty() {
        // an empty next page must leave the state untouched
        let mut state = base_state();
        state.apply_to_next(&mut ExportPage { ops: vec![] });
        assert_eq!(state, base_state());
    }

    #[test]
    fn test_add_new_same_op() {
        // a page consisting only of the previous last op is a pure duplicate
        let mut page = ExportPage {
            ops: vec![valid_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state, base_state());
    }

    #[test]
    fn test_add_new_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage { ops: vec![op] };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        // same timestamp: the new key accumulates alongside the old one
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    #[test]
    fn test_add_new_same_time_dup_before() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![valid_op(), op],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    #[test]
    fn test_add_new_same_time_dup_after() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        let mut page = ExportPage {
            ops: vec![op, valid_op()],
        };

        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![
                OpKey {
                    cid: "cid".to_string(),
                    did: "did".to_string(),
                },
                OpKey {
                    cid: "cid2".to_string(),
                    did: "did".to_string(),
                },
            ]
        );
    }

    #[test]
    fn test_add_new_next_time() {
        // time advanced: state resets to the new page's last op
        let mut page = ExportPage {
            ops: vec![next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
    }

    #[test]
    fn test_add_new_next_time_with_dup() {
        // the duplicate at the old timestamp is removed; the newer op remains
        let mut page = ExportPage {
            ops: vec![valid_op(), next_op()],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 1);
        assert_eq!(page.ops[0], next_op());
    }

    #[test]
    fn test_add_new_next_time_with_dup_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                valid_op(), // should get dropped
                op.clone(), // should be kept
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }

    #[test]
    fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() {
        // make an op with a different OpKey
        let mut op = valid_op();
        op.cid = "cid2".to_string();

        let mut page = ExportPage {
            ops: vec![
                op.clone(), // should be kept
                valid_op(), // should get dropped
                next_op(),
            ],
        };
        let mut state = base_state();
        state.apply_to_next(&mut page);
        assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap());
        assert_eq!(
            state.keys_at,
            vec![OpKey {
                cid: "cidnext".to_string(),
                did: "didnext".to_string(),
            },]
        );
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], op);
        assert_eq!(page.ops[1], next_op());
    }
}