Nix Observability Daemon


optimize trend queries using daily_stats

+273 -104
+2 -2
Cargo.lock
```diff
@@ -486,9 +486,9 @@
 
 [[package]]
 name = "libredox"
-version = "0.1.14"
+version = "0.1.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a"
+checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c"
 dependencies = [
  "libc",
 ]
```
+1 -1
Cargo.toml
```diff
@@ -23,9 +23,9 @@
 chrono = "0.4"
 clap = { version = "4", features = ["derive", "env"] }
 anyhow = "1.0"
-directories = "5.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+directories = "5.0"
 
 [dev-dependencies]
 criterion = { version = "0.5", features = ["html_reports"] }
```
+33 -12
README.md
````diff
@@ -1,6 +1,6 @@
 ## nod
 
-A simple daemon that collects Nix build and substitution statistics using structured JSON logs.
+A daemon that collects Nix build and substitution statistics using structured JSON logs.
 
 ## requirements
 
@@ -23,30 +23,51 @@
 Point Nix at the socket in `nix.conf`:
 
 ```
-json-log-path = /run/user/1000/nod/nod.sock
+json-log-path = /tmp/nod.sock
 ```
 
-Then use Nix normally. View accumulated stats with:
+Then use Nix normally. Query accumulated stats:
 
 ```bash
-nod stats        # all time
-nod stats -d 7   # last 7 days
-nod stats -m 3   # last 3 months
-nod stats -y 1   # last year
-nod clean        # reset the database
+nod                            # aggregate stats, all time
+nod -d 30                      # last 30 days
+nod -m 3                       # last 3 months
+nod --drv firefox              # filter to derivations matching "firefox"
+nod --sort count --group       # group by derivation, sort by frequency
+nod --bucket day               # time series by day
+nod --bucket month             # time series by month
+nod --bucket day --output test # Mann-Whitney significance test vs prior period
+nod --bucket day --output csv  # CSV output
+nod clean                      # delete all events
+nod clean --before-days 90     # delete events older than 90 days
+```
+
+## NixOS
+
+```nix
+{
+  services.nod = {
+    enable = true;
+    retainDays = 180; # default
+  };
+}
 ```
 
+The module sets `nix.settings.json-log-path` automatically and exports `NOD_SOCKET` into `/etc/environment` so every session (login, SSH, scripts) finds the daemon without `--socket`.
+
 ## configuration
 
 | flag | env | default |
 |------|-----|---------|
-| `--socket` | `NOD_SOCKET` | `$XDG_RUNTIME_DIR/nod/nod.sock` |
-| `--db` | `NOD_DB` | `$XDG_DATA_HOME/nod/nod.db` |
+| `--socket` | `NOD_SOCKET` | `/tmp/nod.sock` |
+| `--db` | `NOD_DB` | `nod.db` (current directory) |
 
-Both directories are created automatically. The socket path in `nix.conf` must match `--socket`.
+When using the NixOS module both are pinned to fixed system paths and exported automatically.
 
 ## how?
 
-Nix 2.30 added `json-log-path`, which writes a stream of structured activity events (start/result/stop) to a file or Unix socket while a build runs. nod listens on that socket, tracks in-flight activities by ID, and on each stop event inserts a completed row into a local SQLite database. `nod stats` queries that database through the daemon.
+Nix 2.30 added `json-log-path`, which writes a stream of structured activity events (start/result/stop) to a file or Unix socket while a build runs. nod listens on that socket, tracks in-flight activities by ID, and on each stop event inserts a completed row into a local SQLite database. `nod` queries that database through the daemon socket.
+
+Stats queries are served from a pre-aggregated `daily_stats` table (one row per day per event type) maintained in lockstep with inserts, so summary queries are O(days) regardless of total event count. The current day's aggregates are held in memory and flushed on day rollover.
 
 Relevant Nix source: [logging.hh](https://github.com/NixOS/nix/blob/b4de973847370204cf28fe2092abdd21f25ee0e8/src/libutil/include/nix/util/logging.hh)
````
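An aside on the pre-aggregation described above: `daily_stats` keeps one row per `(day, event_type)` with running totals, so a summary query touches one row per day instead of one per event. A minimal sketch of that invariant expressed as a per-event upsert, using the column names visible in the `src/stats.rs` diff below and assuming a `UNIQUE(day, event_type)` constraint; the PR itself buffers the current day in memory and flushes on rollover rather than upserting per event:

```rust
use rusqlite::{params, Connection, Result};

// Sketch only: the shipped write path accumulates today's totals in memory
// and writes them out at day rollover; this shows the equivalent invariant
// as a per-event upsert keyed on (day, event_type).
fn fold_into_daily_stats(
    conn: &Connection,
    start_time: i64,  // Unix seconds
    event_type: i64,  // 101 = FileTransfer, 105 = build, 108 = substitution
    duration_ms: i64,
    total_bytes: i64,
) -> Result<()> {
    conn.execute(
        "INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
         VALUES (?1, ?2, 1, ?3, ?4)
         ON CONFLICT(day, event_type) DO UPDATE SET
             count       = count + 1,
             total_ms    = total_ms + excluded.total_ms,
             total_bytes = total_bytes + excluded.total_bytes",
        params![start_time / 86400, event_type, duration_ms, total_bytes],
    )?;
    Ok(())
}
```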
+24 -9
benches/queries.rs
```diff
@@ -120,25 +120,40 @@
 fn bench_collect_trend(c: &mut Criterion) {
     let mut group = c.benchmark_group("collect_trend");
 
+    let today_day = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64 / 86400;
+    let today_snap = TodaySummary { day: today_day, ..Default::default() };
+
     for n in [10_000usize, 100_000, 1_000_000] {
         let conn = Connection::open_in_memory().unwrap();
         seed(&conn, n);
         let db = Mutex::new(conn);
 
-        // All-time monthly: exercises full scan + Rust-side bucketing.
-        group.bench_with_input(BenchmarkId::new("all_time/month", n), &n, |b, _| {
-            b.iter(|| collect_trend(&db, None, BucketSize::Month, None).unwrap())
+        // full=false: daily_stats path (table/csv output).
+        let t = today_snap.clone();
+        group.bench_with_input(BenchmarkId::new("agg/all_time/month", n), &n, |b, _| {
+            b.iter(|| collect_trend(&db, None, BucketSize::Month, None, Some(t.clone()), false).unwrap())
+        });
+
+        let t = today_snap.clone();
+        group.bench_with_input(BenchmarkId::new("agg/all_time/day", n), &n, |b, _| {
+            b.iter(|| collect_trend(&db, None, BucketSize::Day, None, Some(t.clone()), false).unwrap())
+        });
+
+        let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10);
+        let t = today_snap.clone();
+        group.bench_with_input(BenchmarkId::new("agg/recent_10pct/day", n), &n, |b, _| {
+            b.iter(|| collect_trend(&db, since, BucketSize::Day, None, Some(t.clone()), false).unwrap())
         });
 
-        // All-time daily: more bucket transitions, otherwise identical scan.
-        group.bench_with_input(BenchmarkId::new("all_time/day", n), &n, |b, _| {
-            b.iter(|| collect_trend(&db, None, BucketSize::Day, None).unwrap())
+        // full=true: events scan (--output test).
+        group.bench_with_input(BenchmarkId::new("full/all_time/month", n), &n, |b, _| {
+            b.iter(|| collect_trend(&db, None, BucketSize::Month, None, None, true).unwrap())
         });
 
-        // Recent 10%: start_time index reduces rows scanned by 90%.
         let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10);
-        group.bench_with_input(BenchmarkId::new("recent_10pct/day", n), &n, |b, _| {
-            b.iter(|| collect_trend(&db, since, BucketSize::Day, None).unwrap())
+        group.bench_with_input(BenchmarkId::new("full/recent_10pct/day", n), &n, |b, _| {
+            b.iter(|| collect_trend(&db, since, BucketSize::Day, None, None, true).unwrap())
        });
    }
 
```
+23 -8
flake.nix
```diff
@@ -84,11 +84,16 @@
     socketPath = lib.mkOption {
       type = lib.types.path;
       default = "/run/nod/nod.sock";
-      description = "Path to the Unix socket. Exposed via NOD_SOCKET in the session environment.";
+      description = "Path to the Unix socket. Propagated to all sessions via /etc/environment so nod always finds the daemon without --socket.";
     };
     databasePath = lib.mkOption {
       type = lib.types.path;
       default = "/var/lib/nod/nod.db";
-      description = "Path to the SQLite database";
+      description = "Path to the SQLite database.";
+    };
+    retainDays = lib.mkOption {
+      type = lib.types.nullOr lib.types.ints.positive;
+      default = null;
+      description = "Override the retention period in days. When null the daemon default of 180 days is used.";
     };
   };
@@ -101,22 +106,32 @@
     };
     users.groups.${cfg.group} = {};
 
-    # Tell nix to forward its internal JSON log to the socket
+    # Forward Nix's internal JSON activity log to the daemon socket.
+    # The nix-daemon runs as root so the socket directory must be world-searchable
+    # and the socket itself must be group-writable (handled by RuntimeDirectoryMode
+    # and UMask below). Users that only need to query nod require no group membership.
     nix.settings.json-log-path = cfg.socketPath;
 
-    # Make the socket path available to interactive shells so users
-    # can run `nod stats` without passing --socket explicitly.
-    environment.sessionVariables.NOD_SOCKET = cfg.socketPath;
+    # Expose the socket path to every session (login, SSH, scripts) via /etc/environment
+    # so that `nod` always resolves the socket without needing --socket or NOD_SOCKET set
+    # manually. sessionVariables only reaches interactive login shells and would cause
+    # "cannot connect to socket" errors in non-login SSH sessions and cron jobs.
+    environment.variables.NOD_SOCKET = cfg.socketPath;
+    environment.variables.NOD_DB = cfg.databasePath;
+
+    # Make `nod` available to all users without manual systemPackages entries.
+    environment.systemPackages = [ cfg.package ];
 
     systemd.services.nod = {
       description = "Nix Observability Daemon";
       wantedBy = ["multi-user.target"];
-      after = ["network.target"];
+      after = ["local-fs.target"];
 
       serviceConfig = {
         User = cfg.user;
         Group = cfg.group;
-        ExecStart = "${cfg.package}/bin/nod daemon --db ${cfg.databasePath} --socket ${cfg.socketPath}";
+        ExecStart = "${cfg.package}/bin/nod daemon --db ${cfg.databasePath} --socket ${cfg.socketPath}"
+          + lib.optionalString (cfg.retainDays != null) " --retain-days ${toString cfg.retainDays}";
         Restart = "always";
         StateDirectory = "nod";
         StateDirectoryMode = "0750";
```
+24 -9
src/daemon.rs
```diff
@@ -269,6 +269,10 @@
         since: Option<i64>,
         bucket: BucketSize,
         drv: Option<String>,
+        // true = return individual duration samples (needed for --output test / Mann-Whitney).
+        // false = return aggregates only via the daily_stats fast path.
+        #[serde(default)]
+        full: bool,
     },
     Clean {
         // Unix timestamp; None means delete everything.
@@ -404,9 +408,14 @@
 
 fn run_retention(conn: &Connection, retain_days: u32) -> Result<()> {
     assert!(retain_days > 0, "retain_days must be > 0");
-    let cutoff = Utc::now().timestamp() - retain_days as i64 * 86400;
+    let cutoff     = Utc::now().timestamp() - retain_days as i64 * 86400;
+    let cutoff_day = cutoff / 86400;
     let deleted = conn.execute("DELETE FROM events WHERE start_time < ?1", [cutoff])
         .context("Retention DELETE failed")?;
+    conn.execute("DELETE FROM daily_stats WHERE day < ?1", [cutoff_day])
+        .context("Retention DELETE daily_stats failed")?;
+    conn.execute("DELETE FROM daily_cache_stats WHERE day < ?1", [cutoff_day])
+        .context("Retention DELETE daily_cache_stats failed")?;
     // TRUNCATE resets and shrinks the WAL file; use RESTART as fallback if readers
     // are active (TRUNCATE fails when a reader holds the WAL open).
     if conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)").is_err() {
@@ -559,8 +568,9 @@
 pub async fn run_daemon(
     socket_path: PathBuf,
     db: Arc<DbConnections>,
-    retain_days: Option<u32>,
+    retain_days: u32,
 ) -> Result<()> {
+    assert!(retain_days > 0, "retain_days must be > 0");
     let today_day = Utc::now().timestamp() / 86400;
     let today = {
         let conn = db.reader.lock().unwrap();
@@ -578,11 +588,10 @@
     }
 
     // Run retention on startup, then every 6 hours.
-    if let Some(days) = retain_days {
-        assert!(days > 0, "retain_days must be > 0");
+    {
         let db_startup = Arc::clone(&db);
         tokio::task::spawn_blocking(move || {
-            run_retention(&db_startup.writer.lock().unwrap(), days)
+            run_retention(&db_startup.writer.lock().unwrap(), retain_days)
         }).await??;
 
         let db_timer = Arc::clone(&db);
@@ -593,7 +602,7 @@
                 interval.tick().await;
                 let db2 = Arc::clone(&db_timer);
                 if let Err(e) = tokio::task::spawn_blocking(move || {
-                    run_retention(&db2.writer.lock().unwrap(), days)
+                    run_retention(&db2.writer.lock().unwrap(), retain_days)
                 }).await {
                     error!("Retention timer task failed: {}", e);
                 }
@@ -635,10 +644,16 @@
                 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?;
                 break;
             }
-            Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => {
+            Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv, full })) => {
+                let today = if !full && drv.is_none() && !matches!(bucket, BucketSize::Hour) {
+                    Some(state.lock().unwrap().today.snapshot())
+                } else {
+                    None
+                };
                 let db = Arc::clone(&db);
-                let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv))
-                    .await??;
+                let trend = tokio::task::spawn_blocking(move || {
+                    collect_trend(&db.reader, since, bucket, drv, today, full)
+                }).await??;
                 writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?;
                 break;
             }
```
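The `GetTrend` handler snapshots the in-memory current-day accumulator before hopping onto the blocking thread pool. `TodaySummary` itself is outside this diff; a sketch of its likely shape, with the fields inferred from how `trend_from_daily_stats` and the benches use it:

```rust
// Inferred sketch; the real definition is not part of this diff. One
// accumulator for the current UTC day, reset when the day rolls over.
#[derive(Clone, Default)]
pub struct TodaySummary {
    pub day: i64,            // Unix timestamp / 86400
    pub build_count: i64,
    pub build_total_ms: i64,
    pub subst_count: i64,
    pub subst_total_ms: i64,
    pub download_bytes: i64,
}

impl TodaySummary {
    // A cheap copy taken while holding the state mutex, so the blocking
    // query task can read today's partial totals without further locking.
    pub fn snapshot(&self) -> TodaySummary {
        self.clone()
    }
}
```

Snapshotting before `spawn_blocking` keeps the critical section to a field copy and avoids holding the state lock across an `.await`.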
+18 -25
src/main.rs
```diff
@@ -79,9 +79,9 @@
 enum Commands {
     /// Run the observability daemon
     Daemon {
-        /// Delete events older than N days; cleanup runs on startup and every 6 hours
-        #[arg(long, env = "NOD_RETAIN_DAYS")]
-        retain_days: Option<u32>,
+        /// Delete events older than N days; cleanup runs on startup and every 6 hours (default: 180)
+        #[arg(long, env = "NOD_RETAIN_DAYS", default_value = "180")]
+        retain_days: u32,
     },
     /// Clear data from the database
     Clean {
@@ -91,21 +91,16 @@
     },
 }
 
-fn resolve_db_path(db: Option<PathBuf>, project_dirs: &Option<ProjectDirs>) -> PathBuf {
+fn resolve_db_path(db: Option<PathBuf>) -> PathBuf {
     db.unwrap_or_else(|| {
-        project_dirs.as_ref()
-            .map(|d| d.data_dir().join("nod.db"))
+        ProjectDirs::from("", "", "nod")
+            .map(|dirs| dirs.data_local_dir().join("nod.db"))
             .unwrap_or_else(|| PathBuf::from("nod.db"))
     })
 }
 
-fn resolve_socket_path(socket: Option<PathBuf>, project_dirs: &Option<ProjectDirs>) -> PathBuf {
-    socket.unwrap_or_else(|| {
-        project_dirs.as_ref()
-            .and_then(|d| d.runtime_dir())
-            .map(|d| d.join("nod.sock"))
-            .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock"))
-    })
+fn resolve_socket_path(socket: Option<PathBuf>) -> PathBuf {
+    socket.unwrap_or_else(|| PathBuf::from("/tmp/nod.sock"))
 }
 
 fn is_connection_error(e: &std::io::Error) -> bool {
@@ -153,16 +148,12 @@
     tracing_subscriber::fmt::init();
     let cli = Cli::parse();
 
-    let project_dirs = ProjectDirs::from("org", "nixos", "nod");
-
     match cli.command {
         Some(Commands::Daemon { retain_days }) => {
-            if let Some(days) = retain_days {
-                assert!(days > 0, "--retain-days must be > 0");
-            }
+            assert!(retain_days > 0, "--retain-days must be > 0");
 
-            let db_path = resolve_db_path(cli.db, &project_dirs);
-            let socket_path = resolve_socket_path(cli.socket, &project_dirs);
+            let db_path = resolve_db_path(cli.db);
+            let socket_path = resolve_socket_path(cli.socket);
 
             if let Some(parent) = db_path.parent() {
                 if !parent.as_os_str().is_empty() {
@@ -193,7 +184,7 @@
         }
 
         Some(Commands::Clean { before_days }) => {
-            let socket_path = resolve_socket_path(cli.socket, &project_dirs);
+            let socket_path = resolve_socket_path(cli.socket);
             let before: Option<i64> = before_days.map(|d| {
                 assert!(d > 0, "--before-days must be > 0");
                 (Utc::now() - chrono::Duration::days(d as i64)).timestamp()
@@ -216,7 +207,7 @@
                 }
                 Err(e) if is_connection_error(&e) => {
                     // Daemon not running — operate directly on the database.
-                    let db_path = resolve_db_path(cli.db, &project_dirs);
+                    let db_path = resolve_db_path(cli.db);
                     let conn = open_db(&db_path)?;
                     if let Some(ts) = before {
                         let deleted = conn.execute(
@@ -242,14 +233,14 @@
     }
 
     let since = compute_since(cli.days, cli.months, cli.years);
-    let socket_path = resolve_socket_path(cli.socket, &project_dirs);
+    let socket_path = resolve_socket_path(cli.socket);
 
     match UnixStream::connect(&socket_path).await {
         Ok(stream) => {
             query_via_socket(stream, since, cli.drv, cli.bucket, cli.sort, cli.limit, cli.group, cli.output).await?;
         }
         Err(e) if is_connection_error(&e) => {
-            let db_path = resolve_db_path(cli.db, &project_dirs);
+            let db_path = resolve_db_path(cli.db);
             query_direct(&db_path, since, cli.drv, cli.bucket, cli.sort, cli.limit, cli.group, &cli.output)?;
         }
         Err(e) => return Err(e).context("Failed to connect to daemon"),
@@ -278,6 +269,7 @@
         "since": since,
         "bucket": bucket_size,
         "drv": drv,
+        "full": matches!(output, OutputFormat::Test),
     });
     stream.write_all((cmd.to_string() + "\n").as_bytes()).await?;
 
@@ -323,7 +315,8 @@
     let conn = Mutex::new(conn);
 
     if let Some(bucket_size) = bucket {
-        let trend = collect_trend(&conn, since, bucket_size, drv)?;
+        let full = matches!(output, OutputFormat::Test);
+        let trend = collect_trend(&conn, since, bucket_size, drv, None, full)?;
         display_trend_output(&trend, output);
     } else {
         let s = nod::stats::collect_stats(&conn, since, drv.as_deref(), sort, limit, group, None)?;
```
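`is_connection_error` is collapsed out of this diff; only its signature is visible. A plausible sketch of the body, assuming the usual error kinds for a missing socket file (NotFound) or a socket nobody is listening on (ConnectionRefused):

```rust
use std::io::ErrorKind;

// Assumption: the hidden body distinguishes "daemon not running" (fall back
// to opening the database directly) from genuine I/O failures (propagate).
fn is_connection_error(e: &std::io::Error) -> bool {
    matches!(e.kind(), ErrorKind::NotFound | ErrorKind::ConnectionRefused)
}
```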
+148 -38
src/stats.rs
```diff
@@ -441,9 +441,16 @@
 #[derive(Debug, Serialize, Deserialize)]
 pub struct TrendBucket {
     pub bucket: String,
+    pub build_count: i64,
+    pub build_total_ms: i64,
+    pub subst_count: i64,
+    pub subst_total_ms: i64,
+    pub download_bytes: i64,
+    // Only populated when full duration data is requested (--output test).
+    #[serde(default)]
     pub build_durations: Vec<i64>,
+    #[serde(default)]
     pub subst_durations: Vec<i64>,
-    pub download_bytes: i64,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -453,23 +460,19 @@
     pub drv_filter: Option<String>,
 }
 
-// Query returns raw start_time integers — strftime is computed in Rust via
-// bucket_label()/bucket_end() to avoid N SQLite string allocations. Bucket
-// boundaries are detected with a single integer comparison per row (ts >= next_bucket)
-// so string allocs happen only once per bucket, not once per row.
-pub fn collect_trend(
-    db: &Mutex<Connection>,
+// Events-table scan. Returns buckets with individual duration samples populated.
+// Required when individual samples are needed (--output test / Mann-Whitney) or
+// when a drv filter or hour granularity rules out the daily_stats path.
+//
+// Raw start_time integers from SQLite — bucket boundaries detected with a single
+// integer comparison per row (ts >= next_bucket) so string allocs happen only
+// once per bucket, not once per row.
+fn trend_from_events(
+    conn: &Connection,
     since: Option<i64>,
-    bucket: BucketSize,
-    drv: Option<String>,
-) -> Result<Trend> {
-    if let Some(ref d) = drv {
-        assert!(!d.is_empty(), "drv filter must not be empty");
-    }
-
-    let conn = db.lock().unwrap();
-    let drv_ref = drv.as_deref();
-
+    bucket: &BucketSize,
+    drv: Option<&str>,
+) -> Result<Vec<TrendBucket>> {
     // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter.
     // INDEXED BY forces the covering start_time-first index so all four projected columns
     // (start_time, event_type, duration_ms, total_bytes) are served without table lookups.
@@ -485,7 +488,7 @@
     let mut buckets: Vec<TrendBucket> = vec![];
     let mut next_bucket: i64 = 0;
 
-    for row in stmt.query_map(rusqlite::params![since, drv_ref], |r| {
+    for row in stmt.query_map(rusqlite::params![since, drv], |r| {
         Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?))
     })?.filter_map(|r| r.ok()) {
         let (ts, etype, dur, bytes) = row;
@@ -493,23 +496,134 @@
 
         if ts >= next_bucket {
             buckets.push(TrendBucket {
-                bucket: bucket_label(ts, &bucket),
+                bucket: bucket_label(ts, bucket),
+                build_count: 0, build_total_ms: 0,
+                subst_count: 0, subst_total_ms: 0,
+                download_bytes: 0,
                 build_durations: vec![],
                 subst_durations: vec![],
-                download_bytes: 0,
             });
-            next_bucket = bucket_end(ts, &bucket);
+            next_bucket = bucket_end(ts, bucket);
         }
 
         let last = buckets.last_mut().unwrap();
         match etype {
-            105 => { assert!(dur >= 0); last.build_durations.push(dur); }
-            108 => { assert!(dur >= 0); last.subst_durations.push(dur); }
+            105 => { assert!(dur >= 0); last.build_count += 1; last.build_total_ms += dur; last.build_durations.push(dur); }
+            108 => { assert!(dur >= 0); last.subst_count += 1; last.subst_total_ms += dur; last.subst_durations.push(dur); }
             101 => { assert!(bytes >= 0); last.download_bytes += bytes; }
             _ => {}
         }
     }
 
+    Ok(buckets)
+}
+
+// daily_stats query. O(days) — does not populate build_durations/subst_durations.
+// Closed days come from the table; today's partial data is merged from the in-memory
+// snapshot so the current day is always included when running via the daemon.
+fn trend_from_daily_stats(
+    conn: &Connection,
+    since: Option<i64>,
+    bucket: &BucketSize,
+    today: Option<TodaySummary>,
+) -> Result<Vec<TrendBucket>> {
+    let since_day: Option<i64> = since.map(|ts| ts / 86400);
+    let today_day = today.as_ref().map(|t| t.day)
+        .unwrap_or_else(|| Utc::now().timestamp() / 86400);
+
+    let mut stmt = conn.prepare(
+        "SELECT day * 86400, event_type, count, total_ms, total_bytes
+         FROM daily_stats
+         WHERE event_type IN (101, 105, 108)
+           AND (?1 IS NULL OR day >= ?1)
+           AND day < ?2
+         ORDER BY day ASC",
+    ).context("Failed to prepare trend query")?;
+
+    let mut buckets: Vec<TrendBucket> = vec![];
+    let mut next_bucket: i64 = 0;
+
+    for row in stmt.query_map(rusqlite::params![since_day, today_day], |r| {
+        Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?, r.get::<_, i64>(4)?))
+    })?.filter_map(|r| r.ok()) {
+        let (ts, etype, count, total_ms, total_bytes) = row;
+        assert!(ts >= 0);
+        assert!(count >= 0);
+
+        if ts >= next_bucket {
+            buckets.push(TrendBucket {
+                bucket: bucket_label(ts, bucket),
+                build_count: 0, build_total_ms: 0,
+                subst_count: 0, subst_total_ms: 0,
+                download_bytes: 0,
+                build_durations: vec![],
+                subst_durations: vec![],
+            });
+            next_bucket = bucket_end(ts, bucket);
+        }
+
+        let last = buckets.last_mut().unwrap();
+        match etype {
+            105 => { last.build_count += count; last.build_total_ms += total_ms; }
+            108 => { last.subst_count += count; last.subst_total_ms += total_ms; }
+            101 => { last.download_bytes += total_bytes; }
+            _ => {}
+        }
+    }
+
+    if let Some(t) = today {
+        if t.build_count > 0 || t.subst_count > 0 || t.download_bytes > 0 {
+            let today_ts = t.day * 86400;
+            if since.map_or(true, |s| today_ts >= s) {
+                if today_ts >= next_bucket {
+                    buckets.push(TrendBucket {
+                        bucket: bucket_label(today_ts, bucket),
+                        build_count: t.build_count,
+                        build_total_ms: t.build_total_ms,
+                        subst_count: t.subst_count,
+                        subst_total_ms: t.subst_total_ms,
+                        download_bytes: t.download_bytes,
+                        build_durations: vec![],
+                        subst_durations: vec![],
+                    });
+                } else if let Some(last) = buckets.last_mut() {
+                    // Today falls in the same week/month bucket as the last historical day.
+                    last.build_count += t.build_count;
+                    last.build_total_ms += t.build_total_ms;
+                    last.subst_count += t.subst_count;
+                    last.subst_total_ms += t.subst_total_ms;
+                    last.download_bytes += t.download_bytes;
+                }
+            }
+        }
+    }
+
+    Ok(buckets)
+}
+
+pub fn collect_trend(
+    db: &Mutex<Connection>,
+    since: Option<i64>,
+    bucket: BucketSize,
+    drv: Option<String>,
+    today: Option<TodaySummary>,
+    full: bool,
+) -> Result<Trend> {
+    if let Some(ref d) = drv {
+        assert!(!d.is_empty(), "drv filter must not be empty");
+    }
+
+    let conn = db.lock().unwrap();
+
+    // daily_stats path when individual samples are not needed, no drv filter is active
+    // (daily_stats has no per-drv breakdown), and bucket granularity is at least a day
+    // (daily_stats has no intra-day resolution).
+    let buckets = if !full && drv.is_none() && !matches!(bucket, BucketSize::Hour) {
+        trend_from_daily_stats(&conn, since, &bucket, today)?
+    } else {
+        trend_from_events(&conn, since, &bucket, drv.as_deref())?
+    };
+
     for i in 1..buckets.len() {
         assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending");
     }
@@ -526,7 +640,7 @@
         println!("filter: {}", drv);
     }
 
-    print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", "period", "builds", "build med", "subst", "subst med");
+    print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", "period", "builds", "build avg", "subst", "subst avg");
     if has_downloads { print!(" {:>8}", "dl (MB)"); }
     println!();
 
@@ -536,14 +650,12 @@
     }
 
     for b in &trend.buckets {
-        let mut bs = b.build_durations.clone(); bs.sort_unstable();
-        let mut ss = b.subst_durations.clone(); ss.sort_unstable();
-        let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 };
-        let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 };
+        let build_avg = if b.build_count > 0 { b.build_total_ms / b.build_count } else { 0 };
+        let subst_avg = if b.subst_count > 0 { b.subst_total_ms / b.subst_count } else { 0 };
 
         print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}",
-            b.bucket, b.build_durations.len(), fmt_ms(build_med),
-            b.subst_durations.len(), fmt_ms(subst_med));
+            b.bucket, b.build_count, fmt_ms(build_avg),
+            b.subst_count, fmt_ms(subst_avg));
         if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); }
         println!();
     }
@@ -606,15 +718,13 @@
 }
 
 pub fn output_csv_trend(trend: &Trend) {
-    println!("period,build_count,build_median_ms,subst_count,subst_median_ms,download_bytes");
+    println!("period,build_count,build_avg_ms,subst_count,subst_avg_ms,download_bytes");
     for b in &trend.buckets {
-        let mut bs = b.build_durations.clone(); bs.sort_unstable();
-        let mut ss = b.subst_durations.clone(); ss.sort_unstable();
-        let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 };
-        let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 };
-        assert!(build_med >= 0);
-        assert!(subst_med >= 0);
-        println!("{},{},{},{},{},{}", b.bucket, b.build_durations.len(), build_med, b.subst_durations.len(), subst_med, b.download_bytes);
+        let build_avg = if b.build_count > 0 { b.build_total_ms / b.build_count } else { 0 };
+        let subst_avg = if b.subst_count > 0 { b.subst_total_ms / b.subst_count } else { 0 };
+        assert!(build_avg >= 0);
+        assert!(subst_avg >= 0);
+        println!("{},{},{},{},{},{}", b.bucket, b.build_count, build_avg, b.subst_count, subst_avg, b.download_bytes);
     }
 }
 
```
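The reason `--output test` forces the `full` events scan: a Mann-Whitney test ranks individual durations, and ranks cannot be reconstructed from the `(count, total_ms)` aggregates the `daily_stats` path returns. A minimal sketch of the rank-sum core, with tie averaging and the p-value computation omitted (the PR's actual test code is not part of this diff):

```rust
// Sketch only: computes the Mann-Whitney U statistic for sample `a` against
// sample `b`. Real implementations average tied ranks and derive a p-value
// from U (exact tables or a normal approximation); both are omitted here.
fn mann_whitney_u(a: &[i64], b: &[i64]) -> f64 {
    // Tag each duration with its sample of origin, then sort the pooled values.
    let mut all: Vec<(i64, bool)> = a.iter().map(|&x| (x, true))
        .chain(b.iter().map(|&x| (x, false)))
        .collect();
    all.sort_unstable_by_key(|&(x, _)| x);

    // Sum of 1-based ranks belonging to sample `a` (ties take adjacent ranks).
    let mut rank_sum_a = 0.0;
    for (i, &(_, is_a)) in all.iter().enumerate() {
        if is_a {
            rank_sum_a += (i + 1) as f64;
        }
    }

    // U1 = R1 - n1(n1 + 1)/2, the classic rank-sum form.
    let n1 = a.len() as f64;
    rank_sum_a - n1 * (n1 + 1.0) / 2.0
}
```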