Nix Observability Daemon

Aggregation-based query approach

+512 -56
benches/queries.rs  (+47 -8)

···
  use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
- use nod::stats::{collect_stats, collect_trend, BucketSize, SortField};
+ use nod::stats::{collect_stats, collect_trend, BucketSize, SortField, TodaySummary};
  use rusqlite::Connection;
  use std::sync::Mutex;
···
          start_time INTEGER, end_time INTEGER,
          duration_ms INTEGER, total_bytes INTEGER
      );
-     CREATE INDEX idx_events_type_start ON events(event_type, start_time);
-     CREATE INDEX idx_events_start_time ON events(start_time);
+     CREATE INDEX idx_events_type_start ON events(event_type, start_time, duration_ms, total_bytes);
+     CREATE INDEX idx_events_start_cover ON events(start_time, event_type, duration_ms, total_bytes);
+     CREATE TABLE daily_stats (
+         day INTEGER NOT NULL, event_type INTEGER NOT NULL,
+         count INTEGER NOT NULL DEFAULT 0, total_ms INTEGER NOT NULL DEFAULT 0,
+         total_bytes INTEGER NOT NULL DEFAULT 0,
+         PRIMARY KEY (day, event_type)
+     );
+     CREATE TABLE daily_cache_stats (
+         day INTEGER NOT NULL, cache_url TEXT NOT NULL,
+         count INTEGER NOT NULL DEFAULT 0, total_ms INTEGER NOT NULL DEFAULT 0,
+         PRIMARY KEY (day, cache_url)
+     );
      PRAGMA journal_mode = WAL;
      PRAGMA synchronous = NORMAL;
  ").unwrap();
···
      }

      conn.execute_batch("COMMIT").unwrap();
+
+     // Backfill daily_stats from seeded events (mirrors daemon migration v5).
+     conn.execute_batch("
+         INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
+         SELECT start_time / 86400, event_type,
+                COUNT(*), COALESCE(SUM(duration_ms), 0), COALESCE(SUM(total_bytes), 0)
+         FROM events WHERE event_type IN (101, 105, 108)
+         GROUP BY 1, 2
+         ON CONFLICT (day, event_type) DO UPDATE SET
+             count = count + excluded.count,
+             total_ms = total_ms + excluded.total_ms,
+             total_bytes = total_bytes + excluded.total_bytes;
+         INSERT INTO daily_cache_stats (day, cache_url, count, total_ms)
+         SELECT start_time / 86400, cache_url, COUNT(*), COALESCE(SUM(duration_ms), 0)
+         FROM events WHERE event_type = 108 AND cache_url IS NOT NULL
+         GROUP BY 1, 2
+         ON CONFLICT (day, cache_url) DO UPDATE SET
+             count = count + excluded.count,
+             total_ms = total_ms + excluded.total_ms;
+     ").unwrap();
  }

  fn bench_collect_stats(c: &mut Criterion) {
      let mut group = c.benchmark_group("collect_stats");

+     // All bench data is from 2023 (BASE_TIME), so with today's real Unix day every
+     // seeded row falls on a closed historical day in daily_stats and the today
+     // snapshot stays empty. This matches normal daemon usage: summary from
+     // daily_stats, slowest_builds from events.
+     let today_day = std::time::SystemTime::now()
+         .duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64 / 86400;
+     let today_snap = TodaySummary { day: today_day, ..Default::default() };
+
      for n in [10_000usize, 100_000, 1_000_000] {
          let conn = Connection::open_in_memory().unwrap();
          seed(&conn, n);
          let db = Mutex::new(conn);

-         // All-time: no index benefit on start_time, exercises full scan path.
+         let t = today_snap.clone();
          group.bench_with_input(BenchmarkId::new("all_time", n), &n, |b, _| {
-             b.iter(|| collect_stats(&db, None, None, SortField::Duration, 10, false).unwrap())
+             b.iter(|| collect_stats(&db, None, None, SortField::Duration, 10, false, Some(t.clone())).unwrap())
          });

-         // Recent 10%: composite index (event_type, start_time) prunes 90% of rows.
          let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10);
+         let t = today_snap.clone();
          group.bench_with_input(BenchmarkId::new("recent_10pct", n), &n, |b, _| {
-             b.iter(|| collect_stats(&db, since, None, SortField::Duration, 10, false).unwrap())
+             b.iter(|| collect_stats(&db, since, None, SortField::Duration, 10, false, Some(t.clone())).unwrap())
          });

+         let t = today_snap.clone();
          group.bench_with_input(BenchmarkId::new("grouped", n), &n, |b, _| {
-             b.iter(|| collect_stats(&db, None, None, SortField::Count, 10, true).unwrap())
+             b.iter(|| collect_stats(&db, None, None, SortField::Count, 10, true, Some(t.clone())).unwrap())
          });
      }
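Note that all three variants above hand collect_stats a populated TodaySummary, so they only exercise the new daily_stats fast path. A hypothetical companion variant, not part of this diff, would pass None for today to force the events-scan fallback (see src/stats.rs below), letting both paths be compared on identical seeded data. A sketch that would slot into the same loop:

      // Hypothetical benchmark (not in this diff): today = None disables the
      // daily_stats fast path, so collect_stats falls back to scanning events.
      group.bench_with_input(BenchmarkId::new("all_time_events_scan", n), &n, |b, _| {
          b.iter(|| collect_stats(&db, None, None, SortField::Duration, 10, false, None).unwrap())
      });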
src/daemon.rs  (+326 -16)

···
  use tokio::net::{UnixListener, UnixStream};
  use tracing::{error, info};

- use crate::stats::{collect_stats, collect_trend, BucketSize, SortField};
+ use crate::stats::{collect_stats, collect_trend, BucketSize, SortField, TodaySummary};

  const fn schema_hash(s: &[u8]) -> u32 {
      let mut h: u32 = 2166136261;
···
      duration_ms INTEGER,
      total_bytes INTEGER
  );
- CREATE INDEX IF NOT EXISTS idx_events_type_start ON events(event_type, start_time);
- CREATE INDEX IF NOT EXISTS idx_events_start_time ON events(start_time);
+ CREATE INDEX IF NOT EXISTS idx_events_type_start ON events(event_type, start_time, duration_ms, total_bytes);
+ CREATE INDEX IF NOT EXISTS idx_events_start_cover ON events(start_time, event_type, duration_ms, total_bytes);
+ CREATE TABLE IF NOT EXISTS daily_stats (
+     day INTEGER NOT NULL,
+     event_type INTEGER NOT NULL,
+     count INTEGER NOT NULL DEFAULT 0,
+     total_ms INTEGER NOT NULL DEFAULT 0,
+     total_bytes INTEGER NOT NULL DEFAULT 0,
+     PRIMARY KEY (day, event_type)
+ );
+ CREATE TABLE IF NOT EXISTS daily_cache_stats (
+     day INTEGER NOT NULL,
+     cache_url TEXT NOT NULL,
+     count INTEGER NOT NULL DEFAULT 0,
+     total_ms INTEGER NOT NULL DEFAULT 0,
+     PRIMARY KEY (day, cache_url)
+ );
  ";

  const SCHEMA_HASHES: &[u32] = &[
      0x9bc94a70, // v1: TEXT timestamps, no indexes
-     0xee061d32, // v2: INTEGER timestamps (Unix seconds), idx_events_type_start
-     0x7c09711e, // v3: idx_events_start_time
+     0xee061d32, // v2: INTEGER timestamps (Unix seconds), idx_events_type_start(event_type, start_time)
+     0x7c09711e, // v3: idx_events_start_time(start_time)
+     0x24268227, // v4: idx_events_type_start extended to cover (duration_ms, total_bytes); idx_events_start_cover added
+     0xfcbb3598, // v5: daily_stats + daily_cache_stats tables
  ];
  const SCHEMA_VERSION: u32 = SCHEMA_HASHES.len() as u32;
  const _: () = assert!(
···
          CREATE INDEX IF NOT EXISTS idx_events_type_start ON events(event_type, start_time);
      "),
      (3, "CREATE INDEX IF NOT EXISTS idx_events_start_time ON events(start_time);"),
+     (4, "
+         DROP INDEX IF EXISTS idx_events_type_start;
+         DROP INDEX IF EXISTS idx_events_start_time;
+         CREATE INDEX idx_events_type_start ON events(event_type, start_time, duration_ms, total_bytes);
+         CREATE INDEX idx_events_start_cover ON events(start_time, event_type, duration_ms, total_bytes);
+     "),
+     // v5: create daily aggregate tables and backfill all closed days from events.
+     // Today's data is excluded (day < today) — the daemon rebuilds today from events on startup.
+     // This migration may take several seconds on large databases.
+     (5, "
+         CREATE TABLE IF NOT EXISTS daily_stats (
+             day INTEGER NOT NULL,
+             event_type INTEGER NOT NULL,
+             count INTEGER NOT NULL DEFAULT 0,
+             total_ms INTEGER NOT NULL DEFAULT 0,
+             total_bytes INTEGER NOT NULL DEFAULT 0,
+             PRIMARY KEY (day, event_type)
+         );
+         CREATE TABLE IF NOT EXISTS daily_cache_stats (
+             day INTEGER NOT NULL,
+             cache_url TEXT NOT NULL,
+             count INTEGER NOT NULL DEFAULT 0,
+             total_ms INTEGER NOT NULL DEFAULT 0,
+             PRIMARY KEY (day, cache_url)
+         );
+         INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
+         SELECT start_time / 86400, event_type,
+                COUNT(*), COALESCE(SUM(duration_ms), 0), COALESCE(SUM(total_bytes), 0)
+         FROM events
+         WHERE event_type IN (101, 105, 108)
+           AND start_time / 86400 < strftime('%s', 'now') / 86400
+         GROUP BY 1, 2
+         ON CONFLICT (day, event_type) DO UPDATE SET
+             count = count + excluded.count,
+             total_ms = total_ms + excluded.total_ms,
+             total_bytes = total_bytes + excluded.total_bytes;
+         INSERT INTO daily_cache_stats (day, cache_url, count, total_ms)
+         SELECT start_time / 86400, cache_url, COUNT(*), COALESCE(SUM(duration_ms), 0)
+         FROM events
+         WHERE event_type = 108 AND cache_url IS NOT NULL
+           AND start_time / 86400 < strftime('%s', 'now') / 86400
+         GROUP BY 1, 2
+         ON CONFLICT (day, cache_url) DO UPDATE SET
+             count = count + excluded.count,
+             total_ms = total_ms + excluded.total_ms;
+     "),
  ];

  #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
···
      total_bytes: u64,
  }

+ // Per-day aggregate cache, maintained in memory for the current calendar day (UTC).
+ // Flushed to daily_stats/daily_cache_stats when the day rolls over.
+ // Rebuilt from events on daemon startup via rebuild_today().
+ struct TodayStats {
+     day: i64, // Unix day: start_time / 86400
+     build_count: i64,
+     build_total_ms: i64,
+     subst_count: i64,
+     subst_total_ms: i64,
+     download_count: i64,
+     download_bytes: i64,
+     download_ms: i64,
+     cache: HashMap<String, (i64, i64)>, // url → (total_ms, count)
+ }
+
+ impl TodayStats {
+     fn new(day: i64) -> Self {
+         assert!(day > 0);
+         TodayStats { day, build_count: 0, build_total_ms: 0, subst_count: 0,
+                      subst_total_ms: 0, download_count: 0, download_bytes: 0,
+                      download_ms: 0, cache: HashMap::new() }
+     }
+
+     fn snapshot(&self) -> TodaySummary {
+         TodaySummary {
+             day: self.day,
+             build_count: self.build_count,
+             build_total_ms: self.build_total_ms,
+             subst_count: self.subst_count,
+             subst_total_ms: self.subst_total_ms,
+             download_count: self.download_count,
+             download_bytes: self.download_bytes,
+             download_ms: self.download_ms,
+             cache: self.cache.clone(),
+         }
+     }
+ }
+
  struct State {
      active_activities: HashMap<u64, Activity>,
+     today: TodayStats,
  }

  pub struct DbConnections {
···
      Ok(())
  }

+ // Flush a completed day's in-memory stats to the persistent daily_stats tables.
+ fn flush_today(conn: &Connection, today: &TodayStats) -> Result<()> {
+     assert!(today.day > 0);
+     let day = today.day;
+
+     if today.build_count > 0 {
+         conn.execute(
+             "INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
+              VALUES (?1, 105, ?2, ?3, 0)
+              ON CONFLICT (day, event_type) DO UPDATE SET
+                  count = count + excluded.count,
+                  total_ms = total_ms + excluded.total_ms",
+             rusqlite::params![day, today.build_count, today.build_total_ms],
+         ).context("flush_today: build insert failed")?;
+     }
+     if today.subst_count > 0 {
+         conn.execute(
+             "INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
+              VALUES (?1, 108, ?2, ?3, 0)
+              ON CONFLICT (day, event_type) DO UPDATE SET
+                  count = count + excluded.count,
+                  total_ms = total_ms + excluded.total_ms",
+             rusqlite::params![day, today.subst_count, today.subst_total_ms],
+         ).context("flush_today: subst insert failed")?;
+     }
+     if today.download_count > 0 {
+         conn.execute(
+             "INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
+              VALUES (?1, 101, ?2, ?3, ?4)
+              ON CONFLICT (day, event_type) DO UPDATE SET
+                  count = count + excluded.count,
+                  total_ms = total_ms + excluded.total_ms,
+                  total_bytes = total_bytes + excluded.total_bytes",
+             rusqlite::params![day, today.download_count, today.download_ms, today.download_bytes],
+         ).context("flush_today: download insert failed")?;
+     }
+     for (url, &(ms, cnt)) in &today.cache {
+         assert!(cnt > 0);
+         conn.execute(
+             "INSERT INTO daily_cache_stats (day, cache_url, count, total_ms)
+              VALUES (?1, ?2, ?3, ?4)
+              ON CONFLICT (day, cache_url) DO UPDATE SET
+                  count = count + excluded.count,
+                  total_ms = total_ms + excluded.total_ms",
+             rusqlite::params![day, url, cnt, ms],
+         ).context("flush_today: cache insert failed")?;
+     }
+     Ok(())
+ }
+
+ // Upsert a single past-day event directly into daily_stats.
+ // Called for events whose start_time predates the current calendar day
+ // (e.g. a long-running build that started yesterday and finished today).
+ fn upsert_daily(
+     conn: &Connection, day: i64, event_type: i64,
+     duration_ms: i64, total_bytes: i64, cache_url: Option<&str>,
+ ) -> Result<()> {
+     assert!(day > 0);
+     conn.execute(
+         "INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
+          VALUES (?1, ?2, 1, ?3, ?4)
+          ON CONFLICT (day, event_type) DO UPDATE SET
+              count = count + 1,
+              total_ms = total_ms + excluded.total_ms,
+              total_bytes = total_bytes + excluded.total_bytes",
+         rusqlite::params![day, event_type, duration_ms, total_bytes],
+     ).context("upsert_daily: insert failed")?;
+
+     if let Some(url) = cache_url {
+         conn.execute(
+             "INSERT INTO daily_cache_stats (day, cache_url, count, total_ms)
+              VALUES (?1, ?2, 1, ?3)
+              ON CONFLICT (day, cache_url) DO UPDATE SET
+                  count = count + 1,
+                  total_ms = total_ms + excluded.total_ms",
+             rusqlite::params![day, url, duration_ms],
+         ).context("upsert_daily: cache insert failed")?;
+     }
+     Ok(())
+ }
+
+ // Rebuild today's in-memory stats from the events table for today's UTC day.
+ // Runs once at daemon startup — scans only today's rows, which is fast.
+ fn rebuild_today(conn: &Connection, today_day: i64) -> Result<TodayStats> {
+     assert!(today_day > 0);
+     let today_ts = today_day * 86400;
+     let tomorrow_ts = today_ts + 86400;
+     let mut t = TodayStats::new(today_day);
+
+     let (bc, bms) = conn.query_row(
+         "SELECT COUNT(*), COALESCE(SUM(duration_ms),0)
+          FROM events INDEXED BY idx_events_type_start
+          WHERE event_type = 105 AND start_time >= ?1 AND start_time < ?2",
+         [today_ts, tomorrow_ts],
+         |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
+     ).context("rebuild_today: build query failed")?;
+     t.build_count = bc; t.build_total_ms = bms;
+
+     let (sc, sms) = conn.query_row(
+         "SELECT COUNT(*), COALESCE(SUM(duration_ms),0)
+          FROM events INDEXED BY idx_events_type_start
+          WHERE event_type = 108 AND start_time >= ?1 AND start_time < ?2",
+         [today_ts, tomorrow_ts],
+         |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
+     ).context("rebuild_today: subst query failed")?;
+     t.subst_count = sc; t.subst_total_ms = sms;
+
+     let (dc, db, dms) = conn.query_row(
+         "SELECT COUNT(*), COALESCE(SUM(total_bytes),0), COALESCE(SUM(duration_ms),0)
+          FROM events INDEXED BY idx_events_type_start
+          WHERE event_type = 101 AND start_time >= ?1 AND start_time < ?2",
+         [today_ts, tomorrow_ts],
+         |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?)),
+     ).context("rebuild_today: download query failed")?;
+     t.download_count = dc; t.download_bytes = db; t.download_ms = dms;
+
+     let mut stmt = conn.prepare(
+         "SELECT cache_url, SUM(duration_ms), COUNT(*)
+          FROM events INDEXED BY idx_events_type_start
+          WHERE event_type = 108 AND cache_url IS NOT NULL
+            AND start_time >= ?1 AND start_time < ?2
+          GROUP BY cache_url",
+     ).context("rebuild_today: cache query failed")?;
+     for row in stmt.query_map([today_ts, tomorrow_ts], |r| {
+         Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?))
+     })?.filter_map(|r| r.ok()) {
+         let (url, ms, cnt) = row;
+         t.cache.insert(url, (ms, cnt));
+     }
+
+     assert!(t.build_count >= 0);
+     assert!(t.subst_count >= 0);
+     assert!(t.download_count >= 0);
+     info!(today_day, build_count = t.build_count, subst_count = t.subst_count, "Rebuilt today stats from events");
+     Ok(t)
+ }
+
  pub async fn run_daemon(
      socket_path: PathBuf,
      db: Arc<DbConnections>,
      retain_days: Option<u32>,
  ) -> Result<()> {
+     let today_day = Utc::now().timestamp() / 86400;
+     let today = {
+         let conn = db.reader.lock().unwrap();
+         rebuild_today(&conn, today_day)?
+     };
+
      let state = Arc::new(Mutex::new(State {
          active_activities: HashMap::new(),
+         today,
      }));

      if socket_path.exists() {
···

  match serde_json::from_str::<SocketMessage>(line.trim()) {
      Ok(SocketMessage::Command(ClientCommand::GetStats { since, drv, sort, limit, group })) => {
+         let today_snap = state.lock().unwrap().today.snapshot();
          let db = Arc::clone(&db);
          let stats = tokio::task::spawn_blocking(move || {
-             collect_stats(&db.reader, since, drv.as_deref(), sort, limit, group)
+             collect_stats(&db.reader, since, drv.as_deref(), sort, limit, group, Some(today_snap))
          }).await??;
          writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?;
          break;
···
      _ => None,
  };

+ let event_day = act.start_time.timestamp() / 86400;
+ let real_today = end_time.timestamp() / 86400;
+ assert!(event_day > 0);
+ assert!(real_today >= event_day, "end_time must be >= start_time");
+
+ // Check for day rollover: real calendar day has advanced past our memory snapshot.
+ let old_today = if s.today.day > 0 && s.today.day < real_today {
+     Some(std::mem::replace(&mut s.today, TodayStats::new(real_today)))
+ } else {
+     if s.today.day == 0 { s.today.day = real_today; }
+     None
+ };
+
+ // Update in-memory today only for events whose start_time is today.
+ // Past-day events (long-running builds that spanned midnight) are written
+ // directly to daily_stats after the events insert.
+ let is_today = event_day == real_today;
+ if is_today {
+     match act_type {
+         ActivityType::Build => {
+             s.today.build_count += 1;
+             s.today.build_total_ms += duration_ms;
+         }
+         ActivityType::Substitute => {
+             s.today.subst_count += 1;
+             s.today.subst_total_ms += duration_ms;
+             if let Some(ref url) = cache_url {
+                 let e = s.today.cache.entry(url.clone()).or_insert((0, 0));
+                 e.0 += duration_ms; e.1 += 1;
+             }
+         }
+         ActivityType::FileTransfer => {
+             s.today.download_count += 1;
+             s.today.download_bytes += act.total_bytes as i64;
+             s.today.download_ms += duration_ms;
+         }
+         _ => {}
+     }
+ }
+
  drop(s);
- db.writer.lock().unwrap().execute(
-     "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes)
-      VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
-     rusqlite::params![
-         act.id as i64, act.parent_id as i64, act.event_type as i64,
-         act.text, drv_path, cache_url,
-         act.start_time.timestamp(), end_time.timestamp(),
-         duration_ms, act.total_bytes as i64,
-     ],
- ).context("Failed to insert event")?;
+
+ {
+     let conn = db.writer.lock().unwrap();
+     conn.execute(
+         "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes)
+          VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
+         rusqlite::params![
+             act.id as i64, act.parent_id as i64, act.event_type as i64,
+             act.text, drv_path, cache_url.as_deref(),
+             act.start_time.timestamp(), end_time.timestamp(),
+             duration_ms, act.total_bytes as i64,
+         ],
+     ).context("Failed to insert event")?;
+
+     if let Some(ref old) = old_today {
+         flush_today(&conn, old).context("Failed to flush old day to daily_stats")?;
+     }
+     if !is_today {
+         upsert_daily(&conn, event_day, act.event_type as i64,
+                      duration_ms, act.total_bytes as i64, cache_url.as_deref())
+             .context("Failed to upsert past-day event to daily_stats")?;
+     }
+ }
          }
      }
  }

  Ok(())
  }
+
+ #[cfg(test)]
+ mod tests {
+     use super::*;
+
+     #[test]
+     fn schema_hash_value() {
+         println!("SCHEMA hash = {:#010x}", schema_hash(SCHEMA.as_bytes()));
+     }
+ }
src/main.rs  (+1 -1)

···
      let trend = collect_trend(&conn, since, bucket_size, drv)?;
      display_trend_output(&trend, output);
  } else {
-     let s = nod::stats::collect_stats(&conn, since, drv.as_deref(), sort, limit, group)?;
+     let s = nod::stats::collect_stats(&conn, since, drv.as_deref(), sort, limit, group, None)?;
      display_stats(s);
  }
src/stats.rs  (+138 -31)

···
  use chrono::prelude::*;
  use rusqlite::Connection;
  use serde::{Deserialize, Serialize};
+ use std::collections::HashMap;
  use std::sync::Mutex;

  #[derive(Debug, Clone, Default, Serialize, Deserialize, clap::ValueEnum)]
···
      pub count: i64,
  }

- // Three separate queries — one per event type — so the composite index
- // (event_type, start_time) can be used for each rather than doing a full
- // table scan with per-aggregate FILTER conditions.
- pub fn collect_stats(
-     db: &Mutex<Connection>,
+ // Snapshot of the current day's in-memory aggregates, passed from the daemon
+ // to collect_stats so the summary path never needs to scan the events table.
+ // cache: substituter URL → (total_ms, count)
+ #[derive(Clone, Default)]
+ pub struct TodaySummary {
+     pub day: i64,
+     pub build_count: i64,
+     pub build_total_ms: i64,
+     pub subst_count: i64,
+     pub subst_total_ms: i64,
+     pub download_count: i64,
+     pub download_bytes: i64,
+     pub download_ms: i64,
+     pub cache: HashMap<String, (i64, i64)>,
+ }
+
+ // Fast path: query daily_stats for closed days, merge today's in-memory snapshot.
+ // Returns (build_count, build_ms, subst_count, subst_ms, dl_bytes, dl_ms, cache_latency).
+ fn summary_from_cache(
+     conn: &Connection,
      since: Option<i64>,
-     drv: Option<&str>,
-     sort: SortField,
-     limit: u32,
-     group: bool,
- ) -> Result<Stats> {
-     assert!(limit > 0, "limit must be > 0");
+     today: &TodaySummary,
+ ) -> Result<(i64, i64, i64, i64, i64, i64, Vec<CacheStat>)> {
+     assert!(today.day > 0);
+     let since_day: Option<i64> = since.map(|ts| ts / 86400);
+     let today_in_range = since_day.map_or(true, |sd| today.day >= sd);
+
+     let (hbc, hbms) = conn.query_row(
+         "SELECT COALESCE(SUM(count),0), COALESCE(SUM(total_ms),0)
+          FROM daily_stats WHERE event_type = 105
+            AND (?1 IS NULL OR day >= ?1) AND day < ?2",
+         rusqlite::params![since_day, today.day],
+         |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
+     ).context("Failed to query build summary from daily_stats")?;
+
+     let (hsc, hsms) = conn.query_row(
+         "SELECT COALESCE(SUM(count),0), COALESCE(SUM(total_ms),0)
+          FROM daily_stats WHERE event_type = 108
+            AND (?1 IS NULL OR day >= ?1) AND day < ?2",
+         rusqlite::params![since_day, today.day],
+         |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
+     ).context("Failed to query subst summary from daily_stats")?;
+
+     let (hdb, hdms) = conn.query_row(
+         "SELECT COALESCE(SUM(total_bytes),0), COALESCE(SUM(total_ms),0)
+          FROM daily_stats WHERE event_type = 101
+            AND (?1 IS NULL OR day >= ?1) AND day < ?2",
+         rusqlite::params![since_day, today.day],
+         |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
+     ).context("Failed to query download summary from daily_stats")?;
+
+     let build_count = hbc + if today_in_range { today.build_count } else { 0 };
+     let build_total_ms = hbms + if today_in_range { today.build_total_ms } else { 0 };
+     let subst_count = hsc + if today_in_range { today.subst_count } else { 0 };
+     let subst_total_ms = hsms + if today_in_range { today.subst_total_ms } else { 0 };
+     let download_bytes = hdb + if today_in_range { today.download_bytes } else { 0 };
+     let download_ms = hdms + if today_in_range { today.download_ms } else { 0 };
+
+     // Cache latency: closed days from daily_cache_stats, today from memory.
+     let mut cache_map: HashMap<String, (i64, i64)> = HashMap::new();
+     let mut stmt = conn.prepare(
+         "SELECT cache_url, SUM(total_ms), SUM(count)
+          FROM daily_cache_stats
+          WHERE (?1 IS NULL OR day >= ?1) AND day < ?2
+          GROUP BY cache_url",
+     ).context("Failed to prepare cache latency query")?;
+     for row in stmt.query_map(rusqlite::params![since_day, today.day], |r| {
+         Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?))
+     })?.filter_map(|r| r.ok()) {
+         let (url, ms, cnt) = row;
+         let e = cache_map.entry(url).or_insert((0, 0));
+         e.0 += ms; e.1 += cnt;
+     }
+     if today_in_range {
+         for (url, &(ms, cnt)) in &today.cache {
+             let e = cache_map.entry(url.clone()).or_insert((0, 0));
+             e.0 += ms; e.1 += cnt;
+         }
+     }
+     let mut cache_latency: Vec<CacheStat> = cache_map.into_iter()
+         .filter(|(_, (_, cnt))| *cnt > 0)
+         .map(|(url, (ms, cnt))| CacheStat { cache_url: url, avg_ms: ms as f64 / cnt as f64, count: cnt })
+         .collect();
+     cache_latency.sort_by(|a, b| b.avg_ms.partial_cmp(&a.avg_ms).unwrap_or(std::cmp::Ordering::Equal));

-     let conn = db.lock().unwrap();
+     Ok((build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, cache_latency))
+ }

+ // Slow path: full events table scan (used in direct mode or when a drv filter is active).
+ fn summary_from_events(
+     conn: &Connection,
+     since: Option<i64>,
+     drv: Option<&str>,
+ ) -> Result<(i64, i64, i64, i64, i64, i64, Vec<CacheStat>)> {
      let (build_count, build_total_ms) = conn.query_row(
          "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0)
-          FROM events WHERE event_type = 105
+          FROM events INDEXED BY idx_events_type_start WHERE event_type = 105
            AND (?1 IS NULL OR start_time >= ?1)
            AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')",
          rusqlite::params![since, drv],
···
      let (subst_count, subst_total_ms) = conn.query_row(
          "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0)
-          FROM events WHERE event_type = 108
+          FROM events INDEXED BY idx_events_type_start WHERE event_type = 108
            AND (?1 IS NULL OR start_time >= ?1)
            AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')",
          rusqlite::params![since, drv],
···
      let (download_bytes, download_ms) = conn.query_row(
          "SELECT COALESCE(SUM(total_bytes), 0), COALESCE(SUM(duration_ms), 0)
-          FROM events WHERE event_type = 101
+          FROM events INDEXED BY idx_events_type_start WHERE event_type = 101
            AND (?1 IS NULL OR start_time >= ?1)",
          rusqlite::params![since],
          |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
      ).context("Failed to query download summary")?;

+     let mut stmt = conn.prepare(
+         "SELECT cache_url, AVG(duration_ms), COUNT(*)
+          FROM events INDEXED BY idx_events_type_start
+          WHERE event_type = 108 AND cache_url IS NOT NULL
+            AND (?1 IS NULL OR start_time >= ?1)
+          GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
+     ).context("Failed to prepare cache latency query")?;
+     let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![since], |r| {
+         Ok(CacheStat { cache_url: r.get(0)?, avg_ms: r.get(1)?, count: r.get(2)? })
+     })?.filter_map(|r| r.ok()).collect();
+
+     Ok((build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, cache_latency))
+ }
+
+ pub fn collect_stats(
+     db: &Mutex<Connection>,
+     since: Option<i64>,
+     drv: Option<&str>,
+     sort: SortField,
+     limit: u32,
+     group: bool,
+     today: Option<TodaySummary>,
+ ) -> Result<Stats> {
+     assert!(limit > 0, "limit must be > 0");
+
+     let conn = db.lock().unwrap();
+
+     // Fast path: daily_stats + today memory — O(days) instead of O(events).
+     // Falls back to the events scan when a drv filter is active (drv_path is not
+     // in daily_stats) or when running in direct mode (no daemon, today is None).
+     let (build_count, build_total_ms, subst_count, subst_total_ms,
+          download_bytes, download_ms, cache_latency) =
+         if let (Some(t), None) = (today.as_ref(), drv) {
+             summary_from_cache(&conn, since, t)?
+         } else {
+             summary_from_events(&conn, since, drv)?
+         };
+
      assert!(build_count >= 0);
      assert!(subst_count >= 0);
      assert!(download_bytes >= 0);
···
      };
      let sql = format!(
          "SELECT drv_path, CAST(ROUND(AVG(duration_ms)) AS INTEGER) as avg_ms, COUNT(*) as cnt
-          FROM events
+          FROM events INDEXED BY idx_events_type_start
           WHERE event_type = 105
             AND (?1 IS NULL OR start_time >= ?1)
             AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
···
      };
      let sql = format!(
          "SELECT duration_ms, drv_path, text
-          FROM events
+          FROM events INDEXED BY idx_events_type_start
           WHERE event_type = 105
             AND (?1 IS NULL OR start_time >= ?1)
             AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
···
          Ok(SlowBuild { duration_ms: r.get(0)?, count: None, drv_path: r.get(1)?, text: r.get(2)? })
      })?.filter_map(|r| r.ok()).collect()
  };
-
- // Substitute (108) measures full substitution time per cache; QueryPathInfo (109)
- // only measures metadata lookup and would undercount latency.
- let mut stmt = conn.prepare(
-     "SELECT cache_url, AVG(duration_ms), COUNT(*)
-      FROM events
-      WHERE event_type = 108 AND cache_url IS NOT NULL
-        AND (?1 IS NULL OR start_time >= ?1)
-      GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
- ).context("Failed to prepare cache latency query")?;
- let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![since], |r| {
-     Ok(CacheStat { cache_url: r.get(0)?, avg_ms: r.get(1)?, count: r.get(2)? })
- })?.filter_map(|r| r.ok()).collect();

  Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency })
}
···
  let drv_ref = drv.as_deref();

  // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter.
+ // INDEXED BY forces the covering start_time-first index so all four projected columns
+ // (start_time, event_type, duration_ms, total_bytes) are served without table lookups
+ // (with a drv filter active, the drv_path predicate still requires per-row lookups).
  let mut stmt = conn.prepare(
      "SELECT start_time, event_type, duration_ms, total_bytes
-      FROM events
+      FROM events INDEXED BY idx_events_start_cover
       WHERE event_type IN (101, 105, 108)
         AND (?1 IS NULL OR start_time >= ?1)
         AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
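One boundary subtlety of the fast path: summary_from_cache floors `since` to a whole Unix day and filters `day >= since_day`, so a mid-day `since` admits that entire first day from daily_stats, whereas summary_from_events compares raw start_time values. A small sketch with illustrative values:

      let since: i64 = 19_675 * 86400 + 43_200; // noon UTC on Unix day 19_675
      let since_day = since / 86400;            // 19_675
      // daily_stats rows are keyed by whole days, so `day >= since_day` also
      // counts events between 00:00 and noon that a raw `start_time >= since`
      // comparison against the events table would exclude.
      assert!(since_day * 86400 < since);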