Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at debug 439 lines 13 kB view raw
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey}; 2use reqwest::Url; 3use std::time::Duration; 4use thiserror::Error; 5use tokio::sync::mpsc; 6 7// plc.directory ratelimit on /export is 500 per 5 mins 8const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 9 10#[derive(Debug, Error)] 11pub enum GetPageError { 12 #[error(transparent)] 13 Reqwest(#[from] reqwest::Error), 14 #[error(transparent)] 15 ReqwestMiddleware(#[from] reqwest_middleware::Error), 16 #[error(transparent)] 17 Serde(#[from] serde_json::Error), 18} 19 20/// ops are primary-keyed by (did, cid) 21/// plc orders by `created_at` but does not guarantee distinct times per op 22/// we assume that the order will at least be deterministic: this may be unsound 23#[derive(Debug, PartialEq)] 24pub struct LastOp { 25 pub created_at: Dt, // any op greater is definitely not duplicated 26 pk: (String, String), // did, cid 27} 28 29impl From<Op> for LastOp { 30 fn from(op: Op) -> Self { 31 Self { 32 created_at: op.created_at, 33 pk: (op.did, op.cid), 34 } 35 } 36} 37 38impl From<&Op> for LastOp { 39 fn from(op: &Op) -> Self { 40 Self { 41 created_at: op.created_at, 42 pk: (op.did.clone(), op.cid.clone()), 43 } 44 } 45} 46 47// bit of a hack 48impl From<Dt> for LastOp { 49 fn from(dt: Dt) -> Self { 50 Self { 51 created_at: dt, 52 pk: ("".to_string(), "".to_string()), 53 } 54 } 55} 56 57/// PLC 58#[derive(Debug, PartialEq)] 59pub struct PageBoundaryState { 60 pub last_at: Dt, 61 keys_at: Vec<OpKey>, // expected to ~always be length one 62} 63 64/// track keys at final createdAt to deduplicate the start of the next page 65impl PageBoundaryState { 66 pub fn new(page: &ExportPage) -> Option<Self> { 67 // grab the very last op 68 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; 69 70 // set initial state 71 let mut me = Self { 72 last_at, 73 keys_at: vec![last_key], 74 }; 75 76 // and make sure all keys at this time are captured from the back 77 me.capture_nth_last_at(page, last_at, 1); 78 79 Some(me) 80 } 81 fn apply_to_next(&mut self, page: &mut ExportPage) { 82 // walk ops forward, kicking previously-seen ops until created_at advances 83 let to_remove: Vec<usize> = page 84 .ops 85 .iter() 86 .enumerate() 87 .take_while(|(_, op)| op.created_at == self.last_at) 88 .filter(|(_, op)| self.keys_at.contains(&(*op).into())) 89 .map(|(i, _)| i) 90 .collect(); 91 92 // actually remove them. last to first so indices don't shift 93 for dup_idx in to_remove.into_iter().rev() { 94 page.ops.remove(dup_idx); 95 } 96 97 // grab the very last op 98 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else { 99 // there are no ops left? oop. bail. 100 // last_at and existing keys remain in tact 101 return; 102 }; 103 104 // reset state (as long as time actually moved forward on this page) 105 if last_at > self.last_at { 106 self.last_at = last_at; 107 self.keys_at = vec![last_key]; 108 } else { 109 // weird cases: either time didn't move (fine...) or went backwards (not fine) 110 assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 111 self.keys_at.push(last_key); 112 } 113 // and make sure all keys at this time are captured from the back 114 self.capture_nth_last_at(page, last_at, 1); 115 } 116 117 /// walk backwards from 2nd last and collect keys until created_at changes 118 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) { 119 page.ops 120 .iter() 121 .rev() 122 .skip(skips) 123 .take_while(|op| op.created_at == last_at) 124 .for_each(|op| { 125 self.keys_at.push(op.into()); 126 }); 127 } 128} 129 130pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 131 log::trace!("Getting page: {url}"); 132 133 let ops: Vec<Op> = CLIENT 134 .get(url) 135 .send() 136 .await? 137 .error_for_status()? 138 .text() 139 .await? 140 .trim() 141 .split('\n') 142 .filter_map(|s| { 143 serde_json::from_str::<Op>(s) 144 .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 145 .ok() 146 }) 147 .collect(); 148 149 let last_op = ops.last().map(Into::into); 150 151 Ok((ExportPage { ops }, last_op)) 152} 153 154pub async fn poll_upstream( 155 after: Option<Dt>, 156 base: Url, 157 dest: mpsc::Sender<ExportPage>, 158) -> anyhow::Result<&'static str> { 159 log::info!("starting upstream poller after {after:?}"); 160 let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 161 let mut prev_last: Option<LastOp> = after.map(Into::into); 162 let mut boundary_state: Option<PageBoundaryState> = None; 163 loop { 164 tick.tick().await; 165 166 let mut url = base.clone(); 167 if let Some(ref pl) = prev_last { 168 url.query_pairs_mut() 169 .append_pair("after", &pl.created_at.to_rfc3339()); 170 }; 171 172 let (mut page, next_last) = get_page(url).await?; 173 if let Some(ref mut state) = boundary_state { 174 state.apply_to_next(&mut page); 175 } else { 176 boundary_state = PageBoundaryState::new(&page); 177 } 178 if !page.is_empty() { 179 match dest.try_send(page) { 180 Ok(()) => {} 181 Err(mpsc::error::TrySendError::Full(page)) => { 182 log::warn!("export: destination channel full, awaiting..."); 183 dest.send(page).await?; 184 } 185 e => e?, 186 }; 187 } 188 189 prev_last = next_last.or(prev_last); 190 } 191} 192 193#[cfg(test)] 194mod test { 195 use super::*; 196 197 const FIVES_TS: i64 = 1431648000; 198 const NEXT_TS: i64 = 1431648001; 199 200 fn valid_op() -> Op { 201 serde_json::from_value(serde_json::json!({ 202 "did": "did", 203 "cid": "cid", 204 "createdAt": "2015-05-15T00:00:00Z", 205 "nullified": false, 206 "operation": {}, 207 })) 208 .unwrap() 209 } 210 211 fn next_op() -> Op { 212 serde_json::from_value(serde_json::json!({ 213 "did": "didnext", 214 "cid": "cidnext", 215 "createdAt": "2015-05-15T00:00:01Z", 216 "nullified": false, 217 "operation": {}, 218 })) 219 .unwrap() 220 } 221 222 fn base_state() -> PageBoundaryState { 223 let page = ExportPage { 224 ops: vec![valid_op()], 225 }; 226 PageBoundaryState::new(&page).expect("to have a base page boundary state") 227 } 228 229 #[test] 230 fn test_boundary_new_empty() { 231 let page = ExportPage { ops: vec![] }; 232 let state = PageBoundaryState::new(&page); 233 assert!(state.is_none()); 234 } 235 236 #[test] 237 fn test_boundary_new_one_op() { 238 let page = ExportPage { 239 ops: vec![valid_op()], 240 }; 241 let state = PageBoundaryState::new(&page).unwrap(); 242 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 243 assert_eq!( 244 state.keys_at, 245 vec![OpKey { 246 cid: "cid".to_string(), 247 did: "did".to_string(), 248 }] 249 ); 250 } 251 252 #[test] 253 fn test_add_new_empty() { 254 let mut state = base_state(); 255 state.apply_to_next(&mut ExportPage { ops: vec![] }); 256 assert_eq!(state, base_state()); 257 } 258 259 #[test] 260 fn test_add_new_same_op() { 261 let mut page = ExportPage { 262 ops: vec![valid_op()], 263 }; 264 let mut state = base_state(); 265 state.apply_to_next(&mut page); 266 assert_eq!(state, base_state()); 267 } 268 269 #[test] 270 fn test_add_new_same_time() { 271 // make an op with a different OpKey 272 let mut op = valid_op(); 273 op.cid = "cid2".to_string(); 274 let mut page = ExportPage { ops: vec![op] }; 275 276 let mut state = base_state(); 277 state.apply_to_next(&mut page); 278 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 279 assert_eq!( 280 state.keys_at, 281 vec![ 282 OpKey { 283 cid: "cid".to_string(), 284 did: "did".to_string(), 285 }, 286 OpKey { 287 cid: "cid2".to_string(), 288 did: "did".to_string(), 289 }, 290 ] 291 ); 292 } 293 294 #[test] 295 fn test_add_new_same_time_dup_before() { 296 // make an op with a different OpKey 297 let mut op = valid_op(); 298 op.cid = "cid2".to_string(); 299 let mut page = ExportPage { 300 ops: vec![valid_op(), op], 301 }; 302 303 let mut state = base_state(); 304 state.apply_to_next(&mut page); 305 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 306 assert_eq!( 307 state.keys_at, 308 vec![ 309 OpKey { 310 cid: "cid".to_string(), 311 did: "did".to_string(), 312 }, 313 OpKey { 314 cid: "cid2".to_string(), 315 did: "did".to_string(), 316 }, 317 ] 318 ); 319 } 320 321 #[test] 322 fn test_add_new_same_time_dup_after() { 323 // make an op with a different OpKey 324 let mut op = valid_op(); 325 op.cid = "cid2".to_string(); 326 let mut page = ExportPage { 327 ops: vec![op, valid_op()], 328 }; 329 330 let mut state = base_state(); 331 state.apply_to_next(&mut page); 332 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 333 assert_eq!( 334 state.keys_at, 335 vec![ 336 OpKey { 337 cid: "cid".to_string(), 338 did: "did".to_string(), 339 }, 340 OpKey { 341 cid: "cid2".to_string(), 342 did: "did".to_string(), 343 }, 344 ] 345 ); 346 } 347 348 #[test] 349 fn test_add_new_next_time() { 350 let mut page = ExportPage { 351 ops: vec![next_op()], 352 }; 353 let mut state = base_state(); 354 state.apply_to_next(&mut page); 355 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 356 assert_eq!( 357 state.keys_at, 358 vec![OpKey { 359 cid: "cidnext".to_string(), 360 did: "didnext".to_string(), 361 },] 362 ); 363 } 364 365 #[test] 366 fn test_add_new_next_time_with_dup() { 367 let mut page = ExportPage { 368 ops: vec![valid_op(), next_op()], 369 }; 370 let mut state = base_state(); 371 state.apply_to_next(&mut page); 372 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 373 assert_eq!( 374 state.keys_at, 375 vec![OpKey { 376 cid: "cidnext".to_string(), 377 did: "didnext".to_string(), 378 },] 379 ); 380 assert_eq!(page.ops.len(), 1); 381 assert_eq!(page.ops[0], next_op()); 382 } 383 384 #[test] 385 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 386 // make an op with a different OpKey 387 let mut op = valid_op(); 388 op.cid = "cid2".to_string(); 389 390 let mut page = ExportPage { 391 ops: vec![ 392 valid_op(), // should get dropped 393 op.clone(), // should be kept 394 next_op(), 395 ], 396 }; 397 let mut state = base_state(); 398 state.apply_to_next(&mut page); 399 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 400 assert_eq!( 401 state.keys_at, 402 vec![OpKey { 403 cid: "cidnext".to_string(), 404 did: "didnext".to_string(), 405 },] 406 ); 407 assert_eq!(page.ops.len(), 2); 408 assert_eq!(page.ops[0], op); 409 assert_eq!(page.ops[1], next_op()); 410 } 411 412 #[test] 413 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 414 // make an op with a different OpKey 415 let mut op = valid_op(); 416 op.cid = "cid2".to_string(); 417 418 let mut page = ExportPage { 419 ops: vec![ 420 op.clone(), // should be kept 421 valid_op(), // should get dropped 422 next_op(), 423 ], 424 }; 425 let mut state = base_state(); 426 state.apply_to_next(&mut page); 427 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 428 assert_eq!( 429 state.keys_at, 430 vec![OpKey { 431 cid: "cidnext".to_string(), 432 did: "didnext".to_string(), 433 },] 434 ); 435 assert_eq!(page.ops.len(), 2); 436 assert_eq!(page.ops[0], op); 437 assert_eq!(page.ops[1], next_op()); 438 } 439}