Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add a health endpoint

phil 30d8dc19 5adfa74c

+88 -16
+1 -1
Cargo.toml
··· 16 16 http-body-util = "0.1.3" 17 17 log = "0.4.28" 18 18 poem = { version = "3.1.12", features = ["compression"] } 19 - reqwest = { version = "0.12.23", features = ["stream"] } 19 + reqwest = { version = "0.12.23", features = ["stream", "json"] } 20 20 reqwest-middleware = "0.4.2" 21 21 reqwest-retry = "0.7.0" 22 22 serde = "1.0.219"
+7 -8
src/client.rs
··· 3 3 use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; 4 4 use std::sync::LazyLock; 5 5 6 + pub const UA: &str = concat!( 7 + "allegedly, v", 8 + env!("CARGO_PKG_VERSION"), 9 + " (from @microcosm.blue; contact @bad-example.com)" 10 + ); 11 + 6 12 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| { 7 - let inner = Client::builder() 8 - .user_agent(concat!( 9 - "allegedly, v", 10 - env!("CARGO_PKG_VERSION"), 11 - " (from @microcosm.blue; contact @bad-example.com)" 12 - )) 13 - .build() 14 - .unwrap(); 13 + let inner = Client::builder().user_agent(UA).build().unwrap(); 15 14 16 15 let policy = ExponentialBackoff::builder().build_with_max_retries(12); 17 16
+1 -1
src/lib.rs
··· 9 9 mod weekly; 10 10 11 11 pub use backfill::backfill; 12 - pub use client::CLIENT; 12 + pub use client::{CLIENT, UA}; 13 13 pub use mirror::serve; 14 14 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 15 15 pub use poll::{PageBoundaryState, get_page, poll_upstream};
+79 -6
src/mirror.rs
··· 1 - use crate::{GovernorMiddleware, logo}; 1 + use crate::{GovernorMiddleware, UA, logo}; 2 2 use futures::TryStreamExt; 3 3 use governor::Quota; 4 4 use poem::{ ··· 6 6 http::StatusCode, 7 7 listener::TcpListener, 8 8 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 9 - web::Data, 9 + web::{Data, Json}, 10 10 }; 11 11 use reqwest::{Client, Url}; 12 12 use std::{net::SocketAddr, time::Duration}; 13 13 14 14 #[derive(Debug, Clone)] 15 15 struct State { 16 - client: Client, 16 + upstream_client: Client, 17 + wrapped_client: Client, 17 18 plc: Url, 18 19 upstream: Url, 19 20 } ··· 58 59 "#, 59 60 logo("mirror 502 :( ") 60 61 ) 62 + } 63 + 64 + async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) { 65 + use serde_json::json; 66 + 67 + let mut url = url.clone(); 68 + url.set_path("/_health"); 69 + 70 + let Ok(response) = client.get(url).send().await else { 71 + return (false, json!({"error": "cannot reach plc server"})); 72 + }; 73 + 74 + let status = response.status(); 75 + 76 + let Ok(text) = response.text().await else { 77 + return (false, json!({"error": "failed to read response body"})); 78 + }; 79 + 80 + let body = match serde_json::from_str(&text) { 81 + Ok(json) => json, 82 + Err(_) => serde_json::Value::String(text.to_string()), 83 + }; 84 + 85 + if status.is_success() { 86 + (true, body) 87 + } else { 88 + ( 89 + false, 90 + json!({ 91 + "error": "non-ok status", 92 + "status": status.as_str(), 93 + "status_code": status.as_u16(), 94 + "response": body, 95 + }), 96 + ) 97 + } 98 + } 99 + 100 + #[handler] 101 + async fn health( 102 + Data(State { 103 + plc, 104 + wrapped_client, 105 + upstream, 106 + upstream_client, 107 + }): Data<&State>, 108 + ) -> impl IntoResponse { 109 + let mut overall_status = StatusCode::OK; 110 + let (ok, wrapped_status) = plc_status(plc, wrapped_client).await; 111 + if !ok { 112 + overall_status = StatusCode::BAD_GATEWAY; 113 + } 114 + let (ok, upstream_status) = plc_status(upstream, upstream_client).await; 115 + if !ok { 116 + overall_status = StatusCode::BAD_GATEWAY; 117 + } 118 + ( 119 + overall_status, 120 + Json(serde_json::json!({ 121 + "server": "allegedly (mirror)", 122 + "version": env!("CARGO_PKG_VERSION"), 123 + "wrapped_plc": wrapped_status, 124 + "upstream_plc": upstream_status, 125 + })), 126 + ) 61 127 } 62 128 63 129 #[handler] ··· 65 131 let mut target = state.plc.clone(); 66 132 target.set_path(req.uri().path()); 67 133 let upstream_res = state 68 - .client 134 + .upstream_client 69 135 .get(target) 70 136 .headers(req.headers().clone()) 71 137 .send() ··· 111 177 } 112 178 113 179 pub async fn serve(upstream: &Url, plc: Url, bind: SocketAddr) -> std::io::Result<()> { 114 - let wrapped_req_client = Client::builder() 180 + let wrapped_client = Client::builder() 115 181 .timeout(Duration::from_secs(3)) 116 182 .build() 117 183 .unwrap(); 184 + let upstream_client = Client::builder() 185 + .user_agent(UA) 186 + .timeout(Duration::from_secs(6)) 187 + .build() 188 + .unwrap(); 118 189 119 190 let state = State { 120 - client: wrapped_req_client, 191 + wrapped_client, 192 + upstream_client, 121 193 plc, 122 194 upstream: upstream.clone(), 123 195 }; 124 196 125 197 let app = Route::new() 126 198 .at("/", get(hello)) 199 + .at("/_health", get(health)) 127 200 .at("/:any", get(proxy).post(nope)) 128 201 .with(AddData::new(state)) 129 202 .with(Cors::new().allow_credentials(false))