Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fjall: implement /export/stream

dawn a21adcf9 078d4829

+239 -28
+58 -2
Cargo.lock
··· 60 60 "tokio", 61 61 "tokio-postgres", 62 62 "tokio-stream", 63 - "tokio-tungstenite", 63 + "tokio-tungstenite 0.26.2", 64 64 "tokio-util", 65 65 "tracing", 66 66 "tracing-opentelemetry", ··· 1102 1102 "memchr", 1103 1103 "pin-project-lite", 1104 1104 "slab", 1105 + ] 1106 + 1107 + [[package]] 1108 + name = "futures_codec" 1109 + version = "0.5.0" 1110 + source = "registry+https://github.com/rust-lang/crates.io-index" 1111 + checksum = "ad004dd81205978a2bba6c566ed70535ccf88c0be34649e628186474603f43ca" 1112 + dependencies = [ 1113 + "bytes", 1114 + "futures-sink", 1115 + "futures-util", 1116 + "memchr", 1117 + "pin-project-lite", 1105 1118 ] 1106 1119 1107 1120 [[package]] ··· 2329 2342 "serde_json", 2330 2343 "serde_urlencoded", 2331 2344 "smallvec", 2345 + "sse-codec", 2332 2346 "sync_wrapper", 2333 2347 "thiserror 2.0.18", 2334 2348 "tokio", 2335 2349 "tokio-rustls", 2350 + "tokio-stream", 2351 + "tokio-tungstenite 0.27.0", 2336 2352 "tokio-util", 2337 2353 "tracing", 2338 2354 "wildmatch", ··· 3253 3269 ] 3254 3270 3255 3271 [[package]] 3272 + name = "sse-codec" 3273 + version = "0.3.3" 3274 + source = "registry+https://github.com/rust-lang/crates.io-index" 3275 + checksum = "3a395a858c7ff5c4b42aeab0501e07c978ac5e1ae5059f301884dab3fa405f47" 3276 + dependencies = [ 3277 + "futures-io", 3278 + "futures_codec", 3279 + "memchr", 3280 + ] 3281 + 3282 + [[package]] 3256 3283 name = "stable_deref_trait" 3257 3284 version = "1.2.1" 3258 3285 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3557 3584 "native-tls", 3558 3585 "tokio", 3559 3586 "tokio-native-tls", 3560 - "tungstenite", 3587 + "tungstenite 0.26.2", 3588 + ] 3589 + 3590 + [[package]] 3591 + name = "tokio-tungstenite" 3592 + version = "0.27.0" 3593 + source = "registry+https://github.com/rust-lang/crates.io-index" 3594 + checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1" 3595 + dependencies = [ 3596 + "futures-util", 3597 + "log", 3598 + "tokio", 3599 + "tungstenite 0.27.0", 3561 3600 ] 3562 3601 3563 3602 [[package]] ··· 3772 3811 "httparse", 3773 3812 "log", 3774 3813 "native-tls", 3814 + "rand 0.9.2", 3815 + "sha1", 3816 + "thiserror 2.0.18", 3817 + "utf-8", 3818 + ] 3819 + 3820 + [[package]] 3821 + name = "tungstenite" 3822 + version = "0.27.0" 3823 + source = "registry+https://github.com/rust-lang/crates.io-index" 3824 + checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d" 3825 + dependencies = [ 3826 + "bytes", 3827 + "data-encoding", 3828 + "http", 3829 + "httparse", 3830 + "log", 3775 3831 "rand 0.9.2", 3776 3832 "sha1", 3777 3833 "thiserror 2.0.18",
+1 -1
Cargo.toml
··· 27 27 opentelemetry = "0.30.0" 28 28 opentelemetry-otlp = { version = "0.30.0" } 29 29 opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } 30 - poem = { version = "3.1.12", features = ["acme", "compression"] } 30 + poem = { version = "3.1.12", features = ["acme", "compression", "test", "websocket"] } 31 31 postgres-native-tls = "0.5.1" 32 32 reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } 33 33 reqwest-middleware = "0.4.2"
+128 -11
src/mirror/fjall.rs
··· 1 + use std::sync::Arc; 2 + 1 3 use super::*; 2 - use futures::StreamExt; 4 + use futures::{SinkExt as _, StreamExt as _}; 5 + use poem::IntoResponse; 3 6 use poem::web::Query; 4 7 use serde::Deserialize; 8 + 9 + async fn spawn_blocking<F, R>(f: F) -> poem::Result<R> 10 + where 11 + R: Send + 'static, 12 + F: FnOnce() -> anyhow::Result<R> + Send + 'static, 13 + { 14 + tokio::task::spawn_blocking(f) 15 + .await 16 + .map_err(anyhow::Error::from) 17 + .flatten() 18 + .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)) 19 + } 5 20 6 21 #[derive(Clone)] 7 22 struct FjallState { ··· 145 160 146 161 let did = did.to_string(); 147 162 let db = state.fjall.clone(); 148 - let ops = tokio::task::spawn_blocking(move || { 163 + let ops = spawn_blocking(move || { 149 164 let iter = db.ops_for_did(&did)?; 150 165 iter.collect::<anyhow::Result<Vec<_>>>() 151 166 }) 152 - .await 153 - .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))? 154 - .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?; 167 + .await?; 155 168 156 169 if ops.is_empty() { 157 170 return Err(Error::from_string( ··· 256 269 } 257 270 258 271 #[handler] 259 - async fn fjall_export( 272 + async fn export( 260 273 _req: &Request, 261 274 Query(query): Query<ExportQuery>, 262 275 Data(FjallState { fjall, .. }): Data<&FjallState>, ··· 265 278 let limit = 1000; 266 279 let db = fjall.clone(); 267 280 268 - let ops = tokio::task::spawn_blocking(move || { 281 + let ops = spawn_blocking(move || { 269 282 let iter = db.export_ops(after..)?; 270 283 iter.take(limit).collect::<anyhow::Result<Vec<_>>>() 271 284 }) 272 - .await 273 - .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))? 274 - .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?; 285 + .await?; 275 286 276 287 let stream = futures::stream::iter(ops).map(|op| { 277 288 let mut json = serde_json::to_string(&op).unwrap(); ··· 282 293 Ok(Body::from_bytes_stream(stream)) 283 294 } 284 295 296 + #[derive(Deserialize)] 297 + struct StreamQuery { 298 + cursor: Option<u64>, 299 + } 300 + 301 + #[handler] 302 + async fn export_stream( 303 + _req: &Request, 304 + Query(query): Query<StreamQuery>, 305 + ws: poem::web::websocket::WebSocket, 306 + Data(FjallState { fjall, .. }): Data<&FjallState>, 307 + ) -> poem::Result<impl IntoResponse> { 308 + use poem::web::websocket::Message; 309 + use tokio::sync::Notify; 310 + 311 + let db = fjall.clone(); 312 + 313 + let latest_cursor = spawn_blocking({ 314 + let db = db.clone(); 315 + move || db.get_latest().map(|res| res.map(|(c, _)| c)) 316 + }) 317 + .await? 318 + .unwrap_or(0); 319 + 320 + let mut cursor = match query.cursor { 321 + Some(cursor) => { 322 + if cursor > latest_cursor { 323 + return Err(Error::from_string( 324 + format!("cursor {cursor} is in the future"), 325 + StatusCode::BAD_REQUEST, 326 + )); 327 + } 328 + 329 + let created_at = spawn_blocking({ 330 + let db = db.clone(); 331 + move || { 332 + db.get_op_at_or_after(cursor) 333 + .map(|res| res.map(|op| op.created_at)) 334 + } 335 + }) 336 + .await?; 337 + 338 + match created_at { 339 + Some(created_at) => { 340 + // check that the provided cursor is not stale 341 + if (chrono::Utc::now() - created_at).num_days() > 1 { 342 + return Err(Error::from_string( 343 + format!("cursor {cursor} is stale, catch up using /export first"), 344 + StatusCode::BAD_REQUEST, 345 + )); 346 + } 347 + cursor 348 + } 349 + None => latest_cursor, 350 + } 351 + } 352 + None => { 353 + // if cursor is not provided, start at the latest op 354 + latest_cursor 355 + } 356 + }; 357 + 358 + Ok(ws.on_upgrade(move |mut socket| async move { 359 + let errored = Arc::new(Notify::new()); 360 + 361 + loop { 362 + let (tx, mut op_rx) = tokio::sync::mpsc::channel(64); 363 + 364 + tokio::task::spawn_blocking({ 365 + let db = db.clone(); 366 + let errored = errored.clone(); 367 + move || { 368 + let iter = match db.export_ops(cursor..) { 369 + Ok(it) => it, 370 + Err(e) => { 371 + log::error!("read failed: {e}"); 372 + errored.notify_one(); 373 + return; 374 + } 375 + }; 376 + for op in iter.flatten() { 377 + if tx.blocking_send(op).is_err() { 378 + return; 379 + } 380 + } 381 + } 382 + }); 383 + 384 + while let Some(op) = op_rx.recv().await { 385 + cursor = op.seq; 386 + let json = serde_json::to_string(&op).unwrap(); 387 + if let Err(e) = socket.send(Message::Text(json)).await { 388 + log::warn!("closing export stream: {e}"); 389 + return; 390 + } 391 + } 392 + 393 + tokio::select! { 394 + _ = db.subscribe() => {}, 395 + _ = errored.notified() => return, 396 + } 397 + } 398 + })) 399 + } 400 + 285 401 #[handler] 286 402 async fn fjall_nope(Data(FjallState { upstream, .. }): Data<&FjallState>) -> (StatusCode, String) { 287 403 ( ··· 334 450 .at("/", get(fjall_hello)) 335 451 .at("/favicon.ico", get(favicon)) 336 452 .at("/_health", get(fjall_health)) 337 - .at("/export", get(fjall_export)); 453 + .at("/export", get(export)) 454 + .at("/export/stream", get(export_stream)); 338 455 339 456 if experimental.write_upstream { 340 457 log::info!("enabling experimental write forwarding to upstream");
+52 -14
src/plc_fjall.rs
··· 12 12 use std::path::Path; 13 13 use std::sync::Arc; 14 14 use std::time::Instant; 15 - use tokio::sync::{mpsc, oneshot}; 15 + use tokio::sync::{Notify, futures::Notified, mpsc, oneshot}; 16 16 17 17 const SEP: u8 = 0; 18 18 ··· 829 829 } 830 830 831 831 // we have our own Op struct for fjall since we dont want to have to convert Value back to RawValue 832 - #[derive(Debug, Serialize)] 832 + #[derive(Debug, Serialize, Deserialize, Clone)] 833 833 pub struct Op { 834 + pub seq: u64, 834 835 pub did: String, 835 836 pub cid: String, 836 837 pub created_at: Dt, ··· 849 850 ops: Keyspace, 850 851 /// secondary index: [encoded_did][SEP][seq_varint] -> [] 851 852 by_did: Keyspace, 853 + notify_stream: Notify, 852 854 } 853 855 854 856 impl FjallDb { ··· 880 882 let by_did = db.keyspace("by_did", || { 881 883 opts() 882 884 .max_memtable_size(mb(64)) 883 - // this isn't gonna compress well anyway, since its just keys (did + timestamp + cid) 885 + // this isn't gonna compress well anyway, since its just keys (did + seq) 884 886 // and dids dont have many operations in the first place, so we can use small blocks 885 887 .data_block_size_policy(BlockSizePolicy::all(kb(2))) 886 888 })?; 887 889 Ok(Self { 888 - inner: Arc::new(FjallInner { db, ops, by_did }), 890 + inner: Arc::new(FjallInner { 891 + db, 892 + ops, 893 + by_did, 894 + notify_stream: Notify::new(), 895 + }), 889 896 }) 890 897 } 891 898 ··· 905 912 Ok(()) 906 913 } 907 914 915 + pub fn subscribe(&self) -> Notified<'_> { 916 + self.inner.notify_stream.notified() 917 + } 918 + 908 919 /// Returns `(seq, created_at)` for the last stored op, or `None` if empty. 909 920 pub fn get_latest(&self) -> anyhow::Result<Option<(u64, Dt)>> { 910 921 let Some(guard) = self.inner.ops.last_key_value() else { ··· 950 961 self._ops_for_did(&op.did) 951 962 .map(|ops| { 952 963 ops.rev() 953 - .find(|r| r.as_ref().map_or(true, |(_, cid, _)| cid == prev_cid)) 964 + .find(|r| r.as_ref().map_or(true, |(_, _, cid, _)| cid == prev_cid)) 954 965 .transpose() 955 966 }) 956 967 .flatten() ··· 958 969 .transpose()? 959 970 .flatten(); 960 971 961 - let prev_stored = prev_op.as_ref().map(|(_, _, p)| &p.operation); 972 + let prev_stored = prev_op.as_ref().map(|(_, _, _, p)| &p.operation); 962 973 963 974 match verify_op_sig(&operation, prev_stored) { 964 975 Ok(results) => { ··· 1002 1013 batch.insert(&self.inner.by_did, by_did_key_bytes, &[]); 1003 1014 batch.commit()?; 1004 1015 1016 + self.inner.notify_stream.notify_waiters(); 1017 + 1005 1018 Ok(1) 1006 1019 } 1007 - } 1008 1020 1009 - impl FjallDb { 1021 + pub(crate) fn get_op_at_or_after(&self, seq: u64) -> anyhow::Result<Option<Op>> { 1022 + self.inner 1023 + .ops 1024 + .range(seq_key(seq)..) 1025 + .next() 1026 + .map(|v| { 1027 + rmp_serde::from_slice::<DbOp>(&v.value()?) 1028 + .context("failed to decode op") 1029 + .map(|op| { 1030 + Ok(Op { 1031 + seq, 1032 + did: decode_did(&op.did), 1033 + cid: decode_cid(&op.cid)?, 1034 + created_at: Dt::from_timestamp_micros(op.created_at as i64) 1035 + .ok_or_else(|| anyhow::anyhow!("invalid created_at in op"))?, 1036 + nullified: op.nullified, 1037 + operation: op.operation.to_json_value(), 1038 + }) 1039 + }) 1040 + .flatten() 1041 + }) 1042 + .transpose() 1043 + } 1044 + 1010 1045 /// Decode a `by_did` entry: extract the seq from the key suffix, then 1011 1046 /// look up the full `DbOp` in the `ops` keyspace. 1012 1047 fn decode_by_did_entry( 1013 1048 &self, 1014 1049 by_did_key_bytes: &[u8], 1015 1050 prefix_len: usize, 1016 - ) -> anyhow::Result<(Dt, PlcCid, DbOp)> { 1051 + ) -> anyhow::Result<(u64, Dt, PlcCid, DbOp)> { 1017 1052 let key_suffix = by_did_key_bytes 1018 1053 .get(prefix_len..) 1019 1054 .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key_bytes:?}"))?; ··· 1032 1067 .ok_or_else(|| anyhow::anyhow!("invalid created_at_micros {}", op.created_at))?; 1033 1068 let cid = PlcCid(op.cid.clone()); 1034 1069 1035 - Ok((ts, cid, op)) 1070 + Ok((seq, ts, cid, op)) 1036 1071 } 1037 1072 1038 1073 fn _ops_for_did( 1039 1074 &self, 1040 1075 did: &str, 1041 - ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<(Dt, PlcCid, DbOp)>> + '_> 1076 + ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<(u64, Dt, PlcCid, DbOp)>> + '_> 1042 1077 { 1043 1078 let prefix = by_did_prefix(did)?; 1044 1079 ··· 1055 1090 did: &str, 1056 1091 ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<Op>> + '_> { 1057 1092 Ok(self._ops_for_did(did)?.map(|res| { 1058 - let (ts, cid, op) = res?; 1093 + let (seq, ts, cid, op) = res?; 1059 1094 let cid = decode_cid(&cid.0)?; 1060 1095 let did = decode_did(&op.did); 1061 1096 Ok(Op { 1097 + seq, 1062 1098 did, 1063 1099 cid, 1064 1100 created_at: ts, ··· 1087 1123 .ops 1088 1124 .range(range) 1089 1125 .map(|item| -> anyhow::Result<Op> { 1090 - let (_, value) = item 1126 + let (key, value) = item 1091 1127 .into_inner() 1092 1128 .map_err(|e: fjall::Error| anyhow::anyhow!("fjall read error: {e}"))?; 1129 + let seq = decode_seq_key(&key)?; 1093 1130 let db_op: DbOp = rmp_serde::from_slice(&value)?; 1094 1131 let created_at = 1095 1132 Dt::from_timestamp_micros(db_op.created_at as i64).ok_or_else(|| { ··· 1098 1135 let cid = decode_cid(&db_op.cid)?; 1099 1136 let did = decode_did(&db_op.did); 1100 1137 Ok(Op { 1138 + seq, 1101 1139 did, 1102 1140 cid, 1103 1141 created_at, ··· 1257 1295 current_prefix = Some(prefix_array); 1258 1296 } 1259 1297 1260 - did_ops.push(op); 1298 + did_ops.push((op.1, op.2, op.3)); 1261 1299 processed_ops += 1; 1262 1300 } 1263 1301