Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

better cli

+424 -201
+84 -11
src/daemon.rs
··· 10 10 use tokio::net::{UnixListener, UnixStream}; 11 11 use tracing::{error, info}; 12 12 13 - use crate::stats::{collect_stats, collect_trend, BucketSize}; 13 + use crate::stats::{collect_stats, collect_trend, BucketSize, SortField}; 14 14 15 15 const fn schema_hash(s: &[u8]) -> u32 { 16 16 let mut h: u32 = 2166136261; ··· 187 187 #[derive(Debug, Deserialize)] 188 188 #[serde(tag = "action", rename_all = "snake_case")] 189 189 enum ClientCommand { 190 - GetStats { since: Option<i64> }, 191 - GetTrend { since: Option<i64>, bucket: BucketSize, drv: Option<String> }, 192 - Clean, 190 + GetStats { 191 + since: Option<i64>, 192 + #[serde(default)] 193 + drv: Option<String>, 194 + #[serde(default)] 195 + sort: SortField, 196 + #[serde(default = "default_limit")] 197 + limit: u32, 198 + #[serde(default)] 199 + group: bool, 200 + }, 201 + GetTrend { 202 + since: Option<i64>, 203 + bucket: BucketSize, 204 + drv: Option<String>, 205 + }, 206 + Clean { 207 + // Unix timestamp; None means delete everything. 208 + #[serde(default)] 209 + before: Option<i64>, 210 + }, 193 211 } 212 + 213 + fn default_limit() -> u32 { 10 } 194 214 195 215 // Untagged outer enum: serde tries ClientCommand first, then NixEvent. 196 216 // The action values never overlap so routing is unambiguous. ··· 276 296 Ok(conn) 277 297 } 278 298 279 - pub async fn run_daemon(socket_path: PathBuf, db: Arc<DbConnections>) -> Result<()> { 299 + fn run_retention(conn: &Connection, retain_days: u32) -> Result<()> { 300 + assert!(retain_days > 0, "retain_days must be > 0"); 301 + let cutoff = Utc::now().timestamp() - retain_days as i64 * 86400; 302 + let deleted = conn.execute("DELETE FROM events WHERE start_time < ?1", [cutoff]) 303 + .context("Retention DELETE failed")?; 304 + conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)") 305 + .context("Retention checkpoint failed")?; 306 + if deleted > 0 { 307 + info!(deleted, retain_days, "Retention cleanup removed rows"); 308 + } 309 + Ok(()) 310 + } 311 + 312 + pub async fn run_daemon( 313 + socket_path: PathBuf, 314 + db: Arc<DbConnections>, 315 + retain_days: Option<u32>, 316 + ) -> Result<()> { 280 317 let state = Arc::new(Mutex::new(State { 281 318 active_activities: HashMap::new(), 282 319 })); ··· 286 323 .with_context(|| format!("Failed to remove existing socket at {}", socket_path.display()))?; 287 324 } 288 325 326 + // Run retention on startup, then every 6 hours. 327 + if let Some(days) = retain_days { 328 + assert!(days > 0, "retain_days must be > 0"); 329 + let db_startup = Arc::clone(&db); 330 + tokio::task::spawn_blocking(move || { 331 + run_retention(&db_startup.writer.lock().unwrap(), days) 332 + }).await??; 333 + 334 + let db_timer = Arc::clone(&db); 335 + tokio::spawn(async move { 336 + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600)); 337 + interval.tick().await; // first tick fires immediately; skip it (already ran above) 338 + loop { 339 + interval.tick().await; 340 + let db2 = Arc::clone(&db_timer); 341 + if let Err(e) = tokio::task::spawn_blocking(move || { 342 + run_retention(&db2.writer.lock().unwrap(), days) 343 + }).await { 344 + error!("Retention timer task failed: {}", e); 345 + } 346 + } 347 + }); 348 + } 349 + 289 350 let listener = UnixListener::bind(&socket_path) 290 351 .with_context(|| format!("Failed to bind to socket at {}", socket_path.display()))?; 291 352 info!("Daemon listening on {}", socket_path.display()); ··· 311 372 if reader.read_line(&mut line).await? == 0 { break; } 312 373 313 374 match serde_json::from_str::<SocketMessage>(line.trim()) { 314 - Ok(SocketMessage::Command(ClientCommand::GetStats { since })) => { 375 + Ok(SocketMessage::Command(ClientCommand::GetStats { since, drv, sort, limit, group })) => { 315 376 let db = Arc::clone(&db); 316 - let stats = tokio::task::spawn_blocking(move || collect_stats(&db.reader, since)) 317 - .await??; 377 + let stats = tokio::task::spawn_blocking(move || { 378 + collect_stats(&db.reader, since, drv.as_deref(), sort, limit, group) 379 + }).await??; 318 380 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 319 381 break; 320 382 } ··· 325 387 writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?; 326 388 break; 327 389 } 328 - Ok(SocketMessage::Command(ClientCommand::Clean)) => { 390 + Ok(SocketMessage::Command(ClientCommand::Clean { before })) => { 329 391 let db = Arc::clone(&db); 330 392 tokio::task::spawn_blocking(move || -> Result<()> { 331 393 let conn = db.writer.lock().unwrap(); 332 - conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?; 394 + if let Some(ts) = before { 395 + assert!(ts > 0, "before timestamp must be positive"); 396 + let deleted = conn.execute( 397 + "DELETE FROM events WHERE start_time < ?1", [ts], 398 + ).context("Partial clean DELETE failed")?; 399 + conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)") 400 + .context("Partial clean checkpoint failed")?; 401 + info!(deleted, before = ts, "Partial database clean via socket command"); 402 + } else { 403 + conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);") 404 + .context("Full clean failed")?; 405 + info!("Database cleared via socket command"); 406 + } 333 407 Ok(()) 334 408 }).await??; 335 409 writer.write_all(b"ok\n").await?; 336 - info!("Database cleared via socket command"); 337 410 break; 338 411 } 339 412 Ok(SocketMessage::Event(event)) => {
+251 -165
src/main.rs
··· 5 5 use chrono::Utc; 6 6 use clap::{Parser, Subcommand}; 7 7 use directories::ProjectDirs; 8 + use rusqlite::{Connection, OpenFlags}; 9 + use std::io::ErrorKind; 8 10 use std::path::PathBuf; 9 - use std::sync::{Arc, Mutex}; 11 + use std::sync::Mutex; 12 + use std::sync::Arc; 10 13 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 11 14 use tokio::net::UnixStream; 12 15 use tracing::{error, info}; 13 16 14 17 use daemon::{open_db, run_daemon, DbConnections}; 15 - use stats::{display_stats, display_trend, display_trend_test, output_csv_trend, BucketSize, Stats, Trend}; 18 + use stats::{ 19 + collect_trend, display_stats, display_trend, display_trend_test, output_csv_trend, 20 + BucketSize, SortField, Stats, Trend, 21 + }; 16 22 17 23 #[derive(Clone, clap::ValueEnum)] 18 - enum BucketArg { 19 - Hour, 20 - Day, 21 - Week, 22 - Month, 24 + enum OutputFormat { 25 + Table, 26 + Test, 27 + Csv, 23 28 } 24 29 25 30 #[derive(Parser)] 26 31 struct Cli { 32 + /// Path to the Unix socket 33 + #[arg(long, env = "NOD_SOCKET", global = true)] 34 + socket: Option<PathBuf>, 35 + 36 + /// Path to the SQLite database (used as read-only fallback when socket is unavailable) 37 + #[arg(long, env = "NOD_DB", global = true)] 38 + db: Option<PathBuf>, 39 + 27 40 #[command(subcommand)] 28 41 command: Option<Commands>, 42 + 43 + // Query mode args — active when no subcommand is given. 44 + /// Limit to last N days 45 + #[arg(short = 'd', long)] 46 + days: Option<u32>, 47 + 48 + /// Limit to last N months 49 + #[arg(short = 'm', long)] 50 + months: Option<u32>, 51 + 52 + /// Limit to last N years 53 + #[arg(short = 'y', long)] 54 + years: Option<u32>, 55 + 56 + /// Filter to derivations matching this substring 57 + #[arg(long)] 58 + drv: Option<String>, 59 + 60 + /// Time bucket granularity — when set, shows a time series instead of aggregate stats 61 + #[arg(short = 'b', long, value_enum)] 62 + bucket: Option<BucketSize>, 63 + 64 + /// Sort the slowest builds list by this field (aggregate mode only) 65 + #[arg(long, value_enum, default_value = "duration")] 66 + sort: SortField, 67 + 68 + /// Number of entries in the slowest builds list (aggregate mode only) 69 + #[arg(short = 'n', long, default_value = "10")] 70 + limit: u32, 71 + 72 + /// Group slowest builds by derivation path and show aggregate stats (aggregate mode only) 73 + #[arg(long)] 74 + group: bool, 75 + 76 + /// Output format; test requires --bucket 77 + #[arg(short = 'o', long, value_enum, default_value = "table")] 78 + output: OutputFormat, 29 79 } 30 80 31 81 #[derive(Subcommand)] 32 82 enum Commands { 33 83 /// Run the observability daemon 34 84 Daemon { 35 - /// Path to the SQLite database file 36 - #[arg(short, long, env = "NOD_DB")] 37 - db: Option<PathBuf>, 38 - /// Path to the Unix socket 39 - #[arg(short, long, env = "NOD_SOCKET")] 40 - socket: Option<PathBuf>, 41 - }, 42 - /// Show statistics from the daemon 43 - Stats { 44 - /// Path to the Unix socket 45 - #[arg(short, long, env = "NOD_SOCKET")] 46 - socket: Option<PathBuf>, 47 - /// Limit to last N days 48 - #[arg(short = 'd', long)] 49 - days: Option<u32>, 50 - /// Limit to last N months 51 - #[arg(short = 'm', long)] 52 - months: Option<u32>, 53 - /// Limit to last N years 54 - #[arg(short = 'y', long)] 55 - years: Option<u32>, 56 - }, 57 - /// Show how builds, substitutions, and downloads trend over time 58 - Trend { 59 - /// Path to the Unix socket 60 - #[arg(short, long, env = "NOD_SOCKET")] 61 - socket: Option<PathBuf>, 62 - /// Limit to last N days 63 - #[arg(short = 'd', long)] 64 - days: Option<u32>, 65 - /// Limit to last N months 66 - #[arg(short = 'm', long)] 67 - months: Option<u32>, 68 - /// Limit to last N years 69 - #[arg(short = 'y', long)] 70 - years: Option<u32>, 71 - /// Time bucket granularity (auto-detected from period if omitted) 72 - #[arg(short = 'b', long, value_enum)] 73 - bucket: Option<BucketArg>, 74 - /// Filter builds and substitutions to derivations matching this substring 75 - #[arg(long)] 76 - drv: Option<String>, 77 - /// Run Mann-Whitney U test between adjacent periods instead of showing the plain table 78 - #[arg(long)] 79 - test: bool, 80 - /// Output as CSV 81 - #[arg(long)] 82 - csv: bool, 85 + /// Delete events older than N days; cleanup runs on startup and every 6 hours 86 + #[arg(long, env = "NOD_RETAIN_DAYS")] 87 + retain_days: Option<u32>, 83 88 }, 84 - /// Clear all data from the database 89 + /// Clear data from the database 85 90 Clean { 86 - /// Path to the Unix socket 87 - #[arg(short, long, env = "NOD_SOCKET")] 88 - socket: Option<PathBuf>, 91 + /// Delete only events older than N days (default: delete everything) 92 + #[arg(long)] 93 + before_days: Option<u32>, 89 94 }, 90 95 } 91 96 97 + fn resolve_db_path(db: Option<PathBuf>, project_dirs: &Option<ProjectDirs>) -> PathBuf { 98 + db.unwrap_or_else(|| { 99 + project_dirs.as_ref() 100 + .map(|d| d.data_dir().join("nod.db")) 101 + .unwrap_or_else(|| PathBuf::from("nod.db")) 102 + }) 103 + } 104 + 105 + fn resolve_socket_path(socket: Option<PathBuf>, project_dirs: &Option<ProjectDirs>) -> PathBuf { 106 + socket.unwrap_or_else(|| { 107 + project_dirs.as_ref() 108 + .and_then(|d| d.runtime_dir()) 109 + .map(|d| d.join("nod.sock")) 110 + .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 111 + }) 112 + } 113 + 114 + fn is_connection_error(e: &std::io::Error) -> bool { 115 + matches!(e.kind(), ErrorKind::NotFound | ErrorKind::ConnectionRefused) 116 + } 117 + 118 + fn open_db_readonly(path: &PathBuf) -> Result<Connection> { 119 + let conn = Connection::open_with_flags( 120 + path, 121 + OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, 122 + ).with_context(|| format!("Failed to open database read-only at {}", path.display()))?; 123 + 124 + // Connection-level pragmas are not persisted in the file — apply on every open. 125 + conn.execute_batch(" 126 + PRAGMA journal_mode = WAL; 127 + PRAGMA temp_store = MEMORY; 128 + PRAGMA mmap_size = 134217728; 129 + PRAGMA cache_size = -8000; 130 + ").context("Failed to configure read-only database")?; 131 + 132 + Ok(conn) 133 + } 134 + 135 + fn compute_since(days: Option<u32>, months: Option<u32>, years: Option<u32>) -> Option<i64> { 136 + if days.is_none() && months.is_none() && years.is_none() { 137 + return None; 138 + } 139 + let mut t = Utc::now(); 140 + if let Some(y) = years { t = t - chrono::Months::new(y * 12); } 141 + if let Some(m) = months { t = t - chrono::Months::new(m); } 142 + if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } 143 + Some(t.timestamp()) 144 + } 145 + 146 + fn display_trend_output(trend: &Trend, output: &OutputFormat) { 147 + match output { 148 + OutputFormat::Table => display_trend(trend), 149 + OutputFormat::Test => display_trend_test(trend), 150 + OutputFormat::Csv => output_csv_trend(trend), 151 + } 152 + } 153 + 92 154 #[tokio::main] 93 155 async fn main() -> Result<()> { 94 156 tracing_subscriber::fmt::init(); ··· 96 158 97 159 let project_dirs = ProjectDirs::from("org", "nixos", "nod"); 98 160 99 - let command = cli.command.unwrap_or(Commands::Stats { socket: None, days: None, months: None, years: None }); 100 - 101 - match command { 102 - Commands::Daemon { db, socket } => { 103 - let db_path = db.unwrap_or_else(|| { 104 - project_dirs.as_ref() 105 - .map(|d| d.data_dir().join("nod.db")) 106 - .unwrap_or_else(|| PathBuf::from("nod.db")) 107 - }); 161 + match cli.command { 162 + Some(Commands::Daemon { retain_days }) => { 163 + if let Some(days) = retain_days { 164 + assert!(days > 0, "--retain-days must be > 0"); 165 + } 108 166 109 - let socket_path = socket.unwrap_or_else(|| { 110 - project_dirs.as_ref() 111 - .and_then(|d| d.runtime_dir()) 112 - .map(|d| d.join("nod.sock")) 113 - .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 114 - }); 167 + let db_path = resolve_db_path(cli.db, &project_dirs); 168 + let socket_path = resolve_socket_path(cli.socket, &project_dirs); 115 169 116 170 if let Some(parent) = db_path.parent() { 117 171 if !parent.as_os_str().is_empty() { ··· 119 173 .with_context(|| format!("Failed to create database directory: {}", parent.display()))?; 120 174 } 121 175 } 122 - 123 176 if let Some(parent) = socket_path.parent() { 124 177 if !parent.as_os_str().is_empty() { 125 178 std::fs::create_dir_all(parent) ··· 139 192 writer: Mutex::new(writer), 140 193 reader: Mutex::new(reader), 141 194 }); 142 - run_daemon(socket_path, db).await.context("Daemon failed")? 195 + run_daemon(socket_path, db, retain_days).await.context("Daemon failed")? 143 196 } 144 - Commands::Stats { socket, days, months, years } => { 145 - let socket_path = socket.unwrap_or_else(|| { 146 - project_dirs.as_ref() 147 - .and_then(|d| d.runtime_dir()) 148 - .map(|d| d.join("nod.sock")) 149 - .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 197 + 198 + Some(Commands::Clean { before_days }) => { 199 + let socket_path = resolve_socket_path(cli.socket, &project_dirs); 200 + let before: Option<i64> = before_days.map(|d| { 201 + assert!(d > 0, "--before-days must be > 0"); 202 + (Utc::now() - chrono::Duration::days(d as i64)).timestamp() 150 203 }); 151 204 152 - let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() { 153 - let mut t = Utc::now(); 154 - if let Some(y) = years { t = t - chrono::Months::new(y * 12); } 155 - if let Some(m) = months { t = t - chrono::Months::new(m); } 156 - if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } 157 - Some(t.timestamp()) 158 - } else { 159 - None 160 - }; 205 + match UnixStream::connect(&socket_path).await { 206 + Ok(mut stream) => { 207 + let cmd = serde_json::json!({"action": "clean", "before": before}); 208 + stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 161 209 162 - let mut stream = UnixStream::connect(&socket_path) 163 - .await 164 - .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 210 + let mut reader = BufReader::new(stream); 211 + let mut line = String::new(); 212 + reader.read_line(&mut line).await.context("Daemon closed connection")?; 165 213 166 - let cmd = serde_json::json!({"action": "get_stats", "since": since}); 167 - stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 214 + if line.trim() == "ok" { 215 + info!("Database cleaned successfully."); 216 + } else { 217 + error!("Daemon failed to clean database."); 218 + } 219 + } 220 + Err(e) if is_connection_error(&e) => { 221 + // Daemon not running — operate directly on the database. 222 + let db_path = resolve_db_path(cli.db, &project_dirs); 223 + let conn = open_db(&db_path)?; 224 + if let Some(ts) = before { 225 + let deleted = conn.execute( 226 + "DELETE FROM events WHERE start_time < ?1", [ts], 227 + ).context("Partial clean failed")?; 228 + conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)")?; 229 + info!(deleted, "Partial clean complete (direct)"); 230 + } else { 231 + conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?; 232 + info!("Database cleared (direct)"); 233 + } 234 + } 235 + Err(e) => return Err(e).context("Failed to connect to daemon"), 236 + } 237 + } 168 238 169 - let mut reader = BufReader::new(stream); 170 - let mut line = String::new(); 171 - reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 172 - let stats: Stats = serde_json::from_str(&line).context("Invalid response from daemon")?; 173 - display_stats(stats); 239 + None => { 240 + // Query mode: aggregate stats (no --bucket) or time series (--bucket). 241 + assert!(cli.limit > 0, "--limit must be > 0"); 242 + 243 + if matches!(cli.output, OutputFormat::Test) && cli.bucket.is_none() { 244 + anyhow::bail!("--output test requires --bucket"); 245 + } 246 + 247 + let since = compute_since(cli.days, cli.months, cli.years); 248 + let socket_path = resolve_socket_path(cli.socket, &project_dirs); 249 + 250 + match UnixStream::connect(&socket_path).await { 251 + Ok(stream) => { 252 + query_via_socket(stream, since, cli.drv, cli.bucket, cli.sort, cli.limit, cli.group, cli.output).await?; 253 + } 254 + Err(e) if is_connection_error(&e) => { 255 + let db_path = resolve_db_path(cli.db, &project_dirs); 256 + query_direct(&db_path, since, cli.drv, cli.bucket, cli.sort, cli.limit, cli.group, &cli.output)?; 257 + } 258 + Err(e) => return Err(e).context("Failed to connect to daemon"), 259 + } 174 260 } 175 - Commands::Trend { socket, days, months, years, bucket, drv, test, csv } => { 176 - let socket_path = socket.unwrap_or_else(|| { 177 - project_dirs.as_ref() 178 - .and_then(|d| d.runtime_dir()) 179 - .map(|d| d.join("nod.sock")) 180 - .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 181 - }); 261 + } 182 262 183 - let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() { 184 - let mut t = Utc::now(); 185 - if let Some(y) = years { t = t - chrono::Months::new(y * 12); } 186 - if let Some(m) = months { t = t - chrono::Months::new(m); } 187 - if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } 188 - Some(t.timestamp()) 189 - } else { 190 - None 191 - }; 263 + Ok(()) 264 + } 192 265 193 - // Auto-detect bucket granularity from the period length when not specified. 194 - let bucket_size = match bucket { 195 - Some(BucketArg::Hour) => BucketSize::Hour, 196 - Some(BucketArg::Day) => BucketSize::Day, 197 - Some(BucketArg::Week) => BucketSize::Week, 198 - Some(BucketArg::Month) => BucketSize::Month, 199 - None => match since { 200 - None => BucketSize::Month, 201 - Some(ts) => { 202 - let days_span = (Utc::now().timestamp() - ts) / 86400; 203 - if days_span <= 2 { BucketSize::Hour } 204 - else if days_span <= 60 { BucketSize::Day } 205 - else if days_span <= 365 { BucketSize::Week } 206 - else { BucketSize::Month } 207 - } 208 - }, 209 - }; 266 + async fn query_via_socket( 267 + mut stream: UnixStream, 268 + since: Option<i64>, 269 + drv: Option<String>, 270 + bucket: Option<BucketSize>, 271 + sort: SortField, 272 + limit: u32, 273 + group: bool, 274 + output: OutputFormat, 275 + ) -> Result<()> { 276 + assert!(limit > 0); 210 277 211 - let mut stream = UnixStream::connect(&socket_path) 212 - .await 213 - .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 278 + if let Some(bucket_size) = bucket { 279 + let cmd = serde_json::json!({ 280 + "action": "get_trend", 281 + "since": since, 282 + "bucket": bucket_size, 283 + "drv": drv, 284 + }); 285 + stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 214 286 215 - let cmd = serde_json::json!({"action": "get_trend", "since": since, "bucket": bucket_size, "drv": drv}); 216 - stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 287 + let mut reader = BufReader::new(stream); 288 + let mut line = String::new(); 289 + reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 290 + let trend: Trend = serde_json::from_str(&line).context("Invalid response from daemon")?; 291 + display_trend_output(&trend, &output); 292 + } else { 293 + let cmd = serde_json::json!({ 294 + "action": "get_stats", 295 + "since": since, 296 + "drv": drv, 297 + "sort": sort, 298 + "limit": limit, 299 + "group": group, 300 + }); 301 + stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 217 302 218 - let mut reader = BufReader::new(stream); 219 - let mut line = String::new(); 220 - reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 221 - let trend: Trend = serde_json::from_str(&line).context("Invalid response from daemon")?; 222 - if csv { output_csv_trend(&trend); } else if test { display_trend_test(&trend); } else { display_trend(&trend); } 223 - } 224 - Commands::Clean { socket } => { 225 - let socket_path = socket.unwrap_or_else(|| { 226 - project_dirs.as_ref() 227 - .and_then(|d| d.runtime_dir()) 228 - .map(|d| d.join("nod.sock")) 229 - .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 230 - }); 303 + let mut reader = BufReader::new(stream); 304 + let mut line = String::new(); 305 + reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 306 + let stats: Stats = serde_json::from_str(&line).context("Invalid response from daemon")?; 307 + display_stats(stats); 308 + } 231 309 232 - let mut stream = UnixStream::connect(&socket_path) 233 - .await 234 - .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 310 + Ok(()) 311 + } 235 312 236 - stream.write_all(b"{\"action\":\"clean\"}\n").await?; 313 + fn query_direct( 314 + db_path: &PathBuf, 315 + since: Option<i64>, 316 + drv: Option<String>, 317 + bucket: Option<BucketSize>, 318 + sort: SortField, 319 + limit: u32, 320 + group: bool, 321 + output: &OutputFormat, 322 + ) -> Result<()> { 323 + assert!(limit > 0); 237 324 238 - let mut reader = BufReader::new(stream); 239 - let mut line = String::new(); 240 - reader.read_line(&mut line).await.context("Daemon closed connection")?; 325 + let conn = open_db_readonly(db_path)?; 326 + let conn = Mutex::new(conn); 241 327 242 - if line.trim() == "ok" { 243 - info!("Database cleared successfully."); 244 - } else { 245 - error!("Daemon failed to clear database."); 246 - } 247 - } 328 + if let Some(bucket_size) = bucket { 329 + let trend = collect_trend(&conn, since, bucket_size, drv)?; 330 + display_trend_output(&trend, output); 331 + } else { 332 + let s = stats::collect_stats(&conn, since, drv.as_deref(), sort, limit, group)?; 333 + display_stats(s); 248 334 } 249 335 250 336 Ok(())
+89 -25
src/stats.rs
··· 3 3 use serde::{Deserialize, Serialize}; 4 4 use std::sync::Mutex; 5 5 6 + #[derive(Debug, Clone, Default, Serialize, Deserialize, clap::ValueEnum)] 7 + #[serde(rename_all = "snake_case")] 8 + pub enum SortField { 9 + #[default] 10 + Duration, 11 + Count, 12 + Name, 13 + } 14 + 6 15 #[derive(Debug, Serialize, Deserialize)] 7 16 pub struct Stats { 8 17 pub build_count: i64, ··· 18 27 #[derive(Debug, Serialize, Deserialize)] 19 28 pub struct SlowBuild { 20 29 pub duration_ms: i64, 30 + pub count: Option<i64>, 21 31 pub drv_path: Option<String>, 22 32 pub text: Option<String>, 23 33 } ··· 29 39 pub count: i64, 30 40 } 31 41 32 - pub fn collect_stats(db: &Mutex<Connection>, since: Option<i64>) -> Result<Stats> { 33 - let conn = db.lock().unwrap(); 42 + pub fn collect_stats( 43 + db: &Mutex<Connection>, 44 + since: Option<i64>, 45 + drv: Option<&str>, 46 + sort: SortField, 47 + limit: u32, 48 + group: bool, 49 + ) -> Result<Stats> { 50 + assert!(limit > 0, "limit must be > 0"); 34 51 35 - let p: Option<i64> = since; 52 + let conn = db.lock().unwrap(); 36 53 37 54 let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) = 38 55 conn.query_row( ··· 43 60 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0), 44 61 COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0), 45 62 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0) 46 - FROM events WHERE (?1 IS NULL OR start_time >= ?1)", 47 - rusqlite::params![p], 63 + FROM events 64 + WHERE (?1 IS NULL OR start_time >= ?1) 65 + AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')", 66 + rusqlite::params![since, drv], 48 67 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, 49 68 r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)), 50 69 ).context("Failed to query summary stats")?; 51 70 52 - let mut stmt = conn.prepare( 53 - "SELECT duration_ms, drv_path, text 54 - FROM events WHERE event_type = 105 AND (?1 IS NULL OR start_time >= ?1) 55 - ORDER BY duration_ms DESC LIMIT 10", 56 - ).context("Failed to prepare slowest builds query")?; 57 - let slowest_builds: Vec<SlowBuild> = stmt.query_map(rusqlite::params![p], |r| { 58 - Ok(SlowBuild { 59 - duration_ms: r.get(0)?, 60 - drv_path: r.get(1)?, 61 - text: r.get(2)?, 62 - }) 63 - })?.filter_map(|r| r.ok()).collect(); 71 + let slowest_builds: Vec<SlowBuild> = if group { 72 + let sort_col = match sort { 73 + SortField::Duration => "avg_ms", 74 + SortField::Count => "cnt", 75 + SortField::Name => "drv_path", 76 + }; 77 + // sort_col is an internal identifier, not user input — safe to interpolate. 78 + let sql = format!( 79 + "SELECT drv_path, CAST(ROUND(AVG(duration_ms)) AS INTEGER) as avg_ms, COUNT(*) as cnt 80 + FROM events 81 + WHERE event_type = 105 82 + AND (?1 IS NULL OR start_time >= ?1) 83 + AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 84 + GROUP BY drv_path 85 + ORDER BY {sort_col} DESC 86 + LIMIT ?3" 87 + ); 88 + let mut stmt = conn.prepare(&sql).context("Failed to prepare grouped builds query")?; 89 + stmt.query_map(rusqlite::params![since, drv, limit], |r| { 90 + Ok(SlowBuild { 91 + drv_path: r.get(0)?, 92 + duration_ms: r.get(1)?, 93 + count: Some(r.get(2)?), 94 + text: None, 95 + }) 96 + })?.filter_map(|r| r.ok()).collect() 97 + } else { 98 + let sort_col = match sort { 99 + SortField::Duration | SortField::Count => "duration_ms", 100 + SortField::Name => "drv_path", 101 + }; 102 + let sql = format!( 103 + "SELECT duration_ms, drv_path, text 104 + FROM events 105 + WHERE event_type = 105 106 + AND (?1 IS NULL OR start_time >= ?1) 107 + AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 108 + ORDER BY {sort_col} DESC 109 + LIMIT ?3" 110 + ); 111 + let mut stmt = conn.prepare(&sql).context("Failed to prepare slowest builds query")?; 112 + stmt.query_map(rusqlite::params![since, drv, limit], |r| { 113 + Ok(SlowBuild { 114 + duration_ms: r.get(0)?, 115 + count: None, 116 + drv_path: r.get(1)?, 117 + text: r.get(2)?, 118 + }) 119 + })?.filter_map(|r| r.ok()).collect() 120 + }; 64 121 65 122 // Substitute (108) measures full substitution time per cache; QueryPathInfo (109) 66 123 // only measures metadata lookup and would undercount latency. 67 124 let mut stmt = conn.prepare( 68 125 "SELECT cache_url, AVG(duration_ms), COUNT(*) 69 - FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND (?1 IS NULL OR start_time >= ?1) 126 + FROM events 127 + WHERE event_type = 108 AND cache_url IS NOT NULL 128 + AND (?1 IS NULL OR start_time >= ?1) 70 129 GROUP BY cache_url ORDER BY AVG(duration_ms) DESC", 71 130 ).context("Failed to prepare cache latency query")?; 72 - let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![p], |r| { 131 + let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![since], |r| { 73 132 Ok(CacheStat { 74 133 cache_url: r.get(0)?, 75 - avg_ms: r.get(1)?, 76 - count: r.get(2)?, 134 + avg_ms: r.get(1)?, 135 + count: r.get(2)?, 77 136 }) 78 137 })?.filter_map(|r| r.ok()).collect(); 79 138 ··· 192 251 path.strip_prefix("/nix/store/").unwrap_or(path) 193 252 } 194 253 195 - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 254 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, clap::ValueEnum)] 196 255 #[serde(rename_all = "snake_case")] 197 256 pub enum BucketSize { 198 257 Hour, ··· 250 309 251 310 let conn = db.lock().unwrap(); 252 311 253 - let p: Option<i64> = since; 254 312 let drv_ref = drv.as_deref(); 255 313 let fmt = bucket.strftime_fmt(); 256 314 ··· 265 323 ).context("Failed to prepare trend query")?; 266 324 267 325 let mut buckets: Vec<TrendBucket> = vec![]; 268 - for row in stmt.query_map(rusqlite::params![p, drv_ref, fmt], |r| { 326 + for row in stmt.query_map(rusqlite::params![since, drv_ref, fmt], |r| { 269 327 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) 270 328 })?.filter_map(|r| r.ok()) { 271 329 let (b, etype, dur, bytes) = row; ··· 401 459 println!("{:<14} {:>8.1} MB avg {:>6.1} MB/s", "downloaded", mb, dl_speed); 402 460 403 461 if !stats.slowest_builds.is_empty() { 462 + let grouped = stats.slowest_builds.first().map(|b| b.count.is_some()).unwrap_or(false); 404 463 println!(); 405 464 for row in &stats.slowest_builds { 406 465 let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?"); 407 - println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path)); 466 + if grouped { 467 + let cnt = row.count.unwrap_or(0); 468 + println!("{:>9} ({:>3}x) {}", fmt_ms(row.duration_ms), cnt, drv_name(path)); 469 + } else { 470 + println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path)); 471 + } 408 472 } 409 473 } 410 474