tracks lexicons and how many times they appeared on the jetstream
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(server): fix request tracing

dusk 28ac2ab1 0c23c8d7

+149 -35
+66
server/Cargo.lock
··· 31 31 ] 32 32 33 33 [[package]] 34 + name = "aho-corasick" 35 + version = "1.1.3" 36 + source = "registry+https://github.com/rust-lang/crates.io-index" 37 + checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" 38 + dependencies = [ 39 + "memchr", 40 + ] 41 + 42 + [[package]] 34 43 name = "anyhow" 35 44 version = "1.0.98" 36 45 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 755 764 checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" 756 765 757 766 [[package]] 767 + name = "matchers" 768 + version = "0.1.0" 769 + source = "registry+https://github.com/rust-lang/crates.io-index" 770 + checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" 771 + dependencies = [ 772 + "regex-automata 0.1.10", 773 + ] 774 + 775 + [[package]] 758 776 name = "matchit" 759 777 version = "0.8.4" 760 778 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1000 1018 ] 1001 1019 1002 1020 [[package]] 1021 + name = "regex" 1022 + version = "1.11.1" 1023 + source = "registry+https://github.com/rust-lang/crates.io-index" 1024 + checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" 1025 + dependencies = [ 1026 + "aho-corasick", 1027 + "memchr", 1028 + "regex-automata 0.4.9", 1029 + "regex-syntax 0.8.5", 1030 + ] 1031 + 1032 + [[package]] 1033 + name = "regex-automata" 1034 + version = "0.1.10" 1035 + source = "registry+https://github.com/rust-lang/crates.io-index" 1036 + checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" 1037 + dependencies = [ 1038 + "regex-syntax 0.6.29", 1039 + ] 1040 + 1041 + [[package]] 1042 + name = "regex-automata" 1043 + version = "0.4.9" 1044 + source = "registry+https://github.com/rust-lang/crates.io-index" 1045 + checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" 1046 + dependencies = [ 1047 + "aho-corasick", 1048 + "memchr", 1049 + "regex-syntax 0.8.5", 1050 + ] 1051 + 1052 + [[package]] 1053 + name = "regex-syntax" 1054 + version = "0.6.29" 1055 + source = "registry+https://github.com/rust-lang/crates.io-index" 1056 + checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 1057 + 1058 + [[package]] 1059 + name = "regex-syntax" 1060 + version = "0.8.5" 1061 + source = "registry+https://github.com/rust-lang/crates.io-index" 1062 + checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" 1063 + 1064 + [[package]] 1003 1065 name = "rend" 1004 1066 version = "0.5.2" 1005 1067 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1643 1705 source = "registry+https://github.com/rust-lang/crates.io-index" 1644 1706 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 1645 1707 dependencies = [ 1708 + "matchers", 1646 1709 "nu-ansi-term", 1710 + "once_cell", 1711 + "regex", 1647 1712 "sharded-slab", 1648 1713 "smallvec", 1649 1714 "thread_local", 1715 + "tracing", 1650 1716 "tracing-core", 1651 1717 "tracing-log", 1652 1718 ]
+1 -1
server/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0" 8 8 async-trait = "0.1" 9 - tracing-subscriber = "0.3" 9 + tracing-subscriber = {version = "0.3", features = ["env-filter"]} 10 10 tracing = "0.1" 11 11 tokio = { version = "1", features = ["full"] } 12 12 tokio-util = { version = "0.7", features = ["tracing"] }
+72 -33
server/src/api.rs
··· 1 - use std::{collections::HashMap, net::SocketAddr, sync::Arc}; 1 + use std::{collections::HashMap, net::SocketAddr, ops::Deref, sync::Arc, time::Duration}; 2 2 3 3 use anyhow::anyhow; 4 - use axum::{Json, Router, extract::State, response::Response, routing::get}; 4 + use axum::{Json, Router, extract::State, http::Request, response::Response, routing::get}; 5 5 use axum_tws::{Message, WebSocketUpgrade}; 6 6 use serde::Serialize; 7 7 use smol_str::SmolStr; 8 8 use tokio_util::sync::CancellationToken; 9 9 use tower_http::{ 10 + classify::ServerErrorsFailureClass, 10 11 request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer}, 11 12 trace::TraceLayer, 12 13 }; 14 + use tracing::{Instrument, Span, field}; 13 15 14 16 use crate::{ 15 17 db::Db, ··· 20 22 let app = Router::new() 21 23 .route("/events", get(events)) 22 24 .route("/stream_events", get(stream_events)) 25 + .route_layer(PropagateRequestIdLayer::x_request_id()) 26 + .route_layer( 27 + TraceLayer::new_for_http() 28 + .make_span_with(|request: &Request<_>| { 29 + let span = tracing::info_span!( 30 + "request", 31 + method = %request.method(), 32 + uri = %request.uri(), 33 + id = field::Empty, 34 + ip = field::Empty, 35 + ); 36 + if let Some(id) = request.headers().get("x-request-id") { 37 + span.record("id", String::from_utf8_lossy(id.as_bytes()).deref()); 38 + } 39 + if let Some(real_ip) = request.headers().get("x-real-ip") { 40 + span.record("ip", String::from_utf8_lossy(real_ip.as_bytes()).deref()); 41 + } 42 + span 43 + }) 44 + .on_request(|request: &Request<_>, span: &Span| { 45 + let _ = span.enter(); 46 + tracing::info!("processing") 47 + }) 48 + .on_response(|response: &Response<_>, latency: Duration, span: &Span| { 49 + let _ = span.enter(); 50 + tracing::info!({code = %response.status().as_u16(), latency = %latency.as_millis()}, "processed") 51 + }) 52 + .on_eos(()) 53 + .on_failure(|error: ServerErrorsFailureClass, _: Duration, span: &Span| { 54 + let _ = span.enter(); 55 + if matches!(error, ServerErrorsFailureClass::StatusCode(status_code) if status_code.is_server_error()) || matches!(error, ServerErrorsFailureClass::Error(_)) { 56 + tracing::error!("server error: {}", error.to_string().to_lowercase()); 57 + }; 58 + }), 59 + ) 23 60 .route_layer(SetRequestIdLayer::x_request_id(MakeRequestUuid)) 24 - .route_layer(TraceLayer::new_for_http()) 25 - .route_layer(PropagateRequestIdLayer::x_request_id()) 26 61 .with_state(db); 27 62 28 63 let addr = SocketAddr::from(( ··· 77 112 } 78 113 79 114 async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response { 80 - ws.on_upgrade(async move |mut socket| { 81 - let mut listener = db.new_listener(); 82 - let mut buffer = HashMap::<SmolStr, NsidCount>::with_capacity(10); 83 - let mut updates = 0; 84 - while let Ok((nsid, counts)) = listener.recv().await { 85 - buffer.insert( 86 - nsid, 87 - NsidCount { 88 - count: counts.count, 89 - deleted_count: counts.deleted_count, 90 - last_seen: counts.last_seen, 91 - }, 92 - ); 93 - updates += 1; 94 - // send 10 times every second max 95 - let per_second = db.eps(); 96 - if updates >= per_second / 10 { 97 - let msg = serde_json::to_string(&EventsRef { 98 - events: &buffer, 99 - per_second, 100 - }) 101 - .unwrap(); 102 - let res = socket.send(Message::text(msg)).await; 103 - buffer.clear(); 104 - updates = 0; 105 - if let Err(err) = res { 106 - tracing::error!("error sending event: {err}"); 107 - break; 115 + let span = tracing::info_span!(parent: Span::current(), "ws"); 116 + ws.on_upgrade(move |mut socket| { 117 + (async move { 118 + let mut listener = db.new_listener(); 119 + let mut buffer = HashMap::<SmolStr, NsidCount>::with_capacity(10); 120 + let mut updates = 0; 121 + while let Ok((nsid, counts)) = listener.recv().await { 122 + buffer.insert( 123 + nsid, 124 + NsidCount { 125 + count: counts.count, 126 + deleted_count: counts.deleted_count, 127 + last_seen: counts.last_seen, 128 + }, 129 + ); 130 + updates += 1; 131 + // send 10 times every second max 132 + let per_second = db.eps(); 133 + if updates >= per_second / 10 { 134 + let msg = serde_json::to_string(&EventsRef { 135 + events: &buffer, 136 + per_second, 137 + }) 138 + .unwrap(); 139 + let res = socket.send(Message::text(msg)).await; 140 + buffer.clear(); 141 + updates = 0; 142 + if let Err(err) = res { 143 + tracing::error!("error sending event: {err}"); 144 + break; 145 + } 108 146 } 109 147 } 110 - } 148 + }) 149 + .instrument(span) 111 150 }) 112 151 }
+10 -1
server/src/main.rs
··· 4 4 #[cfg(not(target_env = "msvc"))] 5 5 use tikv_jemallocator::Jemalloc; 6 6 use tokio_util::sync::CancellationToken; 7 + use tracing::Level; 8 + use tracing_subscriber::EnvFilter; 7 9 8 10 use crate::{ 9 11 api::serve, ··· 23 25 24 26 #[tokio::main] 25 27 async fn main() { 26 - tracing_subscriber::fmt::fmt().compact().init(); 28 + tracing_subscriber::fmt::fmt() 29 + .with_env_filter( 30 + EnvFilter::builder() 31 + .with_default_directive(Level::INFO.into()) 32 + .from_env_lossy(), 33 + ) 34 + .compact() 35 + .init(); 27 36 28 37 if std::env::args() 29 38 .nth(1)