Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

backfill that pretty much works

phil 11d75ac7 b5bc7929

+165 -117
+83 -1
Cargo.lock
··· 31 31 version = "0.1.0" 32 32 dependencies = [ 33 33 "anyhow", 34 + "async-compression", 34 35 "chrono", 35 36 "clap", 36 37 "env_logger", 37 - "flate2", 38 38 "flume", 39 + "futures", 39 40 "log", 40 41 "reqwest", 41 42 "serde", ··· 109 110 version = "1.0.99" 110 111 source = "registry+https://github.com/rust-lang/crates.io-index" 111 112 checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" 113 + 114 + [[package]] 115 + name = "async-compression" 116 + version = "0.4.30" 117 + source = "registry+https://github.com/rust-lang/crates.io-index" 118 + checksum = "977eb15ea9efd848bb8a4a1a2500347ed7f0bf794edf0dc3ddcf439f43d36b23" 119 + dependencies = [ 120 + "compression-codecs", 121 + "compression-core", 122 + "futures-core", 123 + "futures-io", 124 + "pin-project-lite", 125 + ] 112 126 113 127 [[package]] 114 128 name = "async-trait" ··· 264 278 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 265 279 266 280 [[package]] 281 + name = "compression-codecs" 282 + version = "0.4.30" 283 + source = "registry+https://github.com/rust-lang/crates.io-index" 284 + checksum = "485abf41ac0c8047c07c87c72c8fb3eb5197f6e9d7ded615dfd1a00ae00a0f64" 285 + dependencies = [ 286 + "compression-core", 287 + "flate2", 288 + "memchr", 289 + ] 290 + 291 + [[package]] 292 + name = "compression-core" 293 + version = "0.4.29" 294 + source = "registry+https://github.com/rust-lang/crates.io-index" 295 + checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb" 296 + 297 + [[package]] 267 298 name = "core-foundation" 268 299 version = "0.9.4" 269 300 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 448 479 ] 449 480 450 481 [[package]] 482 + name = "futures" 483 + version = "0.3.31" 484 + source = "registry+https://github.com/rust-lang/crates.io-index" 485 + checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" 486 + dependencies = [ 487 + "futures-channel", 488 + "futures-core", 489 + "futures-executor", 490 + "futures-io", 491 + "futures-sink", 492 + "futures-task", 493 + "futures-util", 494 + ] 495 + 496 + [[package]] 451 497 name = "futures-channel" 452 498 version = "0.3.31" 453 499 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 464 510 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" 465 511 466 512 [[package]] 513 + name = "futures-executor" 514 + version = "0.3.31" 515 + source = "registry+https://github.com/rust-lang/crates.io-index" 516 + checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" 517 + dependencies = [ 518 + "futures-core", 519 + "futures-task", 520 + "futures-util", 521 + ] 522 + 523 + [[package]] 524 + name = "futures-io" 525 + version = "0.3.31" 526 + source = "registry+https://github.com/rust-lang/crates.io-index" 527 + checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" 528 + 529 + [[package]] 467 530 name = "futures-macro" 468 531 version = "0.3.31" 469 532 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 492 555 source = "registry+https://github.com/rust-lang/crates.io-index" 493 556 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" 494 557 dependencies = [ 558 + "futures-channel", 495 559 "futures-core", 560 + "futures-io", 496 561 "futures-macro", 497 562 "futures-sink", 498 563 "futures-task", 564 + "memchr", 499 565 "pin-project-lite", 500 566 "pin-utils", 501 567 "slab", ··· 1334 1400 "bytes", 1335 1401 "encoding_rs", 1336 1402 "futures-core", 1403 + "futures-util", 1337 1404 "h2", 1338 1405 "http", 1339 1406 "http-body", ··· 1355 1422 "sync_wrapper", 1356 1423 "tokio", 1357 1424 "tokio-native-tls", 1425 + "tokio-util", 1358 1426 "tower", 1359 1427 "tower-http", 1360 1428 "tower-service", 1361 1429 "url", 1362 1430 "wasm-bindgen", 1363 1431 "wasm-bindgen-futures", 1432 + "wasm-streams", 1364 1433 "web-sys", 1365 1434 ] 1366 1435 ··· 2060 2129 checksum = "f143854a3b13752c6950862c906306adb27c7e839f7414cec8fea35beab624c1" 2061 2130 dependencies = [ 2062 2131 "unicode-ident", 2132 + ] 2133 + 2134 + [[package]] 2135 + name = "wasm-streams" 2136 + version = "0.4.2" 2137 + source = "registry+https://github.com/rust-lang/crates.io-index" 2138 + checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" 2139 + dependencies = [ 2140 + "futures-util", 2141 + "js-sys", 2142 + "wasm-bindgen", 2143 + "wasm-bindgen-futures", 2144 + "web-sys", 2063 2145 ] 2064 2146 2065 2147 [[package]]
+4 -2
Cargo.toml
··· 2 2 name = "allegedly" 3 3 version = "0.1.0" 4 4 edition = "2024" 5 + default-run = "main" 5 6 6 7 [dependencies] 7 8 anyhow = "1.0.99" 9 + async-compression = { version = "0.4.30", features = ["futures-io", "gzip"] } 8 10 chrono = { version = "0.4.42", features = ["serde"] } 9 11 clap = { version = "4.5.47", features = ["derive", "env"] } 10 12 env_logger = "0.11.8" 11 - flate2 = "1.1.2" 12 13 flume = "0.11.1" 14 + futures = "0.3.31" 13 15 log = "0.4.28" 14 - reqwest = "0.12.23" 16 + reqwest = { version = "0.12.23", features = ["stream"] } 15 17 serde = "1.0.219" 16 18 serde_json = { version = "1.0.143", features = ["raw_value"] } 17 19 tokio = { version = "1.47.1", features = ["full"] }
+23 -68
src/backfill.rs
··· 1 1 use crate::ExportPage; 2 - use std::io::Write; 2 + use url::Url; 3 3 4 - pub struct PageForwarder<const N: usize> { 5 - newlines: usize, 6 - bytes: Vec<u8>, 4 + use async_compression::futures::bufread::GzipDecoder; 5 + use futures::{AsyncBufReadExt, StreamExt, TryStreamExt, io}; 6 + 7 + pub async fn week_to_pages( 8 + client: &reqwest::Client, 9 + url: Url, 7 10 dest: flume::Sender<ExportPage>, 8 - } 11 + ) -> anyhow::Result<()> { 12 + let reader = client 13 + .get(url) 14 + .send() 15 + .await? 16 + .error_for_status()? 17 + .bytes_stream() 18 + .map_err(io::Error::other) 19 + .into_async_read(); 9 20 10 - impl<const N: usize> PageForwarder<N> { 11 - pub fn new(dest: flume::Sender<ExportPage>) -> Self { 12 - Self { 13 - newlines: 0, 14 - bytes: Vec::new(), 15 - dest, 16 - } 17 - } 18 - fn send_page(&mut self) { 19 - log::info!("sending page!"); 20 - let page_bytes = std::mem::take(&mut self.bytes); 21 - if !page_bytes.is_empty() { 22 - let ops = String::from_utf8(page_bytes) 23 - .unwrap() 24 - .trim() 25 - .replace("}{", "}\n{"); // HACK because oops the exports i made are corrupted 26 - self.dest.send(ExportPage { ops }).unwrap(); 27 - self.newlines = 0; 28 - } 29 - } 30 - } 21 + let decoder = GzipDecoder::new(io::BufReader::new(reader)); 31 22 32 - impl<const N: usize> Write for PageForwarder<N> { 33 - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 34 - let mut buf = buf; 35 - loop { 36 - let newlines_to_next_split = N - 1 - self.newlines; 37 - let Some((i, _)) = buf 38 - .iter() 39 - .enumerate() 40 - .filter(|&(_, &b)| b == b'\n') 41 - .nth(newlines_to_next_split) 42 - else { 43 - // we're left with a partial page 44 - self.bytes.extend_from_slice(buf); 45 - // i guess we need this second pass to update the count 46 - self.newlines += buf.iter().filter(|&&b| b == b'\n').count(); 47 - // could probably do it all in one pass but whatever 48 - break; 49 - }; 50 - // we have one complete page from current bytes + buf[..i] 51 - let (page_rest, rest) = buf.split_at(i); 52 - self.bytes.extend_from_slice(page_rest); 53 - self.send_page(); 54 - buf = rest; 55 - } 23 + let mut chunks = io::BufReader::new(decoder).lines().chunks(1000); 56 24 57 - Ok(buf.len()) 58 - } 59 - fn flush(&mut self) -> std::io::Result<()> { 60 - self.send_page(); 61 - Ok(()) 62 - } 63 - } 64 - 65 - #[cfg(test)] 66 - mod test { 67 - use super::*; 68 - 69 - #[test] 70 - fn test_page_forwarder_empty_flush() { 71 - let (tx, rx) = flume::bounded(1); 72 - let mut pf = PageForwarder::<1>::new(tx); 73 - pf.flush().unwrap(); 74 - assert!(rx.is_empty()); 25 + while let Some(chunk) = chunks.next().await { 26 + let ops = chunk.into_iter().collect::<Result<Vec<_>, io::Error>>()?; 27 + let page = ExportPage { ops }; 28 + dest.send_async(page).await?; 75 29 } 30 + Ok(()) 76 31 }
+33
src/bin/blah.rs
··· 1 + use async_compression::futures::bufread::GzipDecoder; 2 + use futures::{AsyncBufReadExt, StreamExt, TryStreamExt, io}; 3 + 4 + #[tokio::main] 5 + async fn main() { 6 + let client = reqwest::Client::builder() 7 + .user_agent(concat!( 8 + "allegedly (blah) v", 9 + env!("CARGO_PKG_VERSION"), 10 + " (from @microcosm.blue; contact @bad-example.com)" 11 + )) 12 + .build() 13 + .unwrap(); 14 + 15 + let reader = client 16 + .get("https://plc.t3.storage.dev/plc.directory/1699488000.jsonl.gz") 17 + // .get("https://plc.t3.storage.dev/plc.directory/1669248000.jsonl.gz") 18 + .send() 19 + .await 20 + .unwrap() 21 + .error_for_status() 22 + .unwrap() 23 + .bytes_stream() 24 + .map_err(io::Error::other) 25 + .into_async_read(); 26 + 27 + let decoder = GzipDecoder::new(io::BufReader::new(reader)); 28 + let mut chunks = io::BufReader::new(decoder).lines().chunks(1000); 29 + while let Some(ref _chunk) = chunks.next().await { 30 + print!("."); 31 + } 32 + println!(); 33 + }
+3 -6
src/lib.rs
··· 1 1 mod backfill; 2 2 3 - pub use backfill::PageForwarder; 3 + pub use backfill::week_to_pages; 4 4 5 5 /// One page of PLC export 6 6 /// 7 - /// should have maximum length of 1000 lines. 8 - /// A bulk export consumer should chunk ops into pages of max 1000 ops. 9 - /// 10 - /// leading and trailing whitespace should be trimmed. 7 + /// Not limited, but expected to have up to about 1000 lines 11 8 pub struct ExportPage { 12 - pub ops: String, 9 + pub ops: Vec<String>, 13 10 }
+19 -40
src/main.rs src/bin/main.rs
··· 1 1 use clap::Parser; 2 2 use serde::Deserialize; 3 - use std::io::Write; 4 3 use std::time::Duration; 5 4 use tokio_postgres::NoTls; 6 5 use url::Url; 7 6 8 - use allegedly::{ExportPage, PageForwarder}; 7 + use allegedly::{ExportPage, week_to_pages}; 9 8 10 9 const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 11 10 const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(500); ··· 76 75 while week < immutable_week { 77 76 log::info!("backfilling week {week_n} ({week})"); 78 77 let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap(); 79 - let mut gzipped_chunks = client 80 - .get(url) 81 - .send() 82 - .await 83 - .unwrap() 84 - .error_for_status() 85 - .unwrap(); 86 - 87 - let mut sink = PageForwarder::<1000>::new(tx.clone()); 88 - let mut decoder = flate2::write::GzDecoder::new(&mut sink); 89 - 90 - while let Some(chunk) = gzipped_chunks.chunk().await.unwrap() { 91 - tokio::task::block_in_place(|| { 92 - let mut chunk = chunk; 93 - while !chunk.is_empty() { 94 - let Ok(n) = decoder 95 - .write(&chunk) 96 - .inspect_err(|e| log::warn!("wat: {e}")) 97 - else { 98 - panic!("can't feed bytes to the decoder :/"); 99 - }; 100 - if n == 0 { 101 - panic!("apparently we can't write"); 102 - } 103 - chunk = chunk.split_off(n); 104 - } 105 - }); 106 - } 107 - decoder.flush().unwrap(); 108 - 78 + week_to_pages(&client, url, tx.clone()).await.unwrap(); 109 79 week_n += 1; 110 80 week += WEEK_IN_SECONDS; 111 81 } ··· 121 91 .user_agent(concat!( 122 92 "allegedly v", 123 93 env!("CARGO_PKG_VERSION"), 124 - " (part of @microcosm.blue; contact @bad-example.com)" 94 + " (from @microcosm.blue; contact @bad-example.com)" 125 95 )) 126 - .timeout(Duration::from_secs(10)) 127 96 .build() 128 97 .unwrap(); 129 98 ··· 165 134 after = Some(op.created_at); 166 135 167 136 log::trace!("got some ops until {after:?}, sending them..."); 137 + let ops = ops.split('\n').map(Into::into).collect(); 168 138 tx.send_async(ExportPage { ops }).await.unwrap(); 169 139 } 170 140 } ··· 192 162 .unwrap(); 193 163 194 164 while let Ok(page) = rx.recv_async().await { 195 - log::info!("got a page..."); 165 + log::trace!("got a page..."); 196 166 197 167 let mut tx = pg_client.transaction().await.unwrap(); 198 168 199 169 // TODO: probably figure out postgres COPY IN 200 170 // for now just write everything into a transaction 201 171 202 - log::info!("setting up inserts..."); 203 - for op_line in page.ops.lines() { 204 - let Ok(op) = serde_json::from_str::<Op>(op_line) 172 + log::trace!("setting up inserts..."); 173 + for op_line in page 174 + .ops 175 + .into_iter() 176 + .flat_map(|s| { 177 + s.replace("}{", "}\n{") 178 + .split('\n') 179 + .map(|s| s.trim()) 180 + .map(Into::into) 181 + .collect::<Vec<String>>() 182 + }) 183 + .filter(|s| !s.is_empty()) 184 + { 185 + let Ok(op) = serde_json::from_str::<Op>(&op_line) 205 186 .inspect_err(|e| log::error!("failing! at the {op_line}! {e}")) 206 187 else { 207 188 log::error!("ayeeeee just ignoring this error for now......"); ··· 236 217 } 237 218 238 219 tx.commit().await.unwrap(); 239 - 240 - log::info!("hi from writer! (done page)"); 241 220 } 242 221 Ok(()) 243 222 }