Server tools to backfill, tail, mirror, and verify PLC logs
49
fork

Configure Feed

Select the types of activity you want to include in your feed.

very very broken

phil 9ecf1716 0e0f77ad

+177 -9
+20
Cargo.lock
··· 34 34 "chrono", 35 35 "clap", 36 36 "env_logger", 37 + "flate2", 37 38 "flume", 38 39 "log", 39 40 "reqwest", ··· 288 289 ] 289 290 290 291 [[package]] 292 + name = "crc32fast" 293 + version = "1.5.0" 294 + source = "registry+https://github.com/rust-lang/crates.io-index" 295 + checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" 296 + dependencies = [ 297 + "cfg-if", 298 + ] 299 + 300 + [[package]] 291 301 name = "crypto-common" 292 302 version = "0.1.6" 293 303 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 384 394 version = "0.1.1" 385 395 source = "registry+https://github.com/rust-lang/crates.io-index" 386 396 checksum = "7fd99930f64d146689264c637b5af2f0233a933bef0d8570e2526bf9e083192d" 397 + 398 + [[package]] 399 + name = "flate2" 400 + version = "1.1.2" 401 + source = "registry+https://github.com/rust-lang/crates.io-index" 402 + checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" 403 + dependencies = [ 404 + "crc32fast", 405 + "miniz_oxide", 406 + ] 387 407 388 408 [[package]] 389 409 name = "flume"
+1
Cargo.toml
··· 8 8 chrono = { version = "0.4.42", features = ["serde"] } 9 9 clap = { version = "4.5.47", features = ["derive", "env"] } 10 10 env_logger = "0.11.8" 11 + flate2 = "1.1.2" 11 12 flume = "0.11.1" 12 13 log = "0.4.28" 13 14 reqwest = "0.12.23"
+156 -9
src/main.rs
··· 1 1 use clap::Parser; 2 2 use serde::Deserialize; 3 + use std::io::Write; 3 4 use std::time::Duration; 4 5 use tokio_postgres::NoTls; 5 6 use url::Url; 6 7 7 - const EXPORT_PAGE_QUEUE_SIZE: usize = 32; 8 + const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 8 9 const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(500); 10 + const WEEK_IN_SECONDS: u64 = 7 * 86400; 9 11 10 12 #[derive(Parser)] 11 13 struct Args { 12 14 /// Upstream PLC server to mirror 13 15 /// 14 - /// default: plc.directory 16 + /// default: https://plc.directory 15 17 #[arg(long, env)] 16 18 #[clap(default_value = "https://plc.directory")] 17 19 upstream: Url, 20 + /// Bulk export source prefix 21 + /// 22 + /// Must be a prefix for urls ending with {WEEK_TIMESTAMP}.jsonl.gz 23 + /// 24 + /// default: https://plc.t3.storage.dev/plc.directory/ 25 + /// 26 + /// pass "off" to skip fast bulk backfilling 27 + #[arg(long, env)] 28 + #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")] 29 + upstream_bulk: Url, 30 + /// The oldest available bulk upstream export timestamp 31 + /// 32 + /// Must be a week-truncated unix timestamp 33 + /// 34 + /// plc.directory's oldest week is `1668643200`; you probably don't want to change this. 35 + #[arg(long, env)] 36 + #[clap(default_value = "1668643200")] 37 + bulk_epoch: u64, 18 38 /// Mirror PLC's postgres database 19 39 /// 20 40 /// URI string with credentials etc ··· 49 69 pub operation: &'a serde_json::value::RawValue, 50 70 } 51 71 72 + struct PageForwarder { 73 + newlines: usize, 74 + bytes: Vec<u8>, 75 + dest: flume::Sender<ExportPage>, 76 + } 77 + 78 + impl PageForwarder { 79 + fn new(dest: flume::Sender<ExportPage>) -> Self { 80 + Self { 81 + newlines: 0, 82 + bytes: Vec::new(), 83 + dest, 84 + } 85 + } 86 + fn send_page(&mut self) { 87 + log::info!("sending page!"); 88 + let page_bytes = std::mem::take(&mut self.bytes); 89 + if !page_bytes.is_empty() { 90 + let ops = String::from_utf8(page_bytes) 91 + .unwrap() 92 + .trim() 93 + .replace("}{", "}\n{"); // HACK because oops the exports i made are corrupted 94 + self.dest.send(ExportPage { ops }).unwrap(); 95 + self.newlines = 0; 96 + } 97 + } 98 + } 99 + 100 + impl Write for PageForwarder { 101 + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 102 + let mut buf = buf; 103 + loop { 104 + let newlines_to_next_split = 999 - self.newlines; 105 + let Some((i, _)) = buf 106 + .iter() 107 + .enumerate() 108 + .filter(|&(_, &b)| b == b'\n') 109 + .nth(newlines_to_next_split) 110 + else { 111 + // we're left with a partial page 112 + self.bytes.extend_from_slice(buf); 113 + // i guess we need this second pass to update the count 114 + self.newlines += buf.iter().filter(|&&b| b == b'\n').count(); 115 + // could probably do it all in one pass but whatever 116 + break; 117 + }; 118 + // we have one complete page from current bytes + buf[..i] 119 + let (page_rest, rest) = buf.split_at(i); 120 + self.bytes.extend_from_slice(page_rest); 121 + self.send_page(); 122 + buf = rest; 123 + } 124 + 125 + Ok(buf.len()) 126 + } 127 + fn flush(&mut self) -> std::io::Result<()> { 128 + self.send_page(); 129 + Ok(()) 130 + } 131 + } 132 + 133 + async fn bulk_backfill( 134 + client: reqwest::Client, 135 + (upstream, epoch): (Url, u64), 136 + tx: flume::Sender<ExportPage>, 137 + ) { 138 + let immutable_cutoff = std::time::SystemTime::now() - Duration::from_secs((7 + 4) * 86400); 139 + let immutable_ts = (immutable_cutoff.duration_since(std::time::SystemTime::UNIX_EPOCH)) 140 + .unwrap() 141 + .as_secs(); 142 + let immutable_week = (immutable_ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 143 + let mut week = epoch; 144 + let mut week_n = 0; 145 + while week < immutable_week { 146 + log::info!("backfilling week {week_n} ({week})"); 147 + let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap(); 148 + let mut gzipped_chunks = client 149 + .get(url) 150 + .send() 151 + .await 152 + .unwrap() 153 + .error_for_status() 154 + .unwrap(); 155 + 156 + let mut sink = PageForwarder::new(tx.clone()); 157 + let mut decoder = flate2::write::GzDecoder::new(&mut sink); 158 + 159 + while let Some(chunk) = gzipped_chunks.chunk().await.unwrap() { 160 + tokio::task::block_in_place(|| { 161 + let mut chunk = chunk; 162 + while !chunk.is_empty() { 163 + let Ok(n) = decoder 164 + .write(&chunk) 165 + .inspect_err(|e| log::warn!("wat: {e}")) 166 + else { 167 + panic!("can't feed bytes to the decoder :/"); 168 + }; 169 + if n == 0 { 170 + panic!("apparently we can't write"); 171 + } 172 + chunk = chunk.split_off(n); 173 + } 174 + }); 175 + } 176 + decoder.flush().unwrap(); 177 + 178 + week_n += 1; 179 + week += WEEK_IN_SECONDS; 180 + } 181 + } 182 + 52 183 async fn export_upstream( 53 184 upstream: Url, 185 + bulk: (Url, u64), 54 186 tx: flume::Sender<ExportPage>, 55 187 latest: Option<chrono::DateTime<chrono::Utc>>, 56 188 ) { 57 - let mut upstream = upstream; 58 - upstream.set_path("/export"); 59 - let mut after = latest; 60 - let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 61 189 let client = reqwest::Client::builder() 62 190 .user_agent(concat!( 63 191 "allegedly v", 64 192 env!("CARGO_PKG_VERSION"), 65 193 " (part of @microcosm.blue; contact @bad-example.com)" 66 194 )) 67 - .timeout(Duration::from_secs(4)) 195 + .timeout(Duration::from_secs(10)) 68 196 .build() 69 197 .unwrap(); 70 198 199 + if latest.is_none() { 200 + bulk_backfill(client.clone(), bulk, tx.clone()).await; 201 + } 202 + 203 + let mut upstream = upstream; 204 + upstream.set_path("/export"); 205 + let mut after = latest; 206 + let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 207 + 71 208 loop { 72 209 tick.tick().await; 73 210 let mut url = upstream.clone(); ··· 133 270 134 271 log::info!("setting up inserts..."); 135 272 for op_line in page.ops.lines() { 136 - let op: Op = serde_json::from_str(op_line).unwrap(); 273 + let Ok(op) = serde_json::from_str::<Op>(op_line) 274 + .inspect_err(|e| log::error!("failing! at the {op_line}! {e}")) 275 + else { 276 + log::error!("ayeeeee just ignoring this error for now......"); 277 + continue; 278 + }; 137 279 let client = &tx; 138 280 139 281 client.execute(upsert_did, &[&op.did]).await.unwrap(); ··· 203 345 204 346 let (tx, rx) = flume::bounded::<ExportPage>(EXPORT_PAGE_QUEUE_SIZE); 205 347 206 - let export_task = tokio::task::spawn(export_upstream(args.upstream, tx, latest)); 348 + let export_task = tokio::task::spawn(export_upstream( 349 + args.upstream, 350 + (args.upstream_bulk, args.bulk_epoch), 351 + tx, 352 + latest, 353 + )); 207 354 let writer_task = tokio::task::spawn(write_pages(rx, pg_client)); 208 355 209 356 tokio::select! {