Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 484 lines 14 kB view raw
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey}; 2use reqwest::Url; 3use std::time::Duration; 4use thiserror::Error; 5use tokio::sync::mpsc; 6 7#[derive(Debug, Error)] 8pub enum GetPageError { 9 #[error(transparent)] 10 Reqwest(#[from] reqwest::Error), 11 #[error(transparent)] 12 ReqwestMiddleware(#[from] reqwest_middleware::Error), 13 #[error(transparent)] 14 Serde(#[from] serde_json::Error), 15} 16 17/// ops are primary-keyed by (did, cid) 18/// plc orders by `created_at` but does not guarantee distinct times per op 19/// we assume that the order will at least be deterministic: this may be unsound 20#[derive(Debug, PartialEq)] 21pub struct LastOp { 22 pub created_at: Dt, // any op greater is definitely not duplicated 23 pk: (String, String), // did, cid 24} 25 26impl From<Op> for LastOp { 27 fn from(op: Op) -> Self { 28 Self { 29 created_at: op.created_at, 30 pk: (op.did, op.cid), 31 } 32 } 33} 34 35impl From<&Op> for LastOp { 36 fn from(op: &Op) -> Self { 37 Self { 38 created_at: op.created_at, 39 pk: (op.did.clone(), op.cid.clone()), 40 } 41 } 42} 43 44// bit of a hack 45impl From<Dt> for LastOp { 46 fn from(dt: Dt) -> Self { 47 Self { 48 created_at: dt, 49 pk: ("".to_string(), "".to_string()), 50 } 51 } 52} 53 54/// State for removing duplicates ops between PLC export page boundaries 55#[derive(Debug, PartialEq)] 56pub struct PageBoundaryState { 57 /// The previous page's last timestamp 58 /// 59 /// Duplicate ops from /export only occur for the same exact timestamp 60 pub last_at: Dt, 61 /// The previous page's ops at its last timestamp 62 keys_at: Vec<OpKey>, // expected to ~always be length one 63} 64 65impl PageBoundaryState { 66 /// Initialize the boundary state with a PLC page 67 pub fn new(page: &ExportPage) -> Option<Self> { 68 // grab the very last op 69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; 70 71 // set initial state 72 let mut me = Self { 73 last_at, 74 keys_at: vec![last_key], 75 }; 76 77 // and make sure all keys at this time are captured from the back 78 me.capture_nth_last_at(page, last_at, 1); 79 80 Some(me) 81 } 82 /// Apply the deduplication and update state 83 /// 84 /// The beginning of the page will be modified to remove duplicates from the 85 /// previous page. 86 /// 87 /// The end of the page is inspected to update the deduplicator state for 88 /// the next page. 89 fn apply_to_next(&mut self, page: &mut ExportPage) { 90 // walk ops forward, kicking previously-seen ops until created_at advances 91 let to_remove: Vec<usize> = page 92 .ops 93 .iter() 94 .enumerate() 95 .take_while(|(_, op)| op.created_at == self.last_at) 96 .filter(|(_, op)| self.keys_at.contains(&(*op).into())) 97 .map(|(i, _)| i) 98 .collect(); 99 100 // actually remove them. last to first so indices don't shift 101 for dup_idx in to_remove.into_iter().rev() { 102 page.ops.remove(dup_idx); 103 } 104 105 // grab the very last op 106 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else { 107 // there are no ops left? oop. bail. 108 // last_at and existing keys remain in tact 109 return; 110 }; 111 112 // reset state (as long as time actually moved forward on this page) 113 if last_at > self.last_at { 114 self.last_at = last_at; 115 self.keys_at = vec![last_key]; 116 } else { 117 // weird cases: either time didn't move (fine...) or went backwards (not fine) 118 assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 119 self.keys_at.push(last_key); 120 } 121 // and make sure all keys at this time are captured from the back 122 self.capture_nth_last_at(page, last_at, 1); 123 } 124 125 /// walk backwards from 2nd last and collect keys until created_at changes 126 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) { 127 page.ops 128 .iter() 129 .rev() 130 .skip(skips) 131 .take_while(|op| op.created_at == last_at) 132 .for_each(|op| { 133 self.keys_at.push(op.into()); 134 }); 135 } 136} 137 138/// Get one PLC export page 139/// 140/// Extracts the final op so it can be used to fetch the following page 141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 142 log::trace!("Getting page: {url}"); 143 144 let ops: Vec<Op> = CLIENT 145 .get(url) 146 .send() 147 .await? 148 .error_for_status()? 149 .text() 150 .await? 151 .trim() 152 .split('\n') 153 .filter_map(|s| { 154 serde_json::from_str::<Op>(s) 155 .inspect_err(|e| { 156 if !s.is_empty() { 157 log::warn!("failed to parse op: {e} ({s})") 158 } 159 }) 160 .ok() 161 }) 162 .collect(); 163 164 let last_op = ops.last().map(Into::into); 165 166 Ok((ExportPage { ops }, last_op)) 167} 168 169/// Poll an upstream PLC server for new ops 170/// 171/// Pages of operations are written to the `dest` channel. 172/// 173/// ```no_run 174/// # #[tokio::main] 175/// # async fn main() { 176/// use allegedly::{ExportPage, Op, poll_upstream}; 177/// 178/// let after = Some(chrono::Utc::now()); 179/// let upstream = "https://plc.wtf/export".parse().unwrap(); 180/// let throttle = std::time::Duration::from_millis(300); 181/// 182/// let (tx, mut rx) = tokio::sync::mpsc::channel(1); 183/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 184/// 185/// while let Some(ExportPage { ops }) = rx.recv().await { 186/// println!("received {} plc ops", ops.len()); 187/// 188/// for Op { did, cid, operation, .. } in ops { 189/// // in this example we're alerting when changes are found for one 190/// // specific identity 191/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 192/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get()); 193/// } 194/// } 195/// } 196/// # } 197/// ``` 198pub async fn poll_upstream( 199 after: Option<Dt>, 200 base: Url, 201 throttle: Duration, 202 dest: mpsc::Sender<ExportPage>, 203) -> anyhow::Result<&'static str> { 204 log::info!("starting upstream poller at {base} after {after:?}"); 205 let mut tick = tokio::time::interval(throttle); 206 let mut prev_last: Option<LastOp> = after.map(Into::into); 207 let mut boundary_state: Option<PageBoundaryState> = None; 208 loop { 209 tick.tick().await; 210 211 let mut url = base.clone(); 212 if let Some(ref pl) = prev_last { 213 url.query_pairs_mut() 214 .append_pair("after", &pl.created_at.to_rfc3339()); 215 }; 216 217 let (mut page, next_last) = get_page(url).await?; 218 if let Some(ref mut state) = boundary_state { 219 state.apply_to_next(&mut page); 220 } else { 221 boundary_state = PageBoundaryState::new(&page); 222 } 223 if !page.is_empty() { 224 match dest.try_send(page) { 225 Ok(()) => {} 226 Err(mpsc::error::TrySendError::Full(page)) => { 227 log::warn!("export: destination channel full, awaiting..."); 228 dest.send(page).await?; 229 } 230 e => e?, 231 }; 232 } 233 234 prev_last = next_last.or(prev_last); 235 } 236} 237 238#[cfg(test)] 239mod test { 240 use super::*; 241 242 const FIVES_TS: i64 = 1431648000; 243 const NEXT_TS: i64 = 1431648001; 244 245 fn valid_op() -> Op { 246 serde_json::from_value(serde_json::json!({ 247 "did": "did", 248 "cid": "cid", 249 "createdAt": "2015-05-15T00:00:00Z", 250 "nullified": false, 251 "operation": {}, 252 })) 253 .unwrap() 254 } 255 256 fn next_op() -> Op { 257 serde_json::from_value(serde_json::json!({ 258 "did": "didnext", 259 "cid": "cidnext", 260 "createdAt": "2015-05-15T00:00:01Z", 261 "nullified": false, 262 "operation": {}, 263 })) 264 .unwrap() 265 } 266 267 fn base_state() -> PageBoundaryState { 268 let page = ExportPage { 269 ops: vec![valid_op()], 270 }; 271 PageBoundaryState::new(&page).expect("to have a base page boundary state") 272 } 273 274 #[test] 275 fn test_boundary_new_empty() { 276 let page = ExportPage { ops: vec![] }; 277 let state = PageBoundaryState::new(&page); 278 assert!(state.is_none()); 279 } 280 281 #[test] 282 fn test_boundary_new_one_op() { 283 let page = ExportPage { 284 ops: vec![valid_op()], 285 }; 286 let state = PageBoundaryState::new(&page).unwrap(); 287 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 288 assert_eq!( 289 state.keys_at, 290 vec![OpKey { 291 cid: "cid".to_string(), 292 did: "did".to_string(), 293 }] 294 ); 295 } 296 297 #[test] 298 fn test_add_new_empty() { 299 let mut state = base_state(); 300 state.apply_to_next(&mut ExportPage { ops: vec![] }); 301 assert_eq!(state, base_state()); 302 } 303 304 #[test] 305 fn test_add_new_same_op() { 306 let mut page = ExportPage { 307 ops: vec![valid_op()], 308 }; 309 let mut state = base_state(); 310 state.apply_to_next(&mut page); 311 assert_eq!(state, base_state()); 312 } 313 314 #[test] 315 fn test_add_new_same_time() { 316 // make an op with a different OpKey 317 let mut op = valid_op(); 318 op.cid = "cid2".to_string(); 319 let mut page = ExportPage { ops: vec![op] }; 320 321 let mut state = base_state(); 322 state.apply_to_next(&mut page); 323 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 324 assert_eq!( 325 state.keys_at, 326 vec![ 327 OpKey { 328 cid: "cid".to_string(), 329 did: "did".to_string(), 330 }, 331 OpKey { 332 cid: "cid2".to_string(), 333 did: "did".to_string(), 334 }, 335 ] 336 ); 337 } 338 339 #[test] 340 fn test_add_new_same_time_dup_before() { 341 // make an op with a different OpKey 342 let mut op = valid_op(); 343 op.cid = "cid2".to_string(); 344 let mut page = ExportPage { 345 ops: vec![valid_op(), op], 346 }; 347 348 let mut state = base_state(); 349 state.apply_to_next(&mut page); 350 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 351 assert_eq!( 352 state.keys_at, 353 vec![ 354 OpKey { 355 cid: "cid".to_string(), 356 did: "did".to_string(), 357 }, 358 OpKey { 359 cid: "cid2".to_string(), 360 did: "did".to_string(), 361 }, 362 ] 363 ); 364 } 365 366 #[test] 367 fn test_add_new_same_time_dup_after() { 368 // make an op with a different OpKey 369 let mut op = valid_op(); 370 op.cid = "cid2".to_string(); 371 let mut page = ExportPage { 372 ops: vec![op, valid_op()], 373 }; 374 375 let mut state = base_state(); 376 state.apply_to_next(&mut page); 377 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 378 assert_eq!( 379 state.keys_at, 380 vec![ 381 OpKey { 382 cid: "cid".to_string(), 383 did: "did".to_string(), 384 }, 385 OpKey { 386 cid: "cid2".to_string(), 387 did: "did".to_string(), 388 }, 389 ] 390 ); 391 } 392 393 #[test] 394 fn test_add_new_next_time() { 395 let mut page = ExportPage { 396 ops: vec![next_op()], 397 }; 398 let mut state = base_state(); 399 state.apply_to_next(&mut page); 400 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 401 assert_eq!( 402 state.keys_at, 403 vec![OpKey { 404 cid: "cidnext".to_string(), 405 did: "didnext".to_string(), 406 },] 407 ); 408 } 409 410 #[test] 411 fn test_add_new_next_time_with_dup() { 412 let mut page = ExportPage { 413 ops: vec![valid_op(), next_op()], 414 }; 415 let mut state = base_state(); 416 state.apply_to_next(&mut page); 417 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 418 assert_eq!( 419 state.keys_at, 420 vec![OpKey { 421 cid: "cidnext".to_string(), 422 did: "didnext".to_string(), 423 },] 424 ); 425 assert_eq!(page.ops.len(), 1); 426 assert_eq!(page.ops[0], next_op()); 427 } 428 429 #[test] 430 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 431 // make an op with a different OpKey 432 let mut op = valid_op(); 433 op.cid = "cid2".to_string(); 434 435 let mut page = ExportPage { 436 ops: vec![ 437 valid_op(), // should get dropped 438 op.clone(), // should be kept 439 next_op(), 440 ], 441 }; 442 let mut state = base_state(); 443 state.apply_to_next(&mut page); 444 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 445 assert_eq!( 446 state.keys_at, 447 vec![OpKey { 448 cid: "cidnext".to_string(), 449 did: "didnext".to_string(), 450 },] 451 ); 452 assert_eq!(page.ops.len(), 2); 453 assert_eq!(page.ops[0], op); 454 assert_eq!(page.ops[1], next_op()); 455 } 456 457 #[test] 458 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 459 // make an op with a different OpKey 460 let mut op = valid_op(); 461 op.cid = "cid2".to_string(); 462 463 let mut page = ExportPage { 464 ops: vec![ 465 op.clone(), // should be kept 466 valid_op(), // should get dropped 467 next_op(), 468 ], 469 }; 470 let mut state = base_state(); 471 state.apply_to_next(&mut page); 472 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 473 assert_eq!( 474 state.keys_at, 475 vec![OpKey { 476 cid: "cidnext".to_string(), 477 did: "didnext".to_string(), 478 },] 479 ); 480 assert_eq!(page.ops.len(), 2); 481 assert_eq!(page.ops[0], op); 482 assert_eq!(page.ops[1], next_op()); 483 } 484}