Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fjall: return ops with type: sequenced_op

dawn b6cc9fdb a21adcf9

+22 -6
+6 -4
src/mirror/fjall.rs
··· 285 285 .await?; 286 286 287 287 let stream = futures::stream::iter(ops).map(|op| { 288 - let mut json = serde_json::to_string(&op).unwrap(); 288 + let mut json = serde_json::to_string(&op.to_sequenced_json()).unwrap(); 289 289 json.push('\n'); 290 290 Ok::<_, std::io::Error>(json) 291 291 }); ··· 340 340 // check that the provided cursor is not stale 341 341 if (chrono::Utc::now() - created_at).num_days() > 1 { 342 342 return Err(Error::from_string( 343 - format!("cursor {cursor} is stale, catch up using /export first"), 343 + format!( 344 + "cursor {cursor} is stale (older than a day), catch up using /export first" 345 + ), 344 346 StatusCode::BAD_REQUEST, 345 347 )); 346 348 } ··· 382 384 }); 383 385 384 386 while let Some(op) = op_rx.recv().await { 385 - cursor = op.seq; 386 - let json = serde_json::to_string(&op).unwrap(); 387 + let json = serde_json::to_string(&op.to_sequenced_json()).unwrap(); 387 388 if let Err(e) = socket.send(Message::Text(json)).await { 388 389 log::warn!("closing export stream: {e}"); 389 390 return; 390 391 } 392 + cursor = op.seq; 391 393 } 392 394 393 395 tokio::select! {
+16 -2
src/plc_fjall.rs
··· 834 834 pub seq: u64, 835 835 pub did: String, 836 836 pub cid: String, 837 + #[serde(rename = "createdAt")] 837 838 pub created_at: Dt, 838 839 pub nullified: bool, 839 840 pub operation: serde_json::Value, 841 + } 842 + 843 + impl Op { 844 + /// adds the `type` field to the op 845 + pub fn to_sequenced_json(&self) -> serde_json::Value { 846 + let mut val = serde_json::to_value(self).expect("Op is serializable"); 847 + if let serde_json::Value::Object(ref mut map) = val { 848 + map.insert("type".to_string(), "sequenced_op".into()); 849 + } 850 + val 851 + } 840 852 } 841 853 842 854 #[derive(Clone)] ··· 1313 1325 1314 1326 // we can start two threads, one for forward iteration and one for reverse iteration 1315 1327 // this way we have two scans in parallel which should be faster! 1316 - let f_handle = spawn_scan_thread!(next, 0, false, ops / 2); 1317 - let b_handle = spawn_scan_thread!(next_back, workers / 2, true, ops - (ops / 2)); 1328 + let f_count = ops / 2; 1329 + let f_handle = spawn_scan_thread!(next, 0, false, f_count); 1330 + let b_count = ops - f_count; 1331 + let b_handle = spawn_scan_thread!(next_back, workers / 2, true, b_count); 1318 1332 1319 1333 f_handle.join().unwrap()?; 1320 1334 b_handle.join().unwrap()?;