tracks lexicons and how many times they appeared on the jetstream
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(server): use existing Events type, create LatencyMillis for better latency formatting

dusk c806b451 9c8c6ece

+30 -18
+30 -18
server/src/api.rs
··· 1 - use std::{collections::HashMap, net::SocketAddr, ops::Deref, sync::Arc, time::Duration}; 1 + use std::{ 2 + collections::HashMap, fmt::Display, net::SocketAddr, ops::Deref, sync::Arc, time::Duration, 3 + }; 2 4 3 5 use anyhow::anyhow; 4 6 use axum::{Json, Router, extract::State, http::Request, response::Response, routing::get}; ··· 18 20 error::{AppError, AppResult}, 19 21 }; 20 22 23 + struct LatencyMillis(u128); 24 + 25 + impl From<Duration> for LatencyMillis { 26 + fn from(duration: Duration) -> Self { 27 + LatencyMillis(duration.as_millis()) 28 + } 29 + } 30 + 31 + impl Display for LatencyMillis { 32 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 33 + write!(f, "{}ms", self.0) 34 + } 35 + } 36 + 21 37 pub async fn serve(db: Arc<Db>, cancel_token: CancellationToken) -> AppResult<()> { 22 38 let app = Router::new() 23 39 .route("/events", get(events)) ··· 41 57 } 42 58 span 43 59 }) 44 - .on_request(|request: &Request<_>, span: &Span| { 60 + .on_request(|_request: &Request<_>, span: &Span| { 45 61 let _ = span.enter(); 46 62 tracing::info!("processing") 47 63 }) 48 64 .on_response(|response: &Response<_>, latency: Duration, span: &Span| { 49 65 let _ = span.enter(); 50 - tracing::info!({code = %response.status().as_u16(), latency = %latency.as_millis()}, "processed") 66 + tracing::info!({code = %response.status().as_u16(), latency = %LatencyMillis::from(latency)}, "processed") 51 67 }) 52 68 .on_eos(()) 53 69 .on_failure(|error: ServerErrorsFailureClass, _: Duration, span: &Span| { ··· 82 98 deleted_count: u128, 83 99 last_seen: u64, 84 100 } 101 + 85 102 #[derive(Serialize)] 86 103 struct Events { 87 104 per_second: usize, 88 105 events: HashMap<SmolStr, NsidCount>, 89 106 } 90 - #[derive(Serialize)] 91 - struct EventsRef<'a> { 92 - per_second: usize, 93 - events: &'a HashMap<SmolStr, NsidCount>, 94 - } 107 + 95 108 async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> { 96 109 let mut events = HashMap::new(); 97 110 for result in db.get_counts() { ··· 116 129 ws.on_upgrade(move |mut socket| { 117 130 (async move { 118 131 let mut listener = db.new_listener(); 119 - let mut buffer = HashMap::<SmolStr, NsidCount>::with_capacity(10); 132 + let mut data = Events { 133 + events: HashMap::<SmolStr, NsidCount>::with_capacity(10), 134 + per_second: 0, 135 + }; 120 136 let mut updates = 0; 121 137 while let Ok((nsid, counts)) = listener.recv().await { 122 - buffer.insert( 138 + data.events.insert( 123 139 nsid, 124 140 NsidCount { 125 141 count: counts.count, ··· 129 145 ); 130 146 updates += 1; 131 147 // send 20 times every second max 132 - let per_second = db.eps(); 133 - if updates >= per_second / 20 { 134 - let msg = serde_json::to_string(&EventsRef { 135 - events: &buffer, 136 - per_second, 137 - }) 138 - .unwrap(); 148 + data.per_second = db.eps(); 149 + if updates >= data.per_second / 16 { 150 + let msg = serde_json::to_string(&data).unwrap(); 139 151 let res = socket.send(Message::text(msg)).await; 140 - buffer.clear(); 152 + data.events.clear(); 141 153 updates = 0; 142 154 if let Err(err) = res { 143 155 tracing::error!("error sending event: {err}");