Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

at debug 298 lines 8.0 kB view raw
1use crate::{GovernorMiddleware, UA, logo}; 2use futures::TryStreamExt; 3use governor::Quota; 4use poem::{ 5 Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, 6 handler, 7 http::StatusCode, 8 listener::{Listener, TcpListener, acme::AutoCert}, 9 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 10 web::{Data, Json}, 11}; 12use reqwest::{Client, Url}; 13use std::{net::SocketAddr, path::PathBuf, time::Duration}; 14 15#[derive(Debug, Clone)] 16struct State { 17 client: Client, 18 plc: Url, 19 upstream: Url, 20} 21 22#[handler] 23fn hello(Data(State { upstream, .. }): Data<&State>) -> String { 24 format!( 25 r#"{} 26 27This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 28synchronizes a local PLC reference server instance[2] (why?[3]). 29 30 31Configured upstream: 32 33 {upstream} 34 35 36Available APIs: 37 38 - GET /_health Health and version info 39 40 - GET /* Proxies to wrapped server; see PLC API docs: 41 https://web.plc.directory/api/redoc 42 43 - POST /* Always rejected. This is a mirror. 44 45 46 tip: try `GET /{{did}}` to resolve an identity 47 48 49Allegedly is a suit of open-source CLI tools for working with PLC logs: 50 51 https://tangled.org/@microcosm.blue/Allegedly 52 53 54[1] https://web.plc.directory 55[2] https://github.com/did-method-plc/did-method-plc 56[3] https://updates.microcosm.blue/3lz7nwvh4zc2u 57"#, 58 logo("mirror") 59 ) 60} 61 62#[handler] 63fn favicon() -> impl IntoResponse { 64 include_bytes!("../favicon.ico").with_content_type("image/x-icon") 65} 66 67fn failed_to_reach_wrapped() -> String { 68 format!( 69 r#"{} 70 71Failed to reach the wrapped reference PLC server. Sorry. 72"#, 73 logo("mirror 502 :( ") 74 ) 75} 76 77async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) { 78 use serde_json::json; 79 80 let mut url = url.clone(); 81 url.set_path("/_health"); 82 83 let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else { 84 return (false, json!({"error": "cannot reach plc server"})); 85 }; 86 87 let status = response.status(); 88 89 let Ok(text) = response.text().await else { 90 return (false, json!({"error": "failed to read response body"})); 91 }; 92 93 let body = match serde_json::from_str(&text) { 94 Ok(json) => json, 95 Err(_) => serde_json::Value::String(text.to_string()), 96 }; 97 98 if status.is_success() { 99 (true, body) 100 } else { 101 ( 102 false, 103 json!({ 104 "error": "non-ok status", 105 "status": status.as_str(), 106 "status_code": status.as_u16(), 107 "response": body, 108 }), 109 ) 110 } 111} 112 113#[handler] 114async fn health( 115 Data(State { 116 plc, 117 client, 118 upstream, 119 }): Data<&State>, 120) -> impl IntoResponse { 121 let mut overall_status = StatusCode::OK; 122 let (ok, wrapped_status) = plc_status(plc, client).await; 123 if !ok { 124 overall_status = StatusCode::BAD_GATEWAY; 125 } 126 let (ok, upstream_status) = plc_status(upstream, client).await; 127 if !ok { 128 overall_status = StatusCode::BAD_GATEWAY; 129 } 130 ( 131 overall_status, 132 Json(serde_json::json!({ 133 "server": "allegedly (mirror)", 134 "version": env!("CARGO_PKG_VERSION"), 135 "wrapped_plc": wrapped_status, 136 "upstream_plc": upstream_status, 137 })), 138 ) 139} 140 141#[handler] 142async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> { 143 let mut target = state.plc.clone(); 144 target.set_path(req.uri().path()); 145 let upstream_res = state 146 .client 147 .get(target) 148 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server 149 .headers(req.headers().clone()) 150 .send() 151 .await 152 .map_err(|e| { 153 log::error!("upstream req fail: {e}"); 154 Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY) 155 })?; 156 157 let http_res: poem::http::Response<reqwest::Body> = upstream_res.into(); 158 let (parts, reqw_body) = http_res.into_parts(); 159 160 let parts = poem::ResponseParts { 161 status: parts.status, 162 version: parts.version, 163 headers: parts.headers, 164 extensions: parts.extensions, 165 }; 166 167 let body = http_body_util::BodyDataStream::new(reqw_body) 168 .map_err(|e| std::io::Error::other(Box::new(e))); 169 170 Ok(Response::from_parts( 171 parts, 172 poem::Body::from_bytes_stream(body), 173 )) 174} 175 176#[handler] 177async fn nope(Data(State { upstream, .. }): Data<&State>) -> (StatusCode, String) { 178 ( 179 StatusCode::BAD_REQUEST, 180 format!( 181 r#"{} 182 183Sorry, this server does not accept POST requests. 184 185You may wish to try upstream: {upstream} 186"#, 187 logo("mirror (nope)") 188 ), 189 ) 190} 191 192#[derive(Debug)] 193pub enum ListenConf { 194 Acme { 195 domains: Vec<String>, 196 cache_path: PathBuf, 197 directory_url: String, 198 }, 199 Bind(SocketAddr), 200} 201 202pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> { 203 log::info!("starting server..."); 204 205 // not using crate CLIENT: don't want the retries etc 206 let client = Client::builder() 207 .user_agent(UA) 208 .timeout(Duration::from_secs(10)) // fallback 209 .build() 210 .expect("reqwest client to build"); 211 212 let state = State { 213 client, 214 plc, 215 upstream: upstream.clone(), 216 }; 217 218 let app = Route::new() 219 .at("/", get(hello)) 220 .at("/favicon.ico", get(favicon)) 221 .at("/_health", get(health)) 222 .at("/:any", get(proxy).post(nope)) 223 .with(AddData::new(state)) 224 .with(Cors::new().allow_credentials(false)) 225 .with(Compression::new()) 226 .with(GovernorMiddleware::new(Quota::per_minute( 227 3000.try_into().expect("ratelimit middleware to build"), 228 ))) 229 .with(CatchPanic::new()) 230 .with(Tracing); 231 232 match listen { 233 ListenConf::Acme { 234 domains, 235 cache_path, 236 directory_url, 237 } => { 238 rustls::crypto::aws_lc_rs::default_provider() 239 .install_default() 240 .expect("crypto provider to be installable"); 241 242 let mut auto_cert = AutoCert::builder() 243 .directory_url(directory_url) 244 .cache_path(cache_path); 245 for domain in domains { 246 auto_cert = auto_cert.domain(domain); 247 } 248 let auto_cert = auto_cert.build().expect("acme config to build"); 249 250 let notice_task = tokio::task::spawn(run_insecure_notice()); 251 let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await; 252 log::warn!("server task ended, aborting insecure server task..."); 253 notice_task.abort(); 254 app_res?; 255 notice_task.await??; 256 } 257 ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 258 } 259 260 Ok("server (uh oh?)") 261} 262 263async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> 264where 265 A: Endpoint + 'static, 266 L: Listener + 'static, 267{ 268 Server::new(listener) 269 .name("allegedly (mirror)") 270 .run(app) 271 .await 272} 273 274/// kick off a tiny little server on a tokio task to tell people to use 443 275async fn run_insecure_notice() -> Result<(), std::io::Error> { 276 #[handler] 277 fn oop_plz_be_secure() -> (StatusCode, String) { 278 ( 279 StatusCode::BAD_REQUEST, 280 format!( 281 r#"{} 282 283You probably want to change your request to use HTTPS instead of HTTP. 284"#, 285 logo("mirror (tls on 443 please)") 286 ), 287 ) 288 } 289 290 let app = Route::new() 291 .at("/", get(oop_plz_be_secure)) 292 .at("/favicon.ico", get(favicon)) 293 .with(Tracing); 294 Server::new(TcpListener::bind("0.0.0.0:80")) 295 .name("allegedly (mirror:80 helper)") 296 .run(app) 297 .await 298}