Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 670 lines 20 kB view raw
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey, SeqOp, SeqPage}; 2use reqwest::Url; 3use std::time::Duration; 4use thiserror::Error; 5use tokio::sync::mpsc; 6 7#[derive(Debug, Error)] 8pub enum GetPageError { 9 #[error(transparent)] 10 Reqwest(#[from] reqwest::Error), 11 #[error(transparent)] 12 ReqwestMiddleware(#[from] reqwest_middleware::Error), 13 #[error(transparent)] 14 Serde(#[from] serde_json::Error), 15} 16 17/// ops are primary-keyed by (did, cid) 18/// plc orders by `created_at` but does not guarantee distinct times per op 19/// we assume that the order will at least be deterministic: this may be unsound 20#[derive(Debug, PartialEq)] 21pub struct LastOp { 22 pub created_at: Dt, // any op greater is definitely not duplicated 23 pk: (String, String), // did, cid 24} 25 26impl From<Op> for LastOp { 27 fn from(op: Op) -> Self { 28 Self { 29 created_at: op.created_at, 30 pk: (op.did, op.cid), 31 } 32 } 33} 34 35impl From<&Op> for LastOp { 36 fn from(op: &Op) -> Self { 37 Self { 38 created_at: op.created_at, 39 pk: (op.did.clone(), op.cid.clone()), 40 } 41 } 42} 43 44// bit of a hack 45impl From<Dt> for LastOp { 46 fn from(dt: Dt) -> Self { 47 Self { 48 created_at: dt, 49 pk: ("".to_string(), "".to_string()), 50 } 51 } 52} 53 54/// State for removing duplicates ops between PLC export page boundaries 55#[derive(Debug, PartialEq)] 56pub struct PageBoundaryState { 57 /// The previous page's last timestamp 58 /// 59 /// Duplicate ops from /export only occur for the same exact timestamp 60 pub last_at: Dt, 61 /// The previous page's ops at its last timestamp 62 keys_at: Vec<OpKey>, // expected to ~always be length one 63} 64 65impl PageBoundaryState { 66 /// Initialize the boundary state with a PLC page 67 pub fn new(page: &ExportPage) -> Option<Self> { 68 // grab the very last op 69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; 70 71 // set initial state 72 let mut me = Self { 73 last_at, 74 keys_at: vec![last_key], 75 }; 76 77 // and make sure all keys at this time are captured from the back 78 me.capture_nth_last_at(page, last_at, 1); 79 80 Some(me) 81 } 82 /// Apply the deduplication and update state 83 /// 84 /// The beginning of the page will be modified to remove duplicates from the 85 /// previous page. 86 /// 87 /// The end of the page is inspected to update the deduplicator state for 88 /// the next page. 89 fn apply_to_next(&mut self, page: &mut ExportPage) { 90 // walk ops forward, kicking previously-seen ops until created_at advances 91 let to_remove: Vec<usize> = page 92 .ops 93 .iter() 94 .enumerate() 95 .take_while(|(_, op)| op.created_at == self.last_at) 96 .filter(|(_, op)| self.keys_at.contains(&(*op).into())) 97 .map(|(i, _)| i) 98 .collect(); 99 100 // actually remove them. last to first so indices don't shift 101 for dup_idx in to_remove.into_iter().rev() { 102 page.ops.remove(dup_idx); 103 } 104 105 // grab the very last op 106 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else { 107 // there are no ops left? oop. bail. 108 // last_at and existing keys remain in tact 109 return; 110 }; 111 112 // reset state (as long as time actually moved forward on this page) 113 if last_at > self.last_at { 114 self.last_at = last_at; 115 self.keys_at = vec![last_key]; 116 } else { 117 // weird cases: either time didn't move (fine...) or went backwards (not fine) 118 assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 119 self.keys_at.push(last_key); 120 } 121 // and make sure all keys at this time are captured from the back 122 self.capture_nth_last_at(page, last_at, 1); 123 } 124 125 /// walk backwards from 2nd last and collect keys until created_at changes 126 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) { 127 page.ops 128 .iter() 129 .rev() 130 .skip(skips) 131 .take_while(|op| op.created_at == last_at) 132 .for_each(|op| { 133 self.keys_at.push(op.into()); 134 }); 135 } 136} 137 138/// Get one PLC export page 139/// 140/// Extracts the final op so it can be used to fetch the following page 141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 142 use futures::TryStreamExt; 143 use tokio::io::{AsyncBufReadExt, BufReader}; 144 use tokio_util::compat::FuturesAsyncReadCompatExt; 145 146 tracing::trace!("Getting page: {url}"); 147 148 let res = CLIENT.get(url).send().await?.error_for_status()?; 149 let stream = Box::pin( 150 res.bytes_stream() 151 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) 152 .into_async_read() 153 .compat(), 154 ); 155 156 let mut lines = BufReader::new(stream).lines(); 157 let mut ops = Vec::new(); 158 159 loop { 160 match lines.next_line().await { 161 Ok(Some(line)) => { 162 let line = line.trim(); 163 if line.is_empty() { 164 continue; 165 } 166 match serde_json::from_str::<Op>(line) { 167 Ok(op) => ops.push(op), 168 Err(e) => tracing::warn!("failed to parse op: {e} ({line})"), 169 } 170 } 171 Ok(None) => break, 172 Err(e) => { 173 tracing::warn!("transport error mid-page: {}; returning partial page", e); 174 break; 175 } 176 } 177 } 178 179 let last_op = ops.last().map(Into::into); 180 181 Ok((ExportPage { ops }, last_op)) 182} 183 184/// Poll an upstream PLC server for new ops 185/// 186/// Pages of operations are written to the `dest` channel. 187/// 188/// ```no_run 189/// # #[tokio::main] 190/// # async fn main() { 191/// use allegedly::{ExportPage, Op, poll_upstream}; 192/// 193/// let after = Some(chrono::Utc::now()); 194/// let upstream = "https://plc.wtf/export".parse().unwrap(); 195/// let throttle = std::time::Duration::from_millis(300); 196/// 197/// let (tx, mut rx) = tokio::sync::mpsc::channel(1); 198/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 199/// 200/// while let Some(ExportPage { ops }) = rx.recv().await { 201/// println!("received {} plc ops", ops.len()); 202/// 203/// for Op { did, cid, operation, .. } in ops { 204/// // in this example we're alerting when changes are found for one 205/// // specific identity 206/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 207/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get()); 208/// } 209/// } 210/// } 211/// # } 212/// ``` 213pub async fn poll_upstream( 214 after: Option<Dt>, 215 base: Url, 216 throttle: Duration, 217 dest: mpsc::Sender<ExportPage>, 218) -> anyhow::Result<&'static str> { 219 tracing::info!("starting upstream poller at {base} after {after:?}"); 220 let mut tick = tokio::time::interval(throttle); 221 let mut prev_last: Option<LastOp> = after.map(Into::into); 222 let mut boundary_state: Option<PageBoundaryState> = None; 223 loop { 224 tick.tick().await; 225 226 let mut url = base.clone(); 227 if let Some(ref pl) = prev_last { 228 url.query_pairs_mut() 229 .append_pair("after", &pl.created_at.to_rfc3339()); 230 }; 231 232 let (mut page, next_last) = match get_page(url).await { 233 Ok(res) => res, 234 Err(e) => { 235 tracing::warn!("error polling upstream: {e}"); 236 continue; 237 } 238 }; 239 240 if let Some(ref mut state) = boundary_state { 241 state.apply_to_next(&mut page); 242 } else { 243 boundary_state = PageBoundaryState::new(&page); 244 } 245 if !page.is_empty() { 246 match dest.try_send(page) { 247 Ok(()) => {} 248 Err(mpsc::error::TrySendError::Full(page)) => { 249 tracing::warn!("export: destination channel full, awaiting..."); 250 dest.send(page).await?; 251 } 252 e => e?, 253 }; 254 } 255 256 prev_last = next_last.or(prev_last); 257 } 258} 259 260/// Fetch one page of seq-based export from `/export?after=<seq>` 261async fn get_seq_page(url: Url) -> Result<SeqPage, GetPageError> { 262 use futures::TryStreamExt; 263 use tokio::io::{AsyncBufReadExt, BufReader}; 264 use tokio_util::compat::FuturesAsyncReadCompatExt; 265 266 tracing::trace!("getting seq page: {url}"); 267 268 let res = CLIENT.get(url).send().await?.error_for_status()?; 269 let stream = Box::pin( 270 res.bytes_stream() 271 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) 272 .into_async_read() 273 .compat(), 274 ); 275 276 let mut lines = BufReader::new(stream).lines(); 277 let mut ops = Vec::new(); 278 279 loop { 280 match lines.next_line().await { 281 Ok(Some(line)) => { 282 let line = line.trim(); 283 if line.is_empty() { 284 continue; 285 } 286 match serde_json::from_str::<SeqOp>(line) { 287 Ok(op) => ops.push(op), 288 Err(e) => tracing::warn!("failed to parse seq op: {e} ({line})"), 289 } 290 } 291 Ok(None) => break, 292 Err(e) => { 293 tracing::warn!( 294 "transport error mid-seq-page: {}; returning partial page", 295 e 296 ); 297 break; 298 } 299 } 300 } 301 302 Ok(SeqPage { ops }) 303} 304 305/// Poll an upstream PLC server using seq-number-based cursoring 306/// 307/// Uses `/export?after=<seq>` — each op from the server carries a `seq` field 308/// which is a globally monotonic unsigned integer. Because seq is unique per op 309/// there is no need for page-boundary deduplication. 310/// 311/// Pages are sent to `dest`. Returns when the channel closes. 312pub async fn poll_upstream_seq( 313 after: Option<u64>, 314 base: Url, 315 throttle: Duration, 316 dest: mpsc::Sender<SeqPage>, 317) -> anyhow::Result<&'static str> { 318 tracing::info!("starting seq upstream poller at {base} after {after:?}"); 319 let mut tick = tokio::time::interval(throttle); 320 let mut last_seq: u64 = after.unwrap_or(0); 321 322 loop { 323 tick.tick().await; 324 325 let mut url = base.clone(); 326 url.query_pairs_mut() 327 .append_pair("after", &last_seq.to_string()); 328 329 let page = match get_seq_page(url).await { 330 Ok(p) => p, 331 Err(e) => { 332 tracing::warn!("error polling upstream (seq): {e}"); 333 continue; 334 } 335 }; 336 337 if let Some(last) = page.ops.last() { 338 last_seq = last.seq; 339 } 340 341 if !page.is_empty() { 342 tracing::debug!( 343 "seq poll: page with {} ops, seq {}..{}", 344 page.ops.len(), 345 page.ops.first().map(|op| op.seq).unwrap_or(0), 346 last_seq 347 ); 348 match dest.try_send(page) { 349 Ok(()) => {} 350 Err(mpsc::error::TrySendError::Full(page)) => { 351 tracing::warn!("seq poll: destination channel full, awaiting..."); 352 dest.send(page).await?; 353 } 354 e => e?, 355 }; 356 } 357 } 358} 359 360/// Tail the upstream PLC `/export/stream` WebSocket endpoint 361/// 362/// `cursor` is a seq number to resume from. The server only supports backfill 363/// of up to ~1 week (server-configurable), so this cannot replay from seq 0. 364/// Use `poll_upstream_seq` to catch up first, then hand off to this function. 365/// 366/// Messages arrive as single-op `SeqPage`s sent to `dest`. Returns on 367/// disconnect so the caller can reconnect or fall back to polling. 368pub async fn tail_upstream_stream( 369 cursor: Option<u64>, 370 base: Url, 371 dest: mpsc::Sender<SeqPage>, 372) -> anyhow::Result<()> { 373 use futures::StreamExt; 374 use tokio_tungstenite::{connect_async, tungstenite::Message}; 375 376 let mut url = base.clone(); 377 // convert ws(s):// scheme if needed; some callers pass http(s):// 378 let ws_scheme = match url.scheme() { 379 "https" => "wss", 380 "http" => "ws", 381 _ => "ws", 382 } 383 .to_owned(); 384 url.set_scheme(&ws_scheme) 385 .map_err(|_| anyhow::anyhow!("failed to set websocket scheme"))?; 386 if let Some(seq) = cursor { 387 url.query_pairs_mut() 388 .append_pair("cursor", &seq.to_string()); 389 } 390 391 tracing::info!("connecting to stream: {url}"); 392 let (mut ws, _) = connect_async(url.as_str()).await?; 393 tracing::info!("stream connected"); 394 395 while let Some(msg) = ws.next().await { 396 let msg = msg?; 397 let text = match msg { 398 Message::Text(t) => t, 399 Message::Close(_) => { 400 tracing::info!("stream closed by server"); 401 break; 402 } 403 _ => continue, 404 }; 405 406 let op: SeqOp = match serde_json::from_str(&text) { 407 Ok(op) => op, 408 Err(e) => { 409 tracing::warn!("failed to parse stream event: {e} ({text})"); 410 continue; 411 } 412 }; 413 414 let page = SeqPage { ops: vec![op] }; 415 if dest.send(page).await.is_err() { 416 tracing::info!("stream dest channel closed, stopping"); 417 break; 418 } 419 } 420 421 Ok(()) 422} 423 424#[cfg(test)] 425mod test { 426 use super::*; 427 428 const FIVES_TS: i64 = 1431648000; 429 const NEXT_TS: i64 = 1431648001; 430 431 fn valid_op() -> Op { 432 serde_json::from_value(serde_json::json!({ 433 "did": "did", 434 "cid": "cid", 435 "createdAt": "2015-05-15T00:00:00Z", 436 "nullified": false, 437 "operation": {}, 438 })) 439 .unwrap() 440 } 441 442 fn next_op() -> Op { 443 serde_json::from_value(serde_json::json!({ 444 "did": "didnext", 445 "cid": "cidnext", 446 "createdAt": "2015-05-15T00:00:01Z", 447 "nullified": false, 448 "operation": {}, 449 })) 450 .unwrap() 451 } 452 453 fn base_state() -> PageBoundaryState { 454 let page = ExportPage { 455 ops: vec![valid_op()], 456 }; 457 PageBoundaryState::new(&page).expect("to have a base page boundary state") 458 } 459 460 #[test] 461 fn test_boundary_new_empty() { 462 let page = ExportPage { ops: vec![] }; 463 let state = PageBoundaryState::new(&page); 464 assert!(state.is_none()); 465 } 466 467 #[test] 468 fn test_boundary_new_one_op() { 469 let page = ExportPage { 470 ops: vec![valid_op()], 471 }; 472 let state = PageBoundaryState::new(&page).unwrap(); 473 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 474 assert_eq!( 475 state.keys_at, 476 vec![OpKey { 477 cid: "cid".to_string(), 478 did: "did".to_string(), 479 }] 480 ); 481 } 482 483 #[test] 484 fn test_add_new_empty() { 485 let mut state = base_state(); 486 state.apply_to_next(&mut ExportPage { ops: vec![] }); 487 assert_eq!(state, base_state()); 488 } 489 490 #[test] 491 fn test_add_new_same_op() { 492 let mut page = ExportPage { 493 ops: vec![valid_op()], 494 }; 495 let mut state = base_state(); 496 state.apply_to_next(&mut page); 497 assert_eq!(state, base_state()); 498 } 499 500 #[test] 501 fn test_add_new_same_time() { 502 // make an op with a different OpKey 503 let mut op = valid_op(); 504 op.cid = "cid2".to_string(); 505 let mut page = ExportPage { ops: vec![op] }; 506 507 let mut state = base_state(); 508 state.apply_to_next(&mut page); 509 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 510 assert_eq!( 511 state.keys_at, 512 vec![ 513 OpKey { 514 cid: "cid".to_string(), 515 did: "did".to_string(), 516 }, 517 OpKey { 518 cid: "cid2".to_string(), 519 did: "did".to_string(), 520 }, 521 ] 522 ); 523 } 524 525 #[test] 526 fn test_add_new_same_time_dup_before() { 527 // make an op with a different OpKey 528 let mut op = valid_op(); 529 op.cid = "cid2".to_string(); 530 let mut page = ExportPage { 531 ops: vec![valid_op(), op], 532 }; 533 534 let mut state = base_state(); 535 state.apply_to_next(&mut page); 536 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 537 assert_eq!( 538 state.keys_at, 539 vec![ 540 OpKey { 541 cid: "cid".to_string(), 542 did: "did".to_string(), 543 }, 544 OpKey { 545 cid: "cid2".to_string(), 546 did: "did".to_string(), 547 }, 548 ] 549 ); 550 } 551 552 #[test] 553 fn test_add_new_same_time_dup_after() { 554 // make an op with a different OpKey 555 let mut op = valid_op(); 556 op.cid = "cid2".to_string(); 557 let mut page = ExportPage { 558 ops: vec![op, valid_op()], 559 }; 560 561 let mut state = base_state(); 562 state.apply_to_next(&mut page); 563 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 564 assert_eq!( 565 state.keys_at, 566 vec![ 567 OpKey { 568 cid: "cid".to_string(), 569 did: "did".to_string(), 570 }, 571 OpKey { 572 cid: "cid2".to_string(), 573 did: "did".to_string(), 574 }, 575 ] 576 ); 577 } 578 579 #[test] 580 fn test_add_new_next_time() { 581 let mut page = ExportPage { 582 ops: vec![next_op()], 583 }; 584 let mut state = base_state(); 585 state.apply_to_next(&mut page); 586 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 587 assert_eq!( 588 state.keys_at, 589 vec![OpKey { 590 cid: "cidnext".to_string(), 591 did: "didnext".to_string(), 592 },] 593 ); 594 } 595 596 #[test] 597 fn test_add_new_next_time_with_dup() { 598 let mut page = ExportPage { 599 ops: vec![valid_op(), next_op()], 600 }; 601 let mut state = base_state(); 602 state.apply_to_next(&mut page); 603 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 604 assert_eq!( 605 state.keys_at, 606 vec![OpKey { 607 cid: "cidnext".to_string(), 608 did: "didnext".to_string(), 609 },] 610 ); 611 assert_eq!(page.ops.len(), 1); 612 assert_eq!(page.ops[0], next_op()); 613 } 614 615 #[test] 616 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 617 // make an op with a different OpKey 618 let mut op = valid_op(); 619 op.cid = "cid2".to_string(); 620 621 let mut page = ExportPage { 622 ops: vec![ 623 valid_op(), // should get dropped 624 op.clone(), // should be kept 625 next_op(), 626 ], 627 }; 628 let mut state = base_state(); 629 state.apply_to_next(&mut page); 630 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 631 assert_eq!( 632 state.keys_at, 633 vec![OpKey { 634 cid: "cidnext".to_string(), 635 did: "didnext".to_string(), 636 },] 637 ); 638 assert_eq!(page.ops.len(), 2); 639 assert_eq!(page.ops[0], op); 640 assert_eq!(page.ops[1], next_op()); 641 } 642 643 #[test] 644 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 645 // make an op with a different OpKey 646 let mut op = valid_op(); 647 op.cid = "cid2".to_string(); 648 649 let mut page = ExportPage { 650 ops: vec![ 651 op.clone(), // should be kept 652 valid_op(), // should get dropped 653 next_op(), 654 ], 655 }; 656 let mut state = base_state(); 657 state.apply_to_next(&mut page); 658 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 659 assert_eq!( 660 state.keys_at, 661 vec![OpKey { 662 cid: "cidnext".to_string(), 663 did: "didnext".to_string(), 664 },] 665 ); 666 assert_eq!(page.ops.len(), 2); 667 assert_eq!(page.ops[0], op); 668 assert_eq!(page.ops[1], next_op()); 669 } 670}