Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add date filtering on stats command

+32 -11
+32 -11
src/main.rs
··· 136 136 /// Path to the Unix socket 137 137 #[arg(short, long, env = "NOD_SOCKET")] 138 138 socket: Option<PathBuf>, 139 + /// Limit to last N days 140 + #[arg(short = 'd', long)] 141 + days: Option<u32>, 142 + /// Limit to last N months 143 + #[arg(short = 'm', long)] 144 + months: Option<u32>, 145 + /// Limit to last N years 146 + #[arg(short = 'y', long)] 147 + years: Option<u32>, 139 148 }, 140 149 /// Clear all data from the database 141 150 Clean { ··· 277 286 let conn = open_db(&db_path)?; 278 287 run_daemon(socket_path, Arc::new(Mutex::new(conn))).await.context("Daemon failed")? 279 288 } 280 - Commands::Stats { socket } => { 289 + Commands::Stats { socket, days, months, years } => { 281 290 let socket_path = socket.unwrap_or_else(|| { 282 291 project_dirs.as_ref() 283 292 .and_then(|d| d.runtime_dir()) ··· 285 294 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 286 295 }); 287 296 297 + // Compute the since timestamp. Flags are additive (e.g. -y 1 -d 3 = 1 year + 3 days ago). 298 + let mut since = Utc::now(); 299 + if let Some(y) = years { since = since - chrono::Months::new(y * 12); } 300 + if let Some(m) = months { since = since - chrono::Months::new(m); } 301 + if let Some(d) = days { since = since - chrono::Duration::days(d as i64); } 302 + let has_filter = days.is_some() || months.is_some() || years.is_some(); 303 + // Epoch as the "no filter" sentinel — a WHERE start_time >= epoch matches everything. 304 + let since_str = if has_filter { since.to_rfc3339() } else { "1970-01-01T00:00:00+00:00".to_string() }; 305 + 288 306 let mut stream = UnixStream::connect(&socket_path) 289 307 .await 290 308 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 291 309 292 - stream.write_all(b"get_stats\n").await?; 310 + stream.write_all(format!("get_stats {}\n", since_str).as_bytes()).await?; 293 311 294 312 let mut reader = BufReader::new(stream); 295 313 let mut line = String::new(); ··· 361 379 if reader.read_line(&mut line).await? == 0 { break; } 362 380 let cmd = line.trim(); 363 381 364 - if cmd == "get_stats" { 382 + if cmd.starts_with("get_stats") { 383 + let since = cmd.split_whitespace().nth(1) 384 + .unwrap_or("1970-01-01T00:00:00+00:00") 385 + .to_string(); 365 386 let db = state.lock().unwrap().db.clone(); 366 - let stats = tokio::task::spawn_blocking(move || collect_stats(&db)) 387 + let stats = tokio::task::spawn_blocking(move || collect_stats(&db, &since)) 367 388 .await??; 368 389 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 369 390 break; ··· 490 511 Ok(()) 491 512 } 492 513 493 - fn collect_stats(db: &Arc<Mutex<Connection>>) -> Result<Stats> { 514 + fn collect_stats(db: &Arc<Mutex<Connection>>, since: &str) -> Result<Stats> { 494 515 let conn = db.lock().unwrap(); 495 516 496 517 let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) = ··· 502 523 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0), 503 524 COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0), 504 525 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0) 505 - FROM events", 506 - [], 526 + FROM events WHERE start_time >= ?1", 527 + [since], 507 528 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, 508 529 r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)), 509 530 ).context("Failed to query summary stats")?; 510 531 511 532 let mut stmt = conn.prepare( 512 533 "SELECT duration_ms, drv_path, text 513 - FROM events WHERE event_type = 105 534 + FROM events WHERE event_type = 105 AND start_time >= ?1 514 535 ORDER BY duration_ms DESC LIMIT 10", 515 536 ).context("Failed to prepare slowest builds query")?; 516 - let slowest_builds: Vec<SlowBuild> = stmt.query_map([], |r| { 537 + let slowest_builds: Vec<SlowBuild> = stmt.query_map([since], |r| { 517 538 Ok(SlowBuild { 518 539 duration_ms: r.get(0)?, 519 540 drv_path: r.get(1)?, ··· 525 546 // per substituter, not just metadata query time (QueryPathInfo). 526 547 let mut stmt = conn.prepare( 527 548 "SELECT cache_url, AVG(duration_ms), COUNT(*) 528 - FROM events WHERE event_type = 108 AND cache_url IS NOT NULL 549 + FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND start_time >= ?1 529 550 GROUP BY cache_url ORDER BY AVG(duration_ms) DESC", 530 551 ).context("Failed to prepare cache latency query")?; 531 - let cache_latency: Vec<CacheStat> = stmt.query_map([], |r| { 552 + let cache_latency: Vec<CacheStat> = stmt.query_map([since], |r| { 532 553 Ok(CacheStat { 533 554 cache_url: r.get(0)?, 534 555 avg_ms: r.get(1)?,