Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor page boundary dedup

needs tests but it *should* be correct? other than parse error handling. maybe.

phil 015308e6 1e54e467

+140 -24
+6 -1
readme.md
··· 1 1 # Allegedly 2 2 3 - A public ledger copy machine for [PLC](https://github.com/did-method-plc/did-method-plc) mirrors running the canonical typescript directory code. 3 + Some [public ledger](https://github.com/did-method-plc/did-method-plc) tools and services 4 + 5 + Allegedly can 6 + 7 + - Tail PLC ops to stdout 8 + - Copy ops to postgres for a mirror running the [reference typescript implementation](https://github.com/did-method-plc/did-method-plc)
+20 -1
src/lib.rs
··· 16 16 17 17 /// One page of PLC export 18 18 /// 19 - /// Not limited, but expected to have up to about 1000 lines 19 + /// Expected to have up to around 1000 lines of raw json ops 20 20 #[derive(Debug)] 21 21 pub struct ExportPage { 22 22 pub ops: Vec<String>, ··· 28 28 } 29 29 } 30 30 31 + /// A fully-deserialized plc operation 32 + /// 33 + /// including the plc's wrapping with timestamp and nullified state 31 34 #[derive(Debug, Deserialize)] 32 35 #[serde(rename_all = "camelCase")] 33 36 pub struct Op<'a> { ··· 37 40 pub nullified: bool, 38 41 #[serde(borrow)] 39 42 pub operation: &'a serde_json::value::RawValue, 43 + } 44 + 45 + /// Database primary key for an op 46 + #[derive(Debug, PartialEq)] 47 + pub struct OpKey { 48 + pub did: String, 49 + pub cid: String, 50 + } 51 + 52 + impl From<&Op<'_>> for OpKey { 53 + fn from(Op { did, cid, .. }: &Op<'_>) -> Self { 54 + Self { 55 + did: did.to_string(), 56 + cid: cid.to_string(), 57 + } 58 + } 40 59 } 41 60 42 61 pub fn bin_init(name: &str) {
+114 -22
src/poll.rs
··· 1 - use crate::{CLIENT, Dt, ExportPage, Op}; 1 + use crate::{CLIENT, Dt, ExportPage, Op, OpKey}; 2 2 use std::time::Duration; 3 3 use thiserror::Error; 4 4 use url::Url; ··· 42 42 } 43 43 } 44 44 45 - impl ExportPage { 46 - /// this is a (slightly flawed) op deduplicator 47 - fn only_after_last(&mut self, last_op: &LastOp) { 48 - loop { 49 - let Some(s) = self.ops.first().cloned() else { 50 - break; 51 - }; 52 - let Ok(op) = serde_json::from_str::<Op>(&s) else { 53 - log::warn!( 54 - "deduplication failed op parsing ({s:?}), bailing for downstream to deal with." 55 - ); 56 - break; 45 + /// Dedup state carried across PLC export page boundaries 46 + struct PageBoundaryState { 47 + last_at: Dt, 48 + keys_at: Vec<OpKey>, // expected to ~always be length one 49 + } 50 + 51 + impl PageBoundaryState { 52 + fn new(page: &mut ExportPage) -> Option<Self> { 53 + // grab the very last op 54 + let (last_at, last_key) = loop { 55 + let Some(s) = page.ops.last().cloned() else { 56 + // there are no ops left? oop. bail. 57 + // last_at and existing keys remain intact if there was no later op 58 + return None; 57 59 }; 58 - if op.created_at > last_op.created_at { 59 - break; 60 + if s.is_empty() { 61 + // annoying: trim off any trailing blank lines 62 + page.ops.pop(); 63 + continue; 60 64 } 61 - log::trace!("dedup: dropping an op"); 62 - self.ops.remove(0); 63 - if Into::<LastOp>::into(op) == *last_op { 64 - log::trace!("dedup: found exact op, keeping all after here"); 65 - break; 65 + let Ok(op) = serde_json::from_str::<Op>(&s) 66 + .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 67 + else { 68 + // doubly annoying: skip over trailing garbage?? 
69 + continue; 70 + }; 71 + break (op.created_at, Into::<OpKey>::into(&op)); 72 + }; 73 + 74 + // set initial state 75 + let mut me = Self { 76 + last_at, 77 + keys_at: vec![last_key], 78 + }; 79 + 80 + // and make sure all keys at this time are captured from the back 81 + page.ops 82 + .iter() 83 + .rev() 84 + .skip(1) // we already added the very last one 85 + .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 86 + log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 87 + .take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false)) 88 + .for_each(|opr| { 89 + let op = &opr.expect("any Errs were filtered by take_while"); 90 + me.keys_at.push(op.into()); 91 + }); 92 + 93 + Some(me) 94 + } 95 + fn apply_to_next(&mut self, page: &mut ExportPage) { 96 + // walk ops forward, kicking previously-seen ops until created_at advances 97 + let to_remove: Vec<usize> = page 98 + .ops 99 + .iter() 100 + .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 101 + log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 102 + .enumerate() 103 + .take_while(|(_, opr)| opr.as_ref().map(|op| op.created_at == self.last_at).unwrap_or(false)) 104 + .filter_map(|(i, opr)| { 105 + if self.keys_at.contains(&(&opr.expect("any Errs were filtered by take_while")).into()) { 106 + Some(i) 107 + } else { None } 108 + }) 109 + .collect(); 110 + 111 + // actually remove them. last to first so indices don't shift 112 + for dup_idx in to_remove.into_iter().rev() { 113 + page.ops.remove(dup_idx); 114 + } 115 + 116 + // grab the very last op 117 + let (last_at, last_key) = loop { 118 + let Some(s) = page.ops.last().cloned() else { 119 + // there are no ops left? oop. bail. 
120 + // last_at and existing keys remain intact if there was no later op 121 + return; 122 + }; 123 + if s.is_empty() { 124 + // annoying: trim off any trailing blank lines 125 + page.ops.pop(); 126 + continue; 66 127 } 128 + let Ok(op) = serde_json::from_str::<Op>(&s) 129 + .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 130 + else { 131 + // doubly annoying: skip over trailing garbage?? 132 + continue; 133 + }; 134 + break (op.created_at, Into::<OpKey>::into(&op)); 135 + }; 136 + 137 + // reset state (as long as time actually moved forward on this page) 138 + if last_at > self.last_at { 139 + self.last_at = last_at; 140 + self.keys_at = vec![last_key]; 141 + } else { 142 + // weird cases: either time didn't move (fine...) or went backwards (not fine) 143 + assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 67 144 } 145 + // and make sure all keys at this time are captured from the back 146 + page.ops 147 + .iter() 148 + .rev() 149 + .skip(1) // we already added the very last one 150 + .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 151 + log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 152 + .take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false)) 153 + .for_each(|opr| { 154 + let op = &opr.expect("any Errs were filtered by take_while"); 155 + self.keys_at.push(op.into()); 156 + });
boundary_state { 209 + state.apply_to_next(&mut page); 210 + } else { 211 + boundary_state = PageBoundaryState::new(&mut page); 120 212 } 121 213 if !page.is_empty() { 122 214 match dest.try_send(page) {