don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(jetstream): metrics

Signed-off-by: tjh <x@tjh.dev>

tjh 71a35ccf c5a03557

+240 -137
+102
Cargo.lock
··· 9 9 checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 10 10 11 11 [[package]] 12 + name = "ahash" 13 + version = "0.8.12" 14 + source = "registry+https://github.com/rust-lang/crates.io-index" 15 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 16 + dependencies = [ 17 + "cfg-if", 18 + "once_cell", 19 + "version_check", 20 + "zerocopy", 21 + ] 22 + 23 + [[package]] 12 24 name = "aho-corasick" 13 25 version = "1.1.4" 14 26 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2124 2112 "flume", 2125 2113 "futures-util", 2126 2114 "gordian-types", 2115 + "metrics", 2116 + "metrics-exporter-prometheus", 2127 2117 "rustls", 2128 2118 "serde", 2129 2119 "serde_json", ··· 2497 2483 "hyper", 2498 2484 "hyper-util", 2499 2485 "rustls", 2486 + "rustls-native-certs", 2500 2487 "rustls-pki-types", 2501 2488 "tokio", 2502 2489 "tokio-rustls", ··· 3008 2993 ] 3009 2994 3010 2995 [[package]] 2996 + name = "metrics" 2997 + version = "0.24.3" 2998 + source = "registry+https://github.com/rust-lang/crates.io-index" 2999 + checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" 3000 + dependencies = [ 3001 + "ahash", 3002 + "portable-atomic", 3003 + ] 3004 + 3005 + [[package]] 3006 + name = "metrics-exporter-prometheus" 3007 + version = "0.18.1" 3008 + source = "registry+https://github.com/rust-lang/crates.io-index" 3009 + checksum = "3589659543c04c7dc5526ec858591015b87cd8746583b51b48ef4353f99dbcda" 3010 + dependencies = [ 3011 + "base64", 3012 + "http-body-util", 3013 + "hyper", 3014 + "hyper-rustls", 3015 + "hyper-util", 3016 + "indexmap", 3017 + "ipnet", 3018 + "metrics", 3019 + "metrics-util", 3020 + "quanta", 3021 + "rustls", 3022 + "thiserror 2.0.18", 3023 + "tokio", 3024 + "tracing", 3025 + ] 3026 + 3027 + [[package]] 3028 + name = "metrics-util" 3029 + version = "0.20.1" 3030 + source = "registry+https://github.com/rust-lang/crates.io-index" 3031 + checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" 3032 + dependencies = [ 3033 + "crossbeam-epoch", 3034 + "crossbeam-utils", 3035 + "hashbrown 0.16.1", 3036 + "metrics", 3037 + "quanta", 3038 + "rand 0.9.2", 3039 + "rand_xoshiro", 3040 + "sketches-ddsketch", 3041 + ] 3042 + 3043 + [[package]] 3011 3044 name = "mime" 3012 3045 version = "0.3.17" 3013 3046 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3440 3377 ] 3441 3378 3442 3379 [[package]] 3380 + name = "quanta" 3381 + version = "0.12.6" 3382 + source = "registry+https://github.com/rust-lang/crates.io-index" 3383 + checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" 3384 + dependencies = [ 3385 + "crossbeam-utils", 3386 + "libc", 3387 + "once_cell", 3388 + "raw-cpuid", 3389 + "wasi", 3390 + "web-sys", 3391 + "winapi", 3392 + ] 3393 + 3394 + [[package]] 3443 3395 name = "quinn" 3444 3396 version = "0.11.9" 3445 3397 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3582 3504 checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" 3583 3505 dependencies = [ 3584 3506 "getrandom 0.3.4", 3507 + ] 3508 + 3509 + [[package]] 3510 + name = "rand_xoshiro" 3511 + version = "0.7.0" 3512 + source = "registry+https://github.com/rust-lang/crates.io-index" 3513 + checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" 3514 + dependencies = [ 3515 + "rand_core 0.9.5", 3516 + ] 3517 + 3518 + [[package]] 3519 + name = "raw-cpuid" 3520 + version = "11.6.0" 3521 + source = "registry+https://github.com/rust-lang/crates.io-index" 3522 + checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" 3523 + dependencies = [ 3524 + "bitflags", 3585 3525 ] 3586 3526 3587 3527 [[package]] ··· 4125 4029 version = "0.3.8" 4126 4030 source = "registry+https://github.com/rust-lang/crates.io-index" 4127 4031 checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" 4032 + 4033 + [[package]] 4034 + name = "sketches-ddsketch" 4035 + version = "0.3.0" 4036 + source = "registry+https://github.com/rust-lang/crates.io-index" 4037 + checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" 4128 4038 4129 4039 [[package]] 4130 4040 name = "slab"
+1
Cargo.toml
··· 34 34 data-encoding = "2.9.0" 35 35 exn = "0.3.0" 36 36 gix = { version = "0.78.0", features = ["max-performance"] } 37 + metrics = "0.24.3" 37 38 rand = "0.9.2" 38 39 reqwest = { version = "0.13.1", features = ["form", "json"] } 39 40 serde = { version = "1.0.226", features = ["derive"] }
+2
crates/gordian-jetstream/Cargo.toml
··· 10 10 [dependencies] 11 11 gordian-types = { workspace = true, features = ["serde"] } 12 12 13 + metrics.workspace = true 13 14 serde.workspace = true 14 15 serde_json.workspace = true 15 16 thiserror.workspace = true ··· 23 22 fastrand = "2.3.0" 24 23 flume = "0.11.1" 25 24 futures-util = "0.3.31" 25 + metrics-exporter-prometheus = { version = "0.18.1", optional = true } 26 26 rustls = { version = "0.23.37", features = ["aws-lc-rs"] } 27 27 tokio = { version = "1.48.0", features = ["macros", "rt", "sync", "time"] } 28 28 tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-native-roots"] }
+17 -14
crates/gordian-jetstream/examples/cli.rs
··· 21 21 #[arg(long)] 22 22 pub hide_identity: bool, 23 23 24 - /// Print Jetstream metrics after each event. 24 + /// Spawn a metrics endpoint at http://localhost:9000/ 25 + #[cfg(feature = "metrics-exporter-prometheus")] 25 26 #[arg(long)] 26 27 pub metrics: bool, 27 28 ··· 46 45 47 46 use gordian_jetstream::Event; 48 47 use gordian_jetstream::client_config::JetstreamConfig; 48 + #[cfg(feature = "metrics-exporter-prometheus")] 49 + use metrics_exporter_prometheus::PrometheusBuilder; 49 50 use tracing::level_filters::LevelFilter; 50 51 use tracing_subscriber::EnvFilter; 51 52 use tracing_subscriber::layer::SubscriberExt as _; ··· 69 66 .init(); 70 67 71 68 let arguments = cli::parse(); 69 + 70 + #[cfg(feature = "metrics-exporter-prometheus")] 71 + if arguments.metrics { 72 + if let Err(error) = PrometheusBuilder::new().install() { 73 + tracing::error!(?error, "failed to install metrics exporter"); 74 + } 75 + } 76 + 72 77 let mut log = arguments 73 78 .log 74 79 .map(|path| File::options().create(true).append(true).open(path)) ··· 109 98 } 110 99 111 100 tracing::debug!(?config); 112 - let (client, rx, task) = config.connect(); 101 + let (_client, rx, task) = config.connect(); 113 102 114 103 // Spawn the client task. 115 104 let handle = tokio::spawn(task); ··· 120 109 writeln!(log, "{msg}").expect("Failed to write to log file"); 121 110 } 122 111 123 - let output = match message.deserialize() { 124 - Ok(Event::Commit(commit)) if arguments.hide_commit => false, 112 + match message.deserialize() { 113 + Ok(Event::Commit(commit)) if arguments.hide_commit => {} 125 114 Ok(Event::Commit(commit)) => { 126 115 println!("{commit:#?}"); 127 - true 128 116 } 129 - Ok(Event::Account(account)) if arguments.hide_account => false, 117 + Ok(Event::Account(account)) if arguments.hide_account => {} 130 118 Ok(Event::Account(account)) => { 131 119 println!("{account:#?}"); 132 - true 133 120 } 134 - Ok(Event::Identity(identity)) if arguments.hide_identity => false, 121 + Ok(Event::Identity(identity)) if arguments.hide_identity => {} 135 122 Ok(Event::Identity(identity)) => { 136 123 println!("{identity:#?}"); 137 - true 138 124 } 139 125 Err(error) => { 140 126 tracing::error!(?error); 141 - true 142 127 } 143 - }; 144 - 145 - if arguments.metrics && output { 146 - eprintln!("{:#?}", client.metrics()); 147 128 } 148 129 } 149 130
-10
crates/gordian-jetstream/src/client.rs
··· 9 9 10 10 use crate::Nsid; 11 11 use crate::de::Event; 12 - use crate::metrics::Metrics; 13 - use crate::metrics::MetricsData; 14 12 use crate::subscriber_options::SubscriberOptions; 15 13 use crate::task::JetstreamTaskError; 16 14 ··· 16 18 pub struct JetstreamClient { 17 19 client_tx: flume::Sender<ClientCommand>, 18 20 options: Arc<Mutex<SubscriberOptions>>, 19 - metrics: Metrics, 20 21 #[allow(unused)] 21 22 shutdown: DropGuard, 22 23 } ··· 24 27 pub(crate) fn new( 25 28 client_tx: flume::Sender<ClientCommand>, 26 29 options: Arc<Mutex<SubscriberOptions>>, 27 - metrics: Metrics, 28 30 shutdown: CancellationToken, 29 31 ) -> Self { 30 32 Self { 31 33 client_tx, 32 34 options, 33 - metrics, 34 35 shutdown: shutdown.drop_guard(), 35 36 } 36 37 } ··· 113 118 self.update_task().await?; 114 119 } 115 120 Ok(()) 116 - } 117 - 118 - #[must_use] 119 - pub fn metrics(&self) -> MetricsData { 120 - self.metrics.export() 121 121 } 122 122 123 123 /// Shutdown the Jetstream client.
+1 -4
crates/gordian-jetstream/src/client_config.rs
··· 12 12 use crate::PUBLIC_JETSTREAM_US_WEST1; 13 13 use crate::PUBLIC_JETSTREAM_US_WEST2; 14 14 use crate::client_options::ClientOptions; 15 - use crate::metrics::Metrics; 16 15 use crate::subscriber_options::SubscriberOptions; 17 16 use crate::task::JetstreamTask; 18 17 ··· 59 60 let (client_tx, client_rx) = flume::bounded(8); 60 61 61 62 let options = Arc::new(Mutex::new(self.subscriber_options)); 62 - let metrics = Metrics::new(); 63 63 let shutdown = CancellationToken::new(); 64 64 65 65 let task = JetstreamTask(Some( 66 66 crate::task::jetstream_subscriber( 67 67 event_tx, 68 68 client_rx, 69 - metrics.clone(), 70 69 self.client_options, 71 70 Arc::clone(&options), 72 71 shutdown.child_token(), ··· 72 75 .boxed(), 73 76 )); 74 77 75 - let client = JetstreamClient::new(client_tx, options, metrics, shutdown); 78 + let client = JetstreamClient::new(client_tx, options, shutdown); 76 79 let receiver = JetstreamReceiver::new(event_rx); 77 80 78 81 (client, receiver, task)
-1
crates/gordian-jetstream/src/lib.rs
··· 4 4 5 5 pub mod client_config; 6 6 pub mod client_options; 7 - pub mod metrics; 8 7 pub mod subscriber_options; 9 8 10 9 pub use client::JetstreamClient;
-68
crates/gordian-jetstream/src/metrics.rs
··· 1 - use std::sync::Arc; 2 - use std::sync::Mutex; 3 - use std::sync::MutexGuard; 4 - 5 - /// Jetstream client metrics. 6 - #[derive(Debug, Default)] 7 - pub struct Metrics { 8 - inner: Arc<Mutex<MetricsData>>, 9 - } 10 - 11 - impl Clone for Metrics { 12 - #[inline] 13 - fn clone(&self) -> Self { 14 - Self { 15 - inner: Arc::clone(&self.inner), 16 - } 17 - } 18 - } 19 - 20 - #[derive(Clone, Debug, Default)] 21 - pub struct MetricsData { 22 - pub connects: usize, 23 - pub disconnects: usize, 24 - pub timeouts: usize, 25 - pub bytes_received_raw: usize, 26 - pub bytes_received: usize, 27 - pub messages_received: usize, 28 - pub identity_messages: usize, 29 - pub account_messages: usize, 30 - pub commit_messages: usize, 31 - pub unknown_messages: usize, 32 - pub pings_received: usize, 33 - pub pongs_sent: usize, 34 - } 35 - 36 - impl Metrics { 37 - #[must_use] 38 - pub fn new() -> Self { 39 - Self::default() 40 - } 41 - 42 - /// Create a clone of the metrics data. 43 - /// 44 - /// # Panics 45 - /// 46 - /// Panics if the contained mutex has been poisoned 47 - #[must_use] 48 - pub fn export(&self) -> MetricsData { 49 - self.inner.lock().unwrap().clone() 50 - } 51 - 52 - pub(crate) fn modify<F>(&self, f: F) 53 - where 54 - F: Fn(MutexGuard<MetricsData>), 55 - { 56 - let guard = self.inner.lock().unwrap(); 57 - f(guard); 58 - } 59 - 60 - pub(crate) fn increment_message_kind(&self, kind: &str) { 61 - match kind { 62 - "account" => self.inner.lock().unwrap().account_messages += 1, 63 - "commit" => self.inner.lock().unwrap().commit_messages += 1, 64 - "identity" => self.inner.lock().unwrap().identity_messages += 1, 65 - _ => self.inner.lock().unwrap().unknown_messages += 1, 66 - } 67 - } 68 - }
+66 -38
crates/gordian-jetstream/src/task.rs
··· 18 18 19 19 use crate::client::ClientCommand; 20 20 use crate::client_options::ClientOptions; 21 - use crate::metrics::Metrics; 22 21 use crate::subscriber_options::SubscriberOptions; 22 + 23 + mod counters; 23 24 24 25 #[cfg(feature = "zstd")] 25 26 const ZSTD_DICTIONARY: &[u8] = include_bytes!("dictionary"); ··· 33 32 OptionsUpdate(TungsteniteError), 34 33 #[error("Failed to send close message: {0}")] 35 34 Close(TungsteniteError), 36 - } 37 - 38 - #[derive(Default)] 39 - struct State { 40 - metrics: Metrics, 41 35 } 42 36 43 37 pub struct JetstreamTask(pub(crate) Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>); ··· 78 82 pub async fn jetstream_subscriber( 79 83 event_tx: flume::Sender<Bytes>, 80 84 client_rx: flume::Receiver<ClientCommand>, 81 - metrics: Metrics, 82 85 client_options: ClientOptions, 83 86 subscriber_options: Arc<Mutex<SubscriberOptions>>, 84 87 shutdown: CancellationToken, 85 88 ) { 86 - let state = State { metrics }; 87 - let mut reconnect_backoff = client_options.reconnect_backoff_min; 89 + counters::describe(); 90 + let dispatched = metrics::counter!(counters::EVENTS_DISPATCHED); 88 91 92 + let mut reconnect_backoff = client_options.reconnect_backoff_min; 89 93 let mut instance_idx = fastrand::usize(0..client_options.instances.len()); 90 94 91 95 'outer: loop { 92 - let instance = Url::parse(&client_options.instances[instance_idx]).unwrap(); 93 - let (subscribe_url, require_hello) = 94 - subscriber_options.lock().unwrap().subscribe_url(&instance); 96 + let instance = client_options.instances[instance_idx].clone(); 97 + let labels = [("instance", instance.to_string())]; 98 + 99 + let instance_url = Url::parse(&instance).unwrap(); 100 + let (subscribe_url, require_hello) = { 101 + let options = subscriber_options.lock().unwrap(); 102 + update_options_metrics(&options); 103 + 104 + options.subscribe_url(&instance_url) 105 + }; 95 106 96 107 tracing::debug!(%subscribe_url, "connecting to jetstream"); 97 108 let uri: Uri = subscribe_url 98 109 .as_str() 99 110 .parse() 100 111 .expect("URL should be a valid URI"); 112 + 113 + metrics::counter!(counters::CONNECTION_ATTEMPTS, &labels).increment(1); 101 114 102 115 #[cfg(feature = "zstd")] 103 116 let request = ClientRequestBuilder::new(uri).with_header("Socket-Encoding", "zstd"); ··· 140 135 std::cmp::min(reconnect_backoff * 2, client_options.reconnect_backoff_max); 141 136 instance_idx = (instance_idx + 1).rem_euclid(client_options.instances.len()); 142 137 138 + metrics::counter!(counters::CONNECTION_ERRORS, &labels).increment(1); 143 139 continue; 144 140 } 145 141 None => break, 146 142 }; 147 143 148 - state.metrics.modify(|mut data| data.connects += 1); 144 + metrics::counter!(counters::CONNECTION_SUCCESS, &labels).increment(1); 149 145 let (mut write, mut read) = socket.split(); 150 146 151 147 if require_hello ··· 159 153 160 154 loop { 161 155 let message = tokio::select! { 162 - Some(Ok(outcome)) = shutdown.run_until_cancelled(handle_read_socket(&mut read, &state)) => { 156 + Some(Ok(outcome)) = shutdown.run_until_cancelled(handle_read_socket(&mut read, &labels)) => { 163 157 match outcome { 164 158 ReadOutcome::Event(message) => message, 165 159 ReadOutcome::Ping(payload) => { ··· 167 161 tracing::error!(?error, "failed to send pong"); 168 162 break; 169 163 } 170 - state.metrics.modify(|mut data| data.pongs_sent += 1); 164 + metrics::counter!(counters::WEBSOCKET_PONGS_SENT, &labels).increment(1); 171 165 continue; 172 166 } 173 167 ReadOutcome::Timeout => { 174 168 tracing::error!("time since last received message exceeds threshold"); 175 - state.metrics.modify(|mut data| data.timeouts += 1); 169 + metrics::counter!(counters::CONNECTION_TIMEOUTS, &labels).increment(1); 176 170 rewind_cursor(&subscriber_options, client_options.rewind); 177 171 break; 178 172 ··· 209 203 // Deserialize just the event timestamp and event kind. 210 204 let new_cursor = match serde_json::from_slice::<PartialEvent>(&message) { 211 205 Ok(event) => { 212 - state.metrics.increment_message_kind(event.kind); 206 + metrics::counter!(counters::EVENTS_RECEIVED, "kind" => event.kind.to_string()) 207 + .increment(1); 213 208 event.time_us.into() 214 209 } 215 210 Err(error) => { ··· 223 216 } 224 217 }; 225 218 226 - state.metrics.modify(|mut data| data.messages_received += 1); 227 219 if let Err(error) = event_tx.send_async(message).await { 228 220 let payload = error.into_inner(); 229 221 if let Ok(payload) = std::str::from_utf8(&payload) { ··· 233 227 break 'outer; 234 228 } 235 229 230 + dispatched.increment(1); 231 + 236 232 // Update the cursor since the message has been dispatched. 237 233 set_cursor(&subscriber_options, new_cursor); 234 + if let Ok(cursor) = u64::try_from(new_cursor) { 235 + metrics::counter!(counters::CURSOR, &labels).absolute(cursor); 236 + } 238 237 } 239 238 240 - state.metrics.modify(|mut data| data.disconnects += 1); 239 + metrics::counter!(counters::CONNECTION_DISCONNECTS, &labels).increment(1); 241 240 } 242 241 243 242 tracing::warn!("jetstream subscriber task ended"); ··· 257 246 258 247 async fn handle_read_socket<S>( 259 248 stream: &mut S, 260 - state: &State, 249 + labels: &[(&'static str, String)], 261 250 ) -> Result<ReadOutcome, TungsteniteError> 262 251 where 263 252 S: StreamExt<Item = Result<Message, TungsteniteError>> + Unpin, 264 253 { 265 254 #[cfg(feature = "zstd")] 266 255 let dictionary = zstd::dict::DecoderDictionary::copy(ZSTD_DICTIONARY); 256 + 257 + let bytes_recv = metrics::counter!(counters::WEBSOCKET_BYTES, labels); 258 + let bytes_raw_recv = metrics::counter!(counters::WEBSOCKET_BYTES_RAW, labels); 259 + let update_bytes_recv = |m: &metrics::Counter, value: usize| { 260 + m.increment(u64::try_from(value).unwrap_or(u64::MAX)); 261 + }; 267 262 268 263 loop { 269 264 let message = match timeout(RECV_TIMEOUT, stream.next()).await { ··· 289 272 } 290 273 #[cfg(not(feature = "zstd"))] 291 274 Message::Text(payload) => { 292 - state.metrics.modify(|mut data| { 293 - data.bytes_received_raw += payload.len(); 294 - data.bytes_received += payload.len(); 295 - }); 296 - 275 + update_bytes_recv(&bytes_raw_recv, compressed_bytes); 276 + update_bytes_recv(&bytes_recv, payload.len()); 297 277 payload.into() 298 278 } 299 279 #[cfg(feature = "zstd")] ··· 312 298 continue; 313 299 }; 314 300 315 - state.metrics.modify(|mut data| { 316 - data.bytes_received_raw += compressed_bytes; 317 - data.bytes_received += payload.len(); 318 - }); 319 - 301 + update_bytes_recv(&bytes_raw_recv, compressed_bytes); 302 + update_bytes_recv(&bytes_recv, payload.len()); 320 303 payload.into() 321 304 } 322 305 #[cfg(not(feature = "zstd"))] ··· 323 312 } 324 313 Message::Ping(payload) => { 325 314 tracing::trace!(?payload, "received ping, sending pong"); 326 - state.metrics.modify(|mut data| data.pings_received += 1); 315 + metrics::counter!(counters::WEBSOCKET_PINGS_RECEIVED, labels).increment(1); 327 316 return Ok(ReadOutcome::Ping(payload)); 328 317 } 329 318 Message::Pong(payload) => { 330 319 tracing::warn!(payload = ?std::str::from_utf8(&payload), "received unexpected pong"); 320 + metrics::counter!(counters::WEBSOCKET_PONGS_RECEIVED, labels).increment(1); 331 321 continue; 332 322 } 333 323 Message::Frame(frame) => { 334 324 tracing::warn!(frame = ?std::str::from_utf8(&frame.into_payload()), "received unexpected frame"); 325 + metrics::counter!(counters::WEBSOCKET_FRAMES_RECEIVED, labels).increment(1); 335 326 continue; 336 327 } 337 328 Message::Close(_) => { 338 329 tracing::error!("websocket closed"); 330 + metrics::counter!(counters::WEBSOCKET_CLOSE_RECEIVED, labels).increment(1); 339 331 break; 340 332 } 341 333 }; ··· 369 355 S: SinkExt<Message> + Unpin, 370 356 E: From<S::Error>, 371 357 { 372 - let update = options 373 - .lock() 374 - .unwrap() 375 - .as_subscriber_sourced_message() 376 - .to_json(); 358 + let json = { 359 + let options = options.lock().unwrap(); 360 + update_options_metrics(&options); 377 361 378 - tracing::debug!(%update, "sending options update"); 379 - sink.send(Message::Text(update.into())).await?; 362 + options.as_subscriber_sourced_message().to_json() 363 + }; 364 + 365 + tracing::debug!(%json, "sending options update"); 366 + sink.send(Message::Text(json.into())).await?; 367 + 380 368 Ok(()) 369 + } 370 + 371 + fn update_options_metrics(options: &SubscriberOptions) { 372 + metrics::gauge!(counters::SUBSCRIBED_COLLECTIONS).set( 373 + u32::try_from(options.wanted_collections.len()) 374 + .expect("u32::MAX is much larger than the maximum number of subscribed collections"), 375 + ); 376 + 377 + metrics::gauge!(counters::SUBSCRIBED_DIDS).set( 378 + u32::try_from(options.wanted_dids.len()) 379 + .expect("u32::MAX is much larger than the maximum number of subscribed DIDs"), 380 + ); 381 381 }
+49
crates/gordian-jetstream/src/task/counters.rs
··· 1 + use std::sync::OnceLock; 2 + 3 + macro_rules! metric { 4 + ($name:ident, $label:literal) => { 5 + pub const $name: &str = concat!("jetstream_", $label); 6 + }; 7 + } 8 + 9 + metric!(CONNECTION_ATTEMPTS, "connection_attempts"); 10 + metric!(CONNECTION_ERRORS, "connection_errors"); 11 + metric!(CONNECTION_SUCCESS, "connection_success"); 12 + metric!(CONNECTION_TIMEOUTS, "connection_timeouts"); 13 + metric!(CONNECTION_DISCONNECTS, "connection_disconnects"); 14 + 15 + metric!(WEBSOCKET_BYTES, "websocket_bytes_recv"); 16 + metric!(WEBSOCKET_BYTES_RAW, "websocket_bytes_raw_recv"); 17 + metric!(WEBSOCKET_PINGS_RECEIVED, "websocket_pings_recv"); 18 + metric!(WEBSOCKET_PONGS_RECEIVED, "websocket_pongs_recv"); 19 + metric!(WEBSOCKET_PONGS_SENT, "websocket_pongs_sent"); 20 + metric!(WEBSOCKET_FRAMES_RECEIVED, "websocket_frames_recv"); 21 + metric!(WEBSOCKET_CLOSE_RECEIVED, "websocket_close_recv"); 22 + 23 + metric!(EVENTS_RECEIVED, "events_recv"); 24 + metric!(EVENTS_DISPATCHED, "events_dispatched"); 25 + 26 + metric!(SUBSCRIBED_COLLECTIONS, "subscribed_collections"); 27 + metric!(SUBSCRIBED_DIDS, "subscribed_dids"); 28 + metric!(CURSOR, "cursor"); 29 + 30 + static DESCRIBED: OnceLock<()> = OnceLock::new(); 31 + 32 + pub fn describe() { 33 + use metrics::Unit; 34 + use metrics::describe_gauge; 35 + 36 + DESCRIBED.get_or_init(|| { 37 + describe_gauge!( 38 + SUBSCRIBED_COLLECTIONS, 39 + Unit::Count, 40 + "Number of collections the client is subscribed to" 41 + ); 42 + 43 + describe_gauge!( 44 + SUBSCRIBED_DIDS, 45 + Unit::Count, 46 + "Number of DIDs the client is subscribed to" 47 + ); 48 + }); 49 + }
+2 -2
justfile
··· 22 22 incus exec {{host}} -- systemctl restart gordian-knot.service 23 23 24 24 resolve *ident: 25 - cargo run --package gordian-identity --example resolve --features tracing-subscriber,tokio/rt -- {{ident}} 25 + cargo run --release --package gordian-identity --example resolve --features tracing-subscriber,tokio/rt -- {{ident}} 26 26 27 27 jetstream *args: 28 - cargo run --package gordian-jetstream --example cli --features clap -- {{args}} 28 + cargo run --release --package gordian-jetstream --example cli --features clap,metrics-exporter-prometheus -- {{args}}