Nix Observability Daemon
observability nix

refactor project structure

+499 -485
+24 -29
README.md
···
 ## nod

-A simple self-contained daemon to gather statistics on Nix builds and substitutions using structured JSON logs.
+A simple daemon that collects Nix build and substitution statistics using structured JSON logs.

 ## requirements

-- nix 2.30 or later (for `json-log-path` support).
+- Nix 2.30 or later (`json-log-path` support)

 ## building

-``` bash
+```bash
 cargo build --release
 ```

 ## usage

-First you must make sure the nod daemon is running.
+Start the daemon:

 ```bash
 nod daemon
 ```
-By default, it listens on `/run/nod/nod.sock` and stores data in `/var/lib/nod/nod.db`.
-Manual setup requires write permissions for the socket.

-Then configure nix to output detailed trace logs to the socket.
+Point Nix at the socket in `nix.conf`:

-Add the following to your `nix.conf` (usually `/etc/nix/nix.conf` or `~/.config/nix/nix.conf`):
-```text
-json-log-path = /run/nod/nod.sock
+```
+json-log-path = /run/user/1000/nod/nod.sock
 ```

-### nixos
+Then use Nix normally. View accumulated stats with:

-Add the flake to your inputs and use the provided module:
+```bash
+nod stats          # all time
+nod stats -d 7     # last 7 days
+nod stats -m 3     # last 3 months
+nod stats -y 1     # last year
+nod clean          # reset the database
+```

-```nix
-services.nod.enable = true;
-```
-This handles the socket path, permissions, and `nix.conf` automatically.
+## configuration

-Just use Nix as usual:
-```bash
-nix build nixpkgs#hello
-```
+| flag | env | default |
+|------|-----|---------|
+| `--socket` | `NOD_SOCKET` | `$XDG_RUNTIME_DIR/nod/nod.sock` |
+| `--db` | `NOD_DB` | `$XDG_DATA_HOME/nod/nod.db` |

-Then view your stats:
-```bash
-nod stats
-```
+Both directories are created automatically. The socket path in `nix.conf` must match `--socket`.

 ## how?

-The daemon uses the `json-log-path` feature introduced in Nix
-2.30. Instead of parsing the complex binary worker protocol, it
-consumes a structured stream of events directly from the Nix
-process. Relevant
-[code](https://github.com/NixOS/nix/blob/b4de973847370204cf28fe2092abdd21f25ee0e8/src/libutil/include/nix/util/logging.hh)
+Nix 2.30 added `json-log-path`, which writes a stream of structured activity events (start/result/stop) to a file or Unix socket while a build runs. nod listens on that socket, tracks in-flight activities by ID, and on each stop event inserts a completed row into a local SQLite database. `nod stats` queries that database through the daemon.
+
+Relevant Nix source: [logging.hh](https://github.com/NixOS/nix/blob/b4de973847370204cf28fe2092abdd21f25ee0e8/src/libutil/include/nix/util/logging.hh)
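The flow described in the README's "how?" section (track in-flight activities by ID, emit a completed row on stop) can be sketched in isolation. This is a minimal std-only illustration, not nod's code: `Tracker`, `Completed`, and the caller-supplied millisecond timestamps are hypothetical names chosen for the sketch, and the real daemon records more fields and writes to SQLite.

```rust
use std::collections::HashMap;

/// A finished activity, ready to be stored.
/// Hypothetical shape for illustration; not the daemon's actual schema.
#[derive(Debug, PartialEq)]
pub struct Completed {
    pub id: u64,
    pub duration_ms: u64,
}

/// Maps an activity ID to the time (in ms) at which its start event arrived.
#[derive(Default)]
pub struct Tracker {
    in_flight: HashMap<u64, u64>,
}

impl Tracker {
    /// Record a "start" event. A repeated ID overwrites the earlier entry.
    pub fn start(&mut self, id: u64, now_ms: u64) {
        self.in_flight.insert(id, now_ms);
    }

    /// Handle a "stop" event. Returns `None` when no matching start was seen,
    /// so unmatched stops are ignored rather than stored.
    pub fn stop(&mut self, id: u64, now_ms: u64) -> Option<Completed> {
        let started = self.in_flight.remove(&id)?;
        Some(Completed {
            id,
            duration_ms: now_ms.saturating_sub(started),
        })
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic; a stop without a matching start yields `None`.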
+346
src/daemon.rs
···
+use anyhow::{Context, Result};
+use chrono::{DateTime, Utc};
+use rusqlite::Connection;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::fmt;
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio::net::{UnixListener, UnixStream};
+use tracing::{error, info};
+
+use crate::stats::collect_stats;
+
+// Schema version - increment when the schema changes incompatibly.
+const SCHEMA_VERSION: u32 = 1;
+
+const SCHEMA: &str = "
+    CREATE TABLE IF NOT EXISTS events (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        nix_id INTEGER,
+        parent_id INTEGER,
+        event_type INTEGER,
+        text TEXT,
+        drv_path TEXT,
+        cache_url TEXT,
+        start_time TEXT,
+        end_time TEXT,
+        duration_ms INTEGER,
+        total_bytes INTEGER
+    );
+";
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[repr(u64)]
+pub enum ActivityType {
+    Unknown = 0,
+    CopyPath = 100,
+    FileTransfer = 101, // Actual HTTP download
+    Realise = 102,
+    CopyPaths = 103,
+    Builds = 104,
+    Build = 105, // Individual derivation build
+    OptimiseStore = 106,
+    VerifyPaths = 107,
+    Substitute = 108, // High-level cache substitution
+    QueryPathInfo = 109, // Checking if path exists on cache
+    PostBuildHook = 110,
+    BuildWaiting = 111,
+    FetchTree = 112,
+}
+
+impl From<u64> for ActivityType {
+    fn from(n: u64) -> Self {
+        match n {
+            100 => Self::CopyPath,
+            101 => Self::FileTransfer,
+            102 => Self::Realise,
+            103 => Self::CopyPaths,
+            104 => Self::Builds,
+            105 => Self::Build,
+            106 => Self::OptimiseStore,
+            107 => Self::VerifyPaths,
+            108 => Self::Substitute,
+            109 => Self::QueryPathInfo,
+            110 => Self::PostBuildHook,
+            111 => Self::BuildWaiting,
+            112 => Self::FetchTree,
+            _ => Self::Unknown,
+        }
+    }
+}
+
+impl fmt::Display for ActivityType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum ResultType {
+    Unknown,
+    FileLinked,
+    BuildLogLine,
+    UntrustedPath,
+    CorruptedPath,
+    SetPhase,
+    Progress,
+    SetExpected,
+    PostBuildLogLine,
+    FetchStatus,
+}
+
+impl From<u64> for ResultType {
+    fn from(n: u64) -> Self {
+        match n {
+            100 => Self::FileLinked,
+            101 => Self::BuildLogLine,
+            102 => Self::UntrustedPath,
+            103 => Self::CorruptedPath,
+            104 => Self::SetPhase,
+            105 => Self::Progress,
+            106 => Self::SetExpected,
+            107 => Self::PostBuildLogLine,
+            108 => Self::FetchStatus,
+            _ => Self::Unknown,
+        }
+    }
+}
+
+impl fmt::Display for ResultType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+struct NixEvent {
+    action: String,
+    id: u64,
+    // Raw number: ActivityType for "start", ResultType for "result", absent on "stop"
+    #[serde(default, rename = "type")]
+    event_type: u64,
+    #[serde(default)]
+    text: String,
+    #[serde(default)]
+    fields: Vec<serde_json::Value>,
+    #[serde(default)]
+    parent: u64,
+}
+
+struct Activity {
+    id: u64,
+    parent_id: u64,
+    event_type: u64,
+    text: String,
+    start_time: DateTime<Utc>,
+    fields: Vec<serde_json::Value>,
+    total_bytes: u64,
+}
+
+struct State {
+    active_activities: HashMap<u64, Activity>,
+    db: Arc<Mutex<Connection>>,
+}
+
+pub fn open_db(path: &PathBuf) -> Result<Connection> {
+    let conn = Connection::open(path)
+        .with_context(|| format!("Failed to open database at {}", path.display()))?;
+
+    // Check schema version; reset if mismatched.
+    let version: u32 = conn
+        .query_row("PRAGMA user_version", [], |r| r.get(0))
+        .unwrap_or(0);
+
+    if version != 0 && version != SCHEMA_VERSION {
+        info!(current = version, expected = SCHEMA_VERSION, "Schema version mismatch, resetting database");
+        drop(conn);
+        std::fs::remove_file(path)
+            .with_context(|| format!("Failed to remove stale database at {}", path.display()))?;
+        return open_db(path);
+    }
+
+    conn.execute_batch(SCHEMA)
+        .context("Failed to create schema")?;
+    conn.execute_batch(&format!("PRAGMA user_version = {}", SCHEMA_VERSION))
+        .context("Failed to set schema version")?;
+    conn.execute_batch("
+        PRAGMA journal_mode = WAL;
+        PRAGMA synchronous = NORMAL;
+        PRAGMA locking_mode = EXCLUSIVE;
+        PRAGMA temp_store = MEMORY;
+        PRAGMA mmap_size = 134217728;
+        PRAGMA cache_size = -8000;
+        PRAGMA wal_autocheckpoint = 0;
+    ").context("Failed to configure database")?;
+
+    Ok(conn)
+}
+
+pub async fn run_daemon(socket_path: PathBuf, db: Arc<Mutex<Connection>>) -> Result<()> {
+    let state = Arc::new(Mutex::new(State {
+        active_activities: HashMap::new(),
+        db,
+    }));
+
+    if socket_path.exists() {
+        std::fs::remove_file(&socket_path)
+            .with_context(|| format!("Failed to remove existing socket at {}", socket_path.display()))?;
+    }
+
+    let listener = UnixListener::bind(&socket_path)
+        .with_context(|| format!("Failed to bind to socket at {}", socket_path.display()))?;
+    info!("Daemon listening on {}", socket_path.display());
+
+    loop {
+        let (stream, _) = listener.accept().await?;
+        let state = Arc::clone(&state);
+        tokio::spawn(async move {
+            if let Err(e) = handle_connection(stream, state).await {
+                error!("Connection error: {}", e);
+            }
+        });
+    }
+}
+
+async fn handle_connection(mut stream: UnixStream, state: Arc<Mutex<State>>) -> Result<()> {
+    let (reader, mut writer) = stream.split();
+    let mut reader = BufReader::new(reader);
+    let mut line = String::new();
+    loop {
+        line.clear();
+        if reader.read_line(&mut line).await? == 0 { break; }
+        let cmd = line.trim();
+
+        if cmd.starts_with("get_stats") {
+            let since = cmd.split_whitespace().nth(1)
+                .unwrap_or("1970-01-01T00:00:00+00:00")
+                .to_string();
+            let db = state.lock().unwrap().db.clone();
+            let stats = tokio::task::spawn_blocking(move || collect_stats(&db, &since))
+                .await??;
+            writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?;
+            break;
+        }
+        if cmd == "clean" {
+            let db = state.lock().unwrap().db.clone();
+            tokio::task::spawn_blocking(move || -> Result<()> {
+                let conn = db.lock().unwrap();
+                conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?;
+                Ok(())
+            }).await??;
+            writer.write_all(b"ok\n").await?;
+            info!("Database cleared via socket command");
+            break;
+        }
+
+        if let Ok(event) = serde_json::from_str::<NixEvent>(cmd) {
+            process_event(event, &state)?;
+        }
+    }
+    Ok(())
+}
+
+fn process_event(event: NixEvent, state: &Arc<Mutex<State>>) -> Result<()> {
+    let mut s = state.lock().unwrap();
+
+    match event.action.as_str() {
+        "start" => {
+            let act_type = ActivityType::from(event.event_type);
+            let text = if !event.text.is_empty() {
+                event.text.clone()
+            } else {
+                event.fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string()
+            };
+
+            info!(
+                id = event.id,
+                parent = event.parent,
+                act_type = %act_type,
+                text = %text,
+                fields = ?event.fields,
+                "start"
+            );
+
+            s.active_activities.insert(event.id, Activity {
+                id: event.id,
+                parent_id: event.parent,
+                event_type: event.event_type,
+                text,
+                start_time: Utc::now(),
+                fields: event.fields,
+                total_bytes: 0,
+            });
+        }
+        "result" => {
+            let res_type = ResultType::from(event.event_type);
+
+            if res_type != ResultType::BuildLogLine && res_type != ResultType::Progress && res_type != ResultType::SetExpected {
+                info!(
+                    id = event.id,
+                    res_type = %res_type,
+                    fields = ?event.fields,
+                    "result"
+                );
+            }
+
+            if let Some(act) = s.active_activities.get_mut(&event.id) {
+                if res_type == ResultType::Progress {
+                    if let Some(total) = event.fields.get(1).and_then(|v| v.as_u64()) {
+                        if total > 0 { act.total_bytes = total; }
+                    }
+                }
+            }
+        }
+        "stop" => {
+            if let Some(act) = s.active_activities.remove(&event.id) {
+                let act_type = ActivityType::from(act.event_type);
+                let end_time = Utc::now();
+                let duration_ms = end_time.signed_duration_since(act.start_time).num_milliseconds();
+
+                info!(
+                    id = act.id,
+                    act_type = %act_type,
+                    duration_ms,
+                    total_bytes = act.total_bytes,
+                    text = %act.text,
+                    fields = ?act.fields,
+                    "stop"
+                );
+
+                let drv_path = act.fields.get(0).and_then(|v| v.as_str()).map(|s| s.to_string());
+                // Both Substitute (fields[1] = substituter URL) and QueryPathInfo (fields[1] = cache URL)
+                // have the same layout. Store cache_url for both.
+                let cache_url = match act_type {
+                    ActivityType::Substitute | ActivityType::QueryPathInfo => {
+                        act.fields.get(1).and_then(|v| v.as_str()).map(|s| s.to_string())
+                    }
+                    _ => None,
+                };
+
+                let conn = s.db.lock().unwrap();
+                conn.execute(
+                    "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes)
+                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
+                    rusqlite::params![
+                        act.id as i64,
+                        act.parent_id as i64,
+                        act.event_type as i64,
+                        act.text,
+                        drv_path,
+                        cache_url,
+                        act.start_time.to_rfc3339(),
+                        end_time.to_rfc3339(),
+                        duration_ms,
+                        act.total_bytes as i64,
+                    ],
+                ).context("Failed to insert event")?;
+            }
+        }
+        _ => {
+            info!(action = %event.action, id = event.id, "unknown action"),
+        }
+    }
+    Ok(())
+}
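The line protocol that `handle_connection` implements (a `get_stats [since]` command, a `clean` command, and otherwise a JSON event line) can be pulled out as a pure function to make the dispatch order easier to see. This is a hedged sketch using only the standard library: `Command`, `parse_line`, and `EPOCH` are hypothetical names that do not appear in the diff, and the matching rules simply mirror the checks in `handle_connection`.

```rust
/// Default lower bound used when `get_stats` carries no timestamp
/// (same literal as in `handle_connection`).
pub const EPOCH: &str = "1970-01-01T00:00:00+00:00";

/// Hypothetical classification of one line received on the socket.
#[derive(Debug, PartialEq)]
pub enum Command<'a> {
    /// `get_stats [since]`, where `since` is an RFC 3339 timestamp.
    GetStats { since: &'a str },
    /// `clean`: wipe the events table.
    Clean,
    /// Anything else: treated as a candidate JSON event from Nix.
    Event(&'a str),
}

/// Classify a raw line in the same order the daemon checks it.
pub fn parse_line(line: &str) -> Command<'_> {
    let cmd = line.trim();
    if cmd.starts_with("get_stats") {
        // The second whitespace-separated token, if any, is the cutoff.
        let since = cmd.split_whitespace().nth(1).unwrap_or(EPOCH);
        return Command::GetStats { since };
    }
    if cmd == "clean" {
        return Command::Clean;
    }
    Command::Event(cmd)
}
```

Because the `get_stats` check is a prefix match, any line beginning with that string is treated as a stats request; an exact match on the first token would be stricter.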
+7 -456
src/main.rs
···
+mod daemon;
+mod stats;
+
 use anyhow::{Context, Result};
-use chrono::{DateTime, Utc};
+use chrono::Utc;
 use clap::{Parser, Subcommand};
 use directories::ProjectDirs;
-use rusqlite::Connection;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::fmt;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
-use tokio::net::{UnixListener, UnixStream};
+use tokio::net::UnixStream;
 use tracing::{error, info};

-// Schema version - increment when the schema changes incompatibly.
-const SCHEMA_VERSION: u32 = 1;
-
-const SCHEMA: &str = "
-    CREATE TABLE IF NOT EXISTS events (
-        id INTEGER PRIMARY KEY AUTOINCREMENT,
-        nix_id INTEGER,
-        parent_id INTEGER,
-        event_type INTEGER,
-        text TEXT,
-        drv_path TEXT,
-        cache_url TEXT,
-        start_time TEXT,
-        end_time TEXT,
-        duration_ms INTEGER,
-        total_bytes INTEGER
-    );
-";
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
-#[repr(u64)]
-enum ActivityType {
-    Unknown = 0,
-    CopyPath = 100,
-    FileTransfer = 101, // Actual HTTP download
-    Realise = 102,
-    CopyPaths = 103,
-    Builds = 104,
-    Build = 105, // Individual derivation build
-    OptimiseStore = 106,
-    VerifyPaths = 107,
-    Substitute = 108, // High-level cache substitution
-    QueryPathInfo = 109, // Checking if path exists on cache
-    PostBuildHook = 110,
-    BuildWaiting = 111,
-    FetchTree = 112,
-}
-
-impl From<u64> for ActivityType {
-    fn from(n: u64) -> Self {
-        match n {
-            100 => Self::CopyPath,
-            101 => Self::FileTransfer,
-            102 => Self::Realise,
-            103 => Self::CopyPaths,
-            104 => Self::Builds,
-            105 => Self::Build,
-            106 => Self::OptimiseStore,
-            107 => Self::VerifyPaths,
-            108 => Self::Substitute,
-            109 => Self::QueryPathInfo,
-            110 => Self::PostBuildHook,
-            111 => Self::BuildWaiting,
-            112 => Self::FetchTree,
-            _ => Self::Unknown,
-        }
-    }
-}
-
-impl fmt::Display for ActivityType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{:?}", self)
-    }
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum ResultType {
-    Unknown,
-    FileLinked,
-    BuildLogLine,
-    UntrustedPath,
-    CorruptedPath,
-    SetPhase,
-    Progress,
-    SetExpected,
-    PostBuildLogLine,
-    FetchStatus,
-}
-
-impl From<u64> for ResultType {
-    fn from(n: u64) -> Self {
-        match n {
-            100 => Self::FileLinked,
-            101 => Self::BuildLogLine,
-            102 => Self::UntrustedPath,
-            103 => Self::CorruptedPath,
-            104 => Self::SetPhase,
-            105 => Self::Progress,
-            106 => Self::SetExpected,
-            107 => Self::PostBuildLogLine,
-            108 => Self::FetchStatus,
-            _ => Self::Unknown,
-        }
-    }
-}
-
-impl fmt::Display for ResultType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{:?}", self)
-    }
-}
+use daemon::{open_db, run_daemon};
+use stats::{display_stats, Stats};

 #[derive(Parser)]
 struct Cli {
···
     },
 }

-#[derive(Debug, Deserialize, Serialize)]
-struct NixEvent {
-    action: String,
-    id: u64,
-    // Raw number: ActivityType for "start", ResultType for "result", absent on "stop"
-    #[serde(default, rename = "type")]
-    event_type: u64,
-    #[serde(default)]
-    text: String,
-    #[serde(default)]
-    fields: Vec<serde_json::Value>,
-    #[serde(default)]
-    parent: u64,
-}
-
-// Sent over the socket as JSON - all computed values, no raw types
-#[derive(Debug, Serialize, Deserialize)]
-struct Stats {
-    build_count: i64,
-    build_total_ms: i64,
-    subst_count: i64,
-    subst_total_ms: i64,
-    download_bytes: i64,
-    download_ms: i64,
-    slowest_builds: Vec<SlowBuild>,
-    cache_latency: Vec<CacheStat>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-struct SlowBuild {
-    duration_ms: i64,
-    drv_path: Option<String>,
-    text: Option<String>,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-struct CacheStat {
-    cache_url: String,
-    avg_ms: f64,
-    count: i64,
-}
-
-struct Activity {
-    id: u64,
-    parent_id: u64,
-    event_type: u64,
-    text: String,
-    start_time: DateTime<Utc>,
-    fields: Vec<serde_json::Value>,
-    total_bytes: u64,
-}
-
-struct State {
-    active_activities: HashMap<u64, Activity>,
-    db: Arc<Mutex<Connection>>,
-}
-
-fn open_db(path: &PathBuf) -> Result<Connection> {
-    let conn = Connection::open(path)
-        .with_context(|| format!("Failed to open database at {}", path.display()))?;
-
-    // Check schema version; reset if mismatched.
-    let version: u32 = conn
-        .query_row("PRAGMA user_version", [], |r| r.get(0))
-        .unwrap_or(0);
-
-    if version != 0 && version != SCHEMA_VERSION {
-        info!(current = version, expected = SCHEMA_VERSION, "Schema version mismatch, resetting database");
-        drop(conn);
-        std::fs::remove_file(path)
-            .with_context(|| format!("Failed to remove stale database at {}", path.display()))?;
-        return open_db(path);
-    }
-
-    conn.execute_batch(SCHEMA)
-        .context("Failed to create schema")?;
-    conn.execute_batch(&format!("PRAGMA user_version = {}", SCHEMA_VERSION))
-        .context("Failed to set schema version")?;
-    conn.execute_batch("
-        PRAGMA journal_mode = WAL;
-        PRAGMA synchronous = NORMAL;
-        PRAGMA locking_mode = EXCLUSIVE;
-        PRAGMA temp_store = MEMORY;
-        PRAGMA mmap_size = 134217728;
-        PRAGMA cache_size = -8000;
-        PRAGMA wal_autocheckpoint = 0;
-    ").context("Failed to configure database")?;
-
-    Ok(conn)
-}
-
 #[tokio::main]
 async fn main() -> Result<()> {
     tracing_subscriber::fmt::init();
···

     Ok(())
 }
-
-async fn run_daemon(socket_path: PathBuf, db: Arc<Mutex<Connection>>) -> Result<()> {
-    let state = Arc::new(Mutex::new(State {
-        active_activities: HashMap::new(),
-        db,
-    }));
-
-    if socket_path.exists() {
-        std::fs::remove_file(&socket_path)
-            .with_context(|| format!("Failed to remove existing socket at {}", socket_path.display()))?;
-    }
-
-    let listener = UnixListener::bind(&socket_path)
-        .with_context(|| format!("Failed to bind to socket at {}", socket_path.display()))?;
-    info!("Daemon listening on {}", socket_path.display());
-
-    loop {
-        let (stream, _) = listener.accept().await?;
-        let state = Arc::clone(&state);
-        tokio::spawn(async move {
-            if let Err(e) = handle_connection(stream, state).await {
-                error!("Connection error: {}", e);
-            }
-        });
-    }
-}
-
-async fn handle_connection(mut stream: UnixStream, state: Arc<Mutex<State>>) -> Result<()> {
-    let (reader, mut writer) = stream.split();
-    let mut reader = BufReader::new(reader);
-    let mut line = String::new();
-    loop {
-        line.clear();
-        if reader.read_line(&mut line).await? == 0 { break; }
-        let cmd = line.trim();
-
-        if cmd.starts_with("get_stats") {
-            let since = cmd.split_whitespace().nth(1)
-                .unwrap_or("1970-01-01T00:00:00+00:00")
-                .to_string();
-            let db = state.lock().unwrap().db.clone();
-            let stats = tokio::task::spawn_blocking(move || collect_stats(&db, &since))
-                .await??;
-            writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?;
-            break;
-        }
-        if cmd == "clean" {
-            let db = state.lock().unwrap().db.clone();
-            tokio::task::spawn_blocking(move || -> Result<()> {
-                let conn = db.lock().unwrap();
-                conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?;
-                Ok(())
-            }).await??;
-            writer.write_all(b"ok\n").await?;
-            info!("Database cleared via socket command");
-            break;
-        }
-
-        if let Ok(event) = serde_json::from_str::<NixEvent>(cmd) {
-            process_event(event, &state)?;
-        }
-    }
-    Ok(())
-}
-
-fn process_event(event: NixEvent, state: &Arc<Mutex<State>>) -> Result<()> {
-    let mut s = state.lock().unwrap();
-
-    match event.action.as_str() {
-        "start" => {
-            let act_type = ActivityType::from(event.event_type);
-            let text = if !event.text.is_empty() {
-                event.text.clone()
-            } else {
-                event.fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string()
-            };
-
-            info!(
-                id = event.id,
-                parent = event.parent,
-                act_type = %act_type,
-                text = %text,
-                fields = ?event.fields,
-                "start"
-            );
-
-            s.active_activities.insert(event.id, Activity {
-                id: event.id,
-                parent_id: event.parent,
-                event_type: event.event_type,
-                text,
-                start_time: Utc::now(),
-                fields: event.fields,
-                total_bytes: 0,
-            });
-        }
-        "result" => {
-            let res_type = ResultType::from(event.event_type);
-
-            if res_type != ResultType::BuildLogLine && res_type != ResultType::Progress && res_type != ResultType::SetExpected {
-                info!(
-                    id = event.id,
-                    res_type = %res_type,
-                    fields = ?event.fields,
-                    "result"
-                );
-            }
-
-            if let Some(act) = s.active_activities.get_mut(&event.id) {
-                if res_type == ResultType::Progress {
-                    if let Some(total) = event.fields.get(1).and_then(|v| v.as_u64()) {
-                        if total > 0 { act.total_bytes = total; }
-                    }
-                }
-            }
-        }
-        "stop" => {
-            if let Some(act) = s.active_activities.remove(&event.id) {
-                let act_type = ActivityType::from(act.event_type);
-                let end_time = Utc::now();
-                let duration_ms = end_time.signed_duration_since(act.start_time).num_milliseconds();
-
-                info!(
-                    id = act.id,
-                    act_type = %act_type,
-                    duration_ms,
-                    total_bytes = act.total_bytes,
-                    text = %act.text,
-                    fields = ?act.fields,
-                    "stop"
-                );
-
-                let drv_path = act.fields.get(0).and_then(|v| v.as_str()).map(|s| s.to_string());
-                // Both Substitute (fields[1] = substituter URL) and QueryPathInfo (fields[1] = cache URL)
-                // have the same layout. Store cache_url for both.
-                let cache_url = match act_type {
-                    ActivityType::Substitute | ActivityType::QueryPathInfo => {
-                        act.fields.get(1).and_then(|v| v.as_str()).map(|s| s.to_string())
-                    }
-                    _ => None,
-                };
-
-                let conn = s.db.lock().unwrap();
-                conn.execute(
-                    "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes)
-                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
-                    rusqlite::params![
-                        act.id as i64,
-                        act.parent_id as i64,
-                        act.event_type as i64,
-                        act.text,
-                        drv_path,
-                        cache_url,
-                        act.start_time.to_rfc3339(),
-                        end_time.to_rfc3339(),
-                        duration_ms,
-                        act.total_bytes as i64,
-                    ],
-                ).context("Failed to insert event")?;
-            }
-        }
-        _ => {
-            info!(action = %event.action, id = event.id, "unknown action");
-        }
-    }
-    Ok(())
-}
-
-fn collect_stats(db: &Arc<Mutex<Connection>>, since: &str) -> Result<Stats> {
-    let conn = db.lock().unwrap();
-
-    let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) =
-        conn.query_row(
-            "SELECT
-                COUNT(*) FILTER (WHERE event_type = 105),
-                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 105), 0),
-                COUNT(*) FILTER (WHERE event_type = 108),
-                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0),
-                COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0),
-                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0)
-             FROM events WHERE start_time >= ?1",
-            [since],
-            |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?,
-                    r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)),
-        ).context("Failed to query summary stats")?;
-
-    let mut stmt = conn.prepare(
-        "SELECT duration_ms, drv_path, text
-         FROM events WHERE event_type = 105 AND start_time >= ?1
-         ORDER BY duration_ms DESC LIMIT 10",
-    ).context("Failed to prepare slowest builds query")?;
-    let slowest_builds: Vec<SlowBuild> = stmt.query_map([since], |r| {
-        Ok(SlowBuild {
-            duration_ms: r.get(0)?,
-            drv_path: r.get(1)?,
-            text: r.get(2)?,
-        })
-    })?.filter_map(|r| r.ok()).collect();
-
-    // Use Substitute (108) events for cache latency — these measure full download time
-    // per substituter, not just metadata query time (QueryPathInfo).
-    let mut stmt = conn.prepare(
-        "SELECT cache_url, AVG(duration_ms), COUNT(*)
-         FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND start_time >= ?1
-         GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
-    ).context("Failed to prepare cache latency query")?;
-    let cache_latency: Vec<CacheStat> = stmt.query_map([since], |r| {
-        Ok(CacheStat {
-            cache_url: r.get(0)?,
-            avg_ms: r.get(1)?,
-            count: r.get(2)?,
-        })
-    })?.filter_map(|r| r.ok()).collect();
-
-    Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency })
-}
-
-fn fmt_ms(ms: i64) -> String {
-    if ms < 1000 {
-        format!("{}ms", ms)
-    } else if ms < 60_000 {
-        format!("{:.1}s", ms as f64 / 1000.0)
-    } else {
-        format!("{}m{:.1}s", ms / 60_000, (ms % 60_000) as f64 / 1000.0)
-    }
-}
-
-// Strip /nix/store/ prefix, keep the hash and package name.
-fn drv_name(path: &str) -> &str {
-    path.strip_prefix("/nix/store/").unwrap_or(path)
-}
-
-fn display_stats(stats: Stats) {
-    let build_avg = if stats.build_count > 0 { stats.build_total_ms / stats.build_count } else { 0 };
-    let subst_avg = if stats.subst_count > 0 { stats.subst_total_ms / stats.subst_count } else { 0 };
-    let mb = stats.download_bytes as f64 / 1_048_576.0;
-    let dl_speed = if stats.download_ms > 0 { mb / (stats.download_ms as f64 / 1000.0) } else { 0.0 };
-
-    println!("{:<14} {:>6} total {:>9} avg {:>9}", "built", stats.build_count, fmt_ms(stats.build_total_ms), fmt_ms(build_avg));
-    println!("{:<14} {:>6} total {:>9} avg {:>9}", "substituted", stats.subst_count, fmt_ms(stats.subst_total_ms), fmt_ms(subst_avg));
-    println!("{:<14} {:>8.1} MB avg {:>6.1} MB/s", "downloaded", mb, dl_speed);
-
-    if !stats.slowest_builds.is_empty() {
-        println!();
-        for row in &stats.slowest_builds {
-            let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?");
-            println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path));
-        }
-    }
-
-    if !stats.cache_latency.is_empty() {
-        let url_w = stats.cache_latency.iter().map(|r| r.cache_url.len()).max().unwrap_or(0);
-        println!();
-        for row in &stats.cache_latency {
-            println!("{:<width$} avg {:>7} {:>6} queries", row.cache_url, fmt_ms(row.avg_ms as i64), row.count, width = url_w);
-        }
-    }
-}
+122
src/stats.rs
···
+use anyhow::{Context, Result};
+use rusqlite::Connection;
+use serde::{Deserialize, Serialize};
+use std::sync::{Arc, Mutex};
+
+// Sent over the socket as JSON - all computed values, no raw types
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Stats {
+    pub build_count: i64,
+    pub build_total_ms: i64,
+    pub subst_count: i64,
+    pub subst_total_ms: i64,
+    pub download_bytes: i64,
+    pub download_ms: i64,
+    pub slowest_builds: Vec<SlowBuild>,
+    pub cache_latency: Vec<CacheStat>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SlowBuild {
+    pub duration_ms: i64,
+    pub drv_path: Option<String>,
+    pub text: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct CacheStat {
+    pub cache_url: String,
+    pub avg_ms: f64,
+    pub count: i64,
+}
+
+pub fn collect_stats(db: &Arc<Mutex<Connection>>, since: &str) -> Result<Stats> {
+    let conn = db.lock().unwrap();
+
+    let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) =
+        conn.query_row(
+            "SELECT
+                COUNT(*) FILTER (WHERE event_type = 105),
+                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 105), 0),
+                COUNT(*) FILTER (WHERE event_type = 108),
+                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0),
+                COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0),
+                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0)
+             FROM events WHERE start_time >= ?1",
+            [since],
+            |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?,
+                    r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)),
+        ).context("Failed to query summary stats")?;
+
+    let mut stmt = conn.prepare(
+        "SELECT duration_ms, drv_path, text
+         FROM events WHERE event_type = 105 AND start_time >= ?1
+         ORDER BY duration_ms DESC LIMIT 10",
+    ).context("Failed to prepare slowest builds query")?;
+    let slowest_builds: Vec<SlowBuild> = stmt.query_map([since], |r| {
+        Ok(SlowBuild {
+            duration_ms: r.get(0)?,
+            drv_path: r.get(1)?,
+            text: r.get(2)?,
+        })
+    })?.filter_map(|r| r.ok()).collect();
+
+    // Use Substitute (108) events for cache latency — these measure full download time
+    // per substituter, not just metadata query time (QueryPathInfo).
+    let mut stmt = conn.prepare(
+        "SELECT cache_url, AVG(duration_ms), COUNT(*)
+         FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND start_time >= ?1
+         GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
+    ).context("Failed to prepare cache latency query")?;
+    let cache_latency: Vec<CacheStat> = stmt.query_map([since], |r| {
+        Ok(CacheStat {
+            cache_url: r.get(0)?,
+            avg_ms: r.get(1)?,
+            count: r.get(2)?,
+        })
+    })?.filter_map(|r| r.ok()).collect();
+
+    Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency })
+}
+
+fn fmt_ms(ms: i64) -> String {
+    if ms < 1000 {
+        format!("{}ms", ms)
+    } else if ms < 60_000 {
+        format!("{:.1}s", ms as f64 / 1000.0)
+    } else {
+        format!("{}m{:.1}s", ms / 60_000, (ms % 60_000) as f64 / 1000.0)
+    }
+}
+
+// Strip /nix/store/ prefix, keep the hash and package name.
+fn drv_name(path: &str) -> &str {
+    path.strip_prefix("/nix/store/").unwrap_or(path)
+}
+
+pub fn display_stats(stats: Stats) {
+    let build_avg = if stats.build_count > 0 { stats.build_total_ms / stats.build_count } else { 0 };
+    let subst_avg = if stats.subst_count > 0 { stats.subst_total_ms / stats.subst_count } else { 0 };
+    let mb = stats.download_bytes as f64 / 1_048_576.0;
+    let dl_speed = if stats.download_ms > 0 { mb / (stats.download_ms as f64 / 1000.0) } else { 0.0 };
+
+    println!("{:<14} {:>6} total {:>9} avg {:>9}", "built", stats.build_count, fmt_ms(stats.build_total_ms), fmt_ms(build_avg));
+    println!("{:<14} {:>6} total {:>9} avg {:>9}", "substituted", stats.subst_count, fmt_ms(stats.subst_total_ms), fmt_ms(subst_avg));
+    println!("{:<14} {:>8.1} MB avg {:>6.1} MB/s", "downloaded", mb, dl_speed);
+
+    if !stats.slowest_builds.is_empty() {
+        println!();
+        for row in &stats.slowest_builds {
+            let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?");
+            println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path));
+        }
+    }
+
+    if !stats.cache_latency.is_empty() {
+        let url_w = stats.cache_latency.iter().map(|r| r.cache_url.len()).max().unwrap_or(0);
+        println!();
+        for row in &stats.cache_latency {
+            println!("{:<width$} avg {:>7} {:>6} queries", row.cache_url, fmt_ms(row.avg_ms as i64), row.count, width = url_w);
+        }
+    }
+}
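To make the output format of `nod stats` easier to reason about, the two private formatting helpers from `src/stats.rs` are reproduced below as a standalone snippet. The function bodies are copied from the diff; only the doc comments and the sample inputs in the note that follows are added for illustration.

```rust
/// Render a millisecond duration as `ms`, `s`, or `m` + `s`,
/// following the same thresholds as `fmt_ms` in `src/stats.rs`.
pub fn fmt_ms(ms: i64) -> String {
    if ms < 1000 {
        format!("{}ms", ms)
    } else if ms < 60_000 {
        // Sub-minute values are shown in seconds with one decimal.
        format!("{:.1}s", ms as f64 / 1000.0)
    } else {
        // Whole minutes, then the remaining seconds with one decimal.
        format!("{}m{:.1}s", ms / 60_000, (ms % 60_000) as f64 / 1000.0)
    }
}

/// Strip the `/nix/store/` prefix, keeping the hash and package name.
/// Paths without that prefix are returned unchanged.
pub fn drv_name(path: &str) -> &str {
    path.strip_prefix("/nix/store/").unwrap_or(path)
}
```

For example, `fmt_ms(999)` gives `999ms`, `fmt_ms(1_500)` gives `1.5s`, and `fmt_ms(61_500)` gives `1m1.5s`. One edge worth knowing: values just under a minute round up in the seconds branch, so `fmt_ms(59_999)` prints `60.0s` rather than `1m0.0s`.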