tracks lexicons and how many times they appeared on the jetstream
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: track and send events per second, send 10 WS messages maximum per second, buffering in between

dusk 0c23c8d7 0e76b961

+174 -108
+3 -3
client/src/lib/api.ts
··· 1 1 import { dev } from "$app/environment"; 2 - import type { EventRecord } from "./types"; 2 + import type { Events } from "./types"; 3 3 import { PUBLIC_API_URL } from "$env/static/public"; 4 4 5 - export const fetchEvents = async (): Promise<EventRecord[]> => { 5 + export const fetchEvents = async (): Promise<Events> => { 6 6 const response = await fetch( 7 7 `${dev ? "http" : "https"}://${PUBLIC_API_URL}/events`, 8 8 ); ··· 11 11 } 12 12 13 13 const data = await response.json(); 14 - return data.events; 14 + return data; 15 15 };
+14 -8
client/src/lib/components/StatsCard.svelte
··· 1 1 <script lang="ts"> 2 2 import { formatNumber } from "$lib/format"; 3 3 4 - interface Props { 5 - title: string; 6 - value: number; 7 - colorScheme: "green" | "red" | "orange"; 8 - } 9 - 10 - let { title, value, colorScheme }: Props = $props(); 11 - 12 4 const colorClasses = { 13 5 green: { 14 6 bg: "from-green-50 to-green-100", ··· 22 14 titleText: "text-red-700", 23 15 valueText: "text-red-900", 24 16 }, 17 + turqoise: { 18 + bg: "from-teal-50 to-teal-100", 19 + border: "border-teal-200", 20 + titleText: "text-teal-700", 21 + valueText: "text-teal-900", 22 + }, 25 23 orange: { 26 24 bg: "from-orange-50 to-orange-100", 27 25 border: "border-orange-200", ··· 29 27 valueText: "text-orange-900", 30 28 }, 31 29 }; 30 + 31 + interface Props { 32 + title: string; 33 + value: number; 34 + colorScheme: keyof typeof colorClasses; 35 + } 36 + 37 + let { title, value, colorScheme }: Props = $props(); 32 38 33 39 const colors = $derived(colorClasses[colorScheme]); 34 40 </script>
+6 -3
client/src/lib/types.ts
··· 1 + export type Events = { 2 + per_second: number; 3 + events: Record<string, EventRecord>; 4 + }; 1 5 export type EventRecord = { 2 - nsid: string; 3 6 last_seen: number; 4 7 count: number; 5 8 deleted_count: number; 6 9 }; 7 - 8 - export type NsidCounts = { 10 + export type NsidCount = { 11 + nsid: string; 9 12 last_seen: number; 10 13 count: number; 11 14 deleted_count: number;
+24 -66
client/src/routes/+page.svelte
··· 1 1 <script lang="ts"> 2 2 import { dev } from "$app/environment"; 3 - import type { EventRecord, NsidCounts } from "$lib/types"; 3 + import type { EventRecord, NsidCount } from "$lib/types"; 4 4 import { onMount, onDestroy } from "svelte"; 5 5 import { writable } from "svelte/store"; 6 6 import { PUBLIC_API_URL } from "$env/static/public"; ··· 11 11 import EventCard from "$lib/components/EventCard.svelte"; 12 12 import FilterControls from "$lib/components/FilterControls.svelte"; 13 13 14 - const events = writable(new Map<string, NsidCounts>()); 15 - let eventsList: EventRecord[] = $state([]); 14 + const events = writable(new Map<string, EventRecord>()); 15 + let eventsList: NsidCount[] = $state([]); 16 16 events.subscribe((value) => { 17 17 eventsList = value 18 18 .entries() ··· 23 23 .toArray(); 24 24 eventsList.sort((a, b) => b.count - a.count); 25 25 }); 26 + let per_second = $state(0); 26 27 27 - // Backpressure system 28 - let eventBuffer: EventRecord[] = []; 29 - let updateTimer: number | null = null; 30 - const BATCH_SIZE = 10; 31 - const UPDATE_INTERVAL = 100; // ms 32 - 33 - const flushEventBuffer = () => { 34 - if (eventBuffer.length === 0) return; 35 - 36 - events.update((map) => { 37 - for (const event of eventBuffer) { 38 - map.set(event.nsid, event); 39 - } 40 - return map; 41 - }); 42 - 43 - eventBuffer = []; 44 - }; 45 - 46 - const scheduleUpdate = () => { 47 - if (updateTimer !== null) return; 48 - 49 - updateTimer = window.setTimeout(() => { 50 - flushEventBuffer(); 51 - updateTimer = null; 52 - }, UPDATE_INTERVAL); 53 - }; 54 - let all: NsidCounts = $derived( 28 + let all: EventRecord = $derived( 55 29 eventsList.reduce( 56 30 (acc, event) => { 57 31 return { ··· 92 66 websocketStatus = "connected"; 93 67 }; 94 68 websocket.onmessage = async (event) => { 95 - const view = new DataView(event.data); 96 - const decoder = new TextDecoder("utf-8"); 97 - const jsonStr = decoder.decode(view); 98 - const jsonData = JSON.parse(jsonStr); 69 + const jsonData = JSON.parse(event.data); 99 70 100 - // Add to buffer instead of immediate update 101 - eventBuffer.push(jsonData); 102 - 103 - // If buffer is full, flush immediately 104 - if (eventBuffer.length >= BATCH_SIZE) { 105 - if (updateTimer !== null) { 106 - window.clearTimeout(updateTimer); 107 - updateTimer = null; 71 + if (jsonData.per_second > 0) { 72 + per_second = jsonData.per_second; 73 + } 74 + events.update((map) => { 75 + for (const [nsid, event] of Object.entries(jsonData.events)) { 76 + map.set(nsid, event as EventRecord); 108 77 } 109 - flushEventBuffer(); 110 - } else { 111 - // Otherwise schedule a delayed update 112 - scheduleUpdate(); 113 - } 78 + return map; 79 + }); 114 80 }; 115 81 websocket.onerror = (error) => { 116 82 console.error("ws error:", error); ··· 120 86 console.log("ws closed"); 121 87 isStreamOpen = false; 122 88 websocketStatus = "disconnected"; 123 - 124 - // Flush any remaining events when connection closes 125 - if (updateTimer !== null) { 126 - window.clearTimeout(updateTimer); 127 - updateTimer = null; 128 - } 129 - flushEventBuffer(); 130 89 }; 131 90 }; 132 91 ··· 134 93 try { 135 94 error = null; 136 95 const data = await fetchEvents(); 96 + per_second = data.per_second; 137 97 events.update((map) => { 138 - for (const event of data) { 139 - map.set(event.nsid, event); 98 + for (const [nsid, event] of Object.entries(data.events)) { 99 + map.set(nsid, event); 140 100 } 141 101 return map; 142 102 }); ··· 155 115 }); 156 116 157 117 onDestroy(() => { 158 - // Clean up timer and flush any remaining events 159 - if (updateTimer !== null) { 160 - window.clearTimeout(updateTimer); 161 - updateTimer = null; 162 - } 163 - flushEventBuffer(); 164 - 165 118 // Close WebSocket connection 166 119 if (websocket) { 167 120 websocket.close(); 168 121 } 169 122 }); 170 123 171 - const filterEvents = (events: EventRecord[]) => { 124 + const filterEvents = (events: NsidCount[]) => { 172 125 let filtered = events; 173 126 174 127 // Apply regex filter ··· 207 160 </header> 208 161 209 162 <div 210 - class="mx-auto w-fit grid grid-cols-2 md:grid-cols-3 gap-2 md:gap-5 mb-8" 163 + class="mx-auto w-fit grid grid-cols-2 md:grid-cols-4 gap-2 md:gap-5 mb-8" 211 164 > 212 165 <StatsCard 213 166 title="total creation" ··· 218 171 title="total deletion" 219 172 value={all.deleted_count} 220 173 colorScheme="red" 174 + /> 175 + <StatsCard 176 + title="per second" 177 + value={per_second} 178 + colorScheme="turqoise" 221 179 /> 222 180 <StatsCard 223 181 title="unique collections"
+70 -1
server/Cargo.lock
··· 18 18 checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 19 20 20 [[package]] 21 + name = "ahash" 22 + version = "0.8.12" 23 + source = "registry+https://github.com/rust-lang/crates.io-index" 24 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 25 + dependencies = [ 26 + "cfg-if", 27 + "getrandom 0.3.3", 28 + "once_cell", 29 + "version_check", 30 + "zerocopy", 31 + ] 32 + 33 + [[package]] 21 34 name = "anyhow" 22 35 version = "1.0.98" 23 36 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 33 46 "quote", 34 47 "syn", 35 48 ] 49 + 50 + [[package]] 51 + name = "atomic-waker" 52 + version = "1.1.2" 53 + source = "registry+https://github.com/rust-lang/crates.io-index" 54 + checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" 36 55 37 56 [[package]] 38 57 name = "autocfg" ··· 490 509 checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f" 491 510 492 511 [[package]] 512 + name = "h2" 513 + version = "0.4.11" 514 + source = "registry+https://github.com/rust-lang/crates.io-index" 515 + checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785" 516 + dependencies = [ 517 + "atomic-waker", 518 + "bytes", 519 + "fnv", 520 + "futures-core", 521 + "futures-sink", 522 + "http", 523 + "indexmap", 524 + "slab", 525 + "tokio", 526 + "tokio-util", 527 + "tracing", 528 + ] 529 + 530 + [[package]] 493 531 name = "hashbrown" 494 532 version = "0.14.5" 495 533 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 556 594 "bytes", 557 595 "futures-channel", 558 596 "futures-util", 597 + "h2", 559 598 "http", 560 599 "http-body", 561 600 "httparse", ··· 880 919 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 881 920 882 921 [[package]] 922 + name = "pingora-limits" 923 + version = "0.5.0" 924 + source = "registry+https://github.com/rust-lang/crates.io-index" 925 + checksum = "a719a8cb5558ca06bd6076c97b8905d500ea556da89e132ba53d4272844f95b9" 926 + dependencies = [ 927 + "ahash", 928 + ] 929 + 930 + [[package]] 883 931 name = "proc-macro2" 884 932 version = "1.0.95" 885 933 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1083 1131 "security-framework", 1084 1132 "security-framework-sys", 1085 1133 "webpki-root-certs", 1086 - "windows-sys 0.52.0", 1134 + "windows-sys 0.59.0", 1087 1135 ] 1088 1136 1089 1137 [[package]] ··· 1231 1279 "fjall", 1232 1280 "futures-util", 1233 1281 "papaya", 1282 + "pingora-limits", 1234 1283 "rkyv", 1235 1284 "rustls", 1236 1285 "serde", ··· 2024 2073 version = "0.8.15" 2025 2074 source = "registry+https://github.com/rust-lang/crates.io-index" 2026 2075 checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 2076 + 2077 + [[package]] 2078 + name = "zerocopy" 2079 + version = "0.8.26" 2080 + source = "registry+https://github.com/rust-lang/crates.io-index" 2081 + checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" 2082 + dependencies = [ 2083 + "zerocopy-derive", 2084 + ] 2085 + 2086 + [[package]] 2087 + name = "zerocopy-derive" 2088 + version = "0.8.26" 2089 + source = "registry+https://github.com/rust-lang/crates.io-index" 2090 + checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" 2091 + dependencies = [ 2092 + "proc-macro2", 2093 + "quote", 2094 + "syn", 2095 + ] 2027 2096 2028 2097 [[package]] 2029 2098 name = "zeroize"
+2 -1
server/Cargo.toml
··· 14 14 tokio-websockets = { version = "0.12", features = ["client", "rustls-platform-verifier", "getrandom", "ring"] } 15 15 futures-util = "0.3" 16 16 axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json"] } 17 - axum-tws = { git = "https://github.com/90-008/axum-tws.git" } 17 + axum-tws = { git = "https://github.com/90-008/axum-tws.git", features = ["http2"] } 18 + pingora-limits = "0.5" 18 19 tower-http = {version = "0.6", features = ["request-id", "trace"]} 19 20 fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] } 20 21 rkyv = {version = "0.8", features = ["unaligned"]}
+46 -25
server/src/api.rs
··· 1 - use std::{net::SocketAddr, sync::Arc}; 1 + use std::{collections::HashMap, net::SocketAddr, sync::Arc}; 2 2 3 3 use anyhow::anyhow; 4 4 use axum::{Json, Router, extract::State, response::Response, routing::get}; ··· 43 43 44 44 #[derive(Serialize)] 45 45 struct NsidCount { 46 - nsid: SmolStr, 47 46 count: u128, 48 47 deleted_count: u128, 49 48 last_seen: u64, 50 49 } 51 50 #[derive(Serialize)] 52 51 struct Events { 53 - events: Vec<NsidCount>, 52 + per_second: usize, 53 + events: HashMap<SmolStr, NsidCount>, 54 + } 55 + #[derive(Serialize)] 56 + struct EventsRef<'a> { 57 + per_second: usize, 58 + events: &'a HashMap<SmolStr, NsidCount>, 54 59 } 55 60 async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> { 56 - let mut events = Vec::new(); 61 + let mut events = HashMap::new(); 57 62 for result in db.get_counts() { 58 63 let (nsid, counts) = result?; 59 - events.push(NsidCount { 64 + events.insert( 60 65 nsid, 61 - count: counts.count, 62 - deleted_count: counts.deleted_count, 63 - last_seen: counts.last_seen, 64 - }) 66 + NsidCount { 67 + count: counts.count, 68 + deleted_count: counts.deleted_count, 69 + last_seen: counts.last_seen, 70 + }, 71 + ); 65 72 } 66 - events.sort_unstable_by(|a, b| b.count.cmp(&a.count)); 67 - Ok(Json(Events { events })) 73 + Ok(Json(Events { 74 + events, 75 + per_second: db.eps(), 76 + })) 68 77 } 69 78 70 79 async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response { 71 80 ws.on_upgrade(async move |mut socket| { 72 81 let mut listener = db.new_listener(); 82 + let mut buffer = HashMap::<SmolStr, NsidCount>::with_capacity(10); 83 + let mut updates = 0; 73 84 while let Ok((nsid, counts)) = listener.recv().await { 74 - let res = socket 75 - .send(Message::text( 76 - serde_json::to_string(&NsidCount { 77 - nsid, 78 - count: counts.count, 79 - deleted_count: counts.deleted_count, 80 - last_seen: counts.last_seen, 81 - }) 82 - .unwrap(), 83 - )) 84 - .await; 85 - if let Err(err) = res { 86 - tracing::error!("error sending event: {err}"); 87 - break; 85 + buffer.insert( 86 + nsid, 87 + NsidCount { 88 + count: counts.count, 89 + deleted_count: counts.deleted_count, 90 + last_seen: counts.last_seen, 91 + }, 92 + ); 93 + updates += 1; 94 + // send 10 times every second max 95 + let per_second = db.eps(); 96 + if updates >= per_second / 10 { 97 + let msg = serde_json::to_string(&EventsRef { 98 + events: &buffer, 99 + per_second, 100 + }) 101 + .unwrap(); 102 + let res = socket.send(Message::text(msg)).await; 103 + buffer.clear(); 104 + updates = 0; 105 + if let Err(err) = res { 106 + tracing::error!("error sending event: {err}"); 107 + break; 108 + } 88 109 } 89 110 } 90 111 })
+9 -1
server/src/db.rs
··· 1 - use std::{ops::Deref, path::Path}; 1 + use std::{ops::Deref, path::Path, time::Duration}; 2 2 3 3 use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 4 + use pingora_limits::rate::Rate; 4 5 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 5 6 use smol_str::SmolStr; 6 7 use tokio::sync::broadcast; ··· 59 60 hits: papaya::HashMap<SmolStr, Partition>, 60 61 counts: Partition, 61 62 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 63 + eps: Rate, 62 64 } 63 65 64 66 impl Db { ··· 75 77 )?, 76 78 inner: ks, 77 79 event_broadcaster: broadcast::channel(1000).0, 80 + eps: Rate::new(Duration::from_secs(1)), 78 81 }) 79 82 } 80 83 84 + pub fn eps(&self) -> usize { 85 + self.eps.rate(&()) as usize 86 + } 87 + 81 88 pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 82 89 self.event_broadcaster.subscribe() 83 90 } ··· 119 126 if self.event_broadcaster.receiver_count() > 0 { 120 127 let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 121 128 } 129 + self.eps.observe(&(), 1); 122 130 Ok(()) 123 131 } 124 132