//! Nix Observability Daemon — build/substitution statistics collection and
//! reporting (keywords: observability, nix).
1use anyhow::{Context, Result};
2use chrono::prelude::*;
3use rusqlite::Connection;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Mutex;
7
/// Sort key for the "slowest builds" listing produced by `collect_stats`.
///
/// Usable directly as a CLI value (clap::ValueEnum) and serialized snake_case.
#[derive(Debug, Clone, Default, Serialize, Deserialize, clap::ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum SortField {
    /// Sort by build duration (the default).
    #[default]
    Duration,
    /// Sort by per-derivation build count in grouped mode; in ungrouped mode
    /// this falls back to sorting by duration (see collect_stats).
    Count,
    /// Sort by derivation path (descending, like the other sort keys).
    Name,
}
16
/// Aggregated statistics returned by `collect_stats`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Stats {
    // Number of builds and their total wall time (milliseconds).
    pub build_count: i64,
    pub build_total_ms: i64,
    // Number of substitutions and their total wall time (milliseconds).
    pub subst_count: i64,
    pub subst_total_ms: i64,
    // Total bytes downloaded and total time spent downloading.
    pub download_bytes: i64,
    pub download_ms: i64,
    // Slowest (or grouped per-derivation) builds, already sorted and limited.
    pub slowest_builds: Vec<SlowBuild>,
    // Per-substituter latency, sorted by average latency descending.
    pub cache_latency: Vec<CacheStat>,
}
28
/// One row of the "slowest builds" listing.
///
/// In grouped mode `duration_ms` holds the per-derivation average and `count`
/// is `Some(builds)`; in ungrouped mode `count` is `None` and `duration_ms`
/// is a single build's duration.
#[derive(Debug, Serialize, Deserialize)]
pub struct SlowBuild {
    pub duration_ms: i64,
    pub count: Option<i64>,
    pub drv_path: Option<String>,
    // Fallback label used when drv_path is NULL (see display_stats).
    pub text: Option<String>,
}
36
/// Average substitution latency for a single binary cache.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheStat {
    pub cache_url: String,
    // Mean substitution latency in milliseconds.
    pub avg_ms: f64,
    // Number of substitutions served by this cache.
    pub count: i64,
}
43
// Snapshot of the current day's in-memory aggregates, passed from the daemon
// to collect_stats so the summary path never needs to scan the events table.
#[derive(Clone, Default)]
pub struct TodaySummary {
    // Day number this snapshot covers (unix timestamp / 86400, UTC).
    pub day: i64,
    pub build_count: i64,
    pub build_total_ms: i64,
    pub subst_count: i64,
    pub subst_total_ms: i64,
    pub download_count: i64,
    pub download_bytes: i64,
    pub download_ms: i64,
    // substituter URL → (total_ms, count)
    pub cache: HashMap<String, (i64, i64)>,
}
59
60// Fast path: query daily_stats for closed days, merge today's in-memory snapshot.
61// Returns (build_count, build_ms, subst_count, subst_ms, dl_bytes, dl_ms, cache_latency).
62fn summary_from_cache(
63 conn: &Connection,
64 since: Option<i64>,
65 today: &TodaySummary,
66) -> Result<(i64, i64, i64, i64, i64, i64, Vec<CacheStat>)> {
67 assert!(today.day > 0);
68 let since_day: Option<i64> = since.map(|ts| ts / 86400);
69 let today_in_range = since_day.map_or(true, |sd| today.day >= sd);
70
71 let (hbc, hbms) = conn.query_row(
72 "SELECT COALESCE(SUM(count),0), COALESCE(SUM(total_ms),0)
73 FROM daily_stats WHERE event_type = 105
74 AND (?1 IS NULL OR day >= ?1) AND day < ?2",
75 rusqlite::params![since_day, today.day],
76 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
77 ).context("Failed to query build summary from daily_stats")?;
78
79 let (hsc, hsms) = conn.query_row(
80 "SELECT COALESCE(SUM(count),0), COALESCE(SUM(total_ms),0)
81 FROM daily_stats WHERE event_type = 108
82 AND (?1 IS NULL OR day >= ?1) AND day < ?2",
83 rusqlite::params![since_day, today.day],
84 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
85 ).context("Failed to query subst summary from daily_stats")?;
86
87 let (hdb, hdms) = conn.query_row(
88 "SELECT COALESCE(SUM(total_bytes),0), COALESCE(SUM(total_ms),0)
89 FROM daily_stats WHERE event_type = 101
90 AND (?1 IS NULL OR day >= ?1) AND day < ?2",
91 rusqlite::params![since_day, today.day],
92 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
93 ).context("Failed to query download summary from daily_stats")?;
94
95 let build_count = hbc + if today_in_range { today.build_count } else { 0 };
96 let build_total_ms = hbms + if today_in_range { today.build_total_ms } else { 0 };
97 let subst_count = hsc + if today_in_range { today.subst_count } else { 0 };
98 let subst_total_ms = hsms + if today_in_range { today.subst_total_ms } else { 0 };
99 let download_bytes = hdb + if today_in_range { today.download_bytes } else { 0 };
100 let download_ms = hdms + if today_in_range { today.download_ms } else { 0 };
101
102 // Cache latency: closed days from daily_cache_stats, today from memory.
103 let mut cache_map: HashMap<String, (i64, i64)> = HashMap::new();
104 let mut stmt = conn.prepare(
105 "SELECT cache_url, SUM(total_ms), SUM(count)
106 FROM daily_cache_stats
107 WHERE (?1 IS NULL OR day >= ?1) AND day < ?2
108 GROUP BY cache_url",
109 ).context("Failed to prepare cache latency query")?;
110 for row in stmt.query_map(rusqlite::params![since_day, today.day], |r| {
111 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?))
112 })?.filter_map(|r| r.ok()) {
113 let (url, ms, cnt) = row;
114 let e = cache_map.entry(url).or_insert((0, 0));
115 e.0 += ms; e.1 += cnt;
116 }
117 if today_in_range {
118 for (url, &(ms, cnt)) in &today.cache {
119 let e = cache_map.entry(url.clone()).or_insert((0, 0));
120 e.0 += ms; e.1 += cnt;
121 }
122 }
123 let mut cache_latency: Vec<CacheStat> = cache_map.into_iter()
124 .filter(|(_, (_, cnt))| *cnt > 0)
125 .map(|(url, (ms, cnt))| CacheStat { cache_url: url, avg_ms: ms as f64 / cnt as f64, count: cnt })
126 .collect();
127 cache_latency.sort_by(|a, b| b.avg_ms.partial_cmp(&a.avg_ms).unwrap_or(std::cmp::Ordering::Equal));
128
129 Ok((build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, cache_latency))
130}
131
132// Slow path: full events table scan (used in direct mode or when drv filter is active).
133fn summary_from_events(
134 conn: &Connection,
135 since: Option<i64>,
136 drv: Option<&str>,
137) -> Result<(i64, i64, i64, i64, i64, i64, Vec<CacheStat>)> {
138 let (build_count, build_total_ms) = conn.query_row(
139 "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0)
140 FROM events INDEXED BY idx_events_type_start WHERE event_type = 105
141 AND (?1 IS NULL OR start_time >= ?1)
142 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')",
143 rusqlite::params![since, drv],
144 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
145 ).context("Failed to query build summary")?;
146
147 let (subst_count, subst_total_ms) = conn.query_row(
148 "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0)
149 FROM events INDEXED BY idx_events_type_start WHERE event_type = 108
150 AND (?1 IS NULL OR start_time >= ?1)
151 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')",
152 rusqlite::params![since, drv],
153 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
154 ).context("Failed to query substitution summary")?;
155
156 let (download_bytes, download_ms) = conn.query_row(
157 "SELECT COALESCE(SUM(total_bytes), 0), COALESCE(SUM(duration_ms), 0)
158 FROM events INDEXED BY idx_events_type_start WHERE event_type = 101
159 AND (?1 IS NULL OR start_time >= ?1)",
160 rusqlite::params![since],
161 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
162 ).context("Failed to query download summary")?;
163
164 let mut stmt = conn.prepare(
165 "SELECT cache_url, AVG(duration_ms), COUNT(*)
166 FROM events INDEXED BY idx_events_type_start
167 WHERE event_type = 108 AND cache_url IS NOT NULL
168 AND (?1 IS NULL OR start_time >= ?1)
169 GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
170 ).context("Failed to prepare cache latency query")?;
171 let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![since], |r| {
172 Ok(CacheStat { cache_url: r.get(0)?, avg_ms: r.get(1)?, count: r.get(2)? })
173 })?.filter_map(|r| r.ok()).collect();
174
175 Ok((build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, cache_latency))
176}
177
178pub fn collect_stats(
179 db: &Mutex<Connection>,
180 since: Option<i64>,
181 drv: Option<&str>,
182 sort: SortField,
183 limit: u32,
184 group: bool,
185 today: Option<TodaySummary>,
186) -> Result<Stats> {
187 assert!(limit > 0, "limit must be > 0");
188
189 let conn = db.lock().unwrap();
190
191 // Fast path: daily_stats + today memory — O(days) instead of O(events).
192 // Falls back to events scan when a drv filter is active (drv_path not in daily_stats)
193 // or when running in direct mode (no daemon, today is None).
194 let (build_count, build_total_ms, subst_count, subst_total_ms,
195 download_bytes, download_ms, cache_latency) =
196 if let (Some(t), None) = (today.as_ref(), drv) {
197 summary_from_cache(&conn, since, t)?
198 } else {
199 summary_from_events(&conn, since, drv)?
200 };
201
202 assert!(build_count >= 0);
203 assert!(subst_count >= 0);
204 assert!(download_bytes >= 0);
205
206 let slowest_builds: Vec<SlowBuild> = if group {
207 let sort_col = match sort {
208 SortField::Duration => "avg_ms",
209 SortField::Count => "cnt",
210 SortField::Name => "drv_path",
211 };
212 let sql = format!(
213 "SELECT drv_path, CAST(ROUND(AVG(duration_ms)) AS INTEGER) as avg_ms, COUNT(*) as cnt
214 FROM events INDEXED BY idx_events_type_start
215 WHERE event_type = 105
216 AND (?1 IS NULL OR start_time >= ?1)
217 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
218 GROUP BY drv_path
219 ORDER BY {sort_col} DESC
220 LIMIT ?3"
221 );
222 let mut stmt = conn.prepare(&sql).context("Failed to prepare grouped builds query")?;
223 stmt.query_map(rusqlite::params![since, drv, limit], |r| {
224 Ok(SlowBuild { drv_path: r.get(0)?, duration_ms: r.get(1)?, count: Some(r.get(2)?), text: None })
225 })?.filter_map(|r| r.ok()).collect()
226 } else {
227 let sort_col = match sort {
228 SortField::Duration | SortField::Count => "duration_ms",
229 SortField::Name => "drv_path",
230 };
231 let sql = format!(
232 "SELECT duration_ms, drv_path, text
233 FROM events INDEXED BY idx_events_type_start
234 WHERE event_type = 105
235 AND (?1 IS NULL OR start_time >= ?1)
236 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
237 ORDER BY {sort_col} DESC
238 LIMIT ?3"
239 );
240 let mut stmt = conn.prepare(&sql).context("Failed to prepare slowest builds query")?;
241 stmt.query_map(rusqlite::params![since, drv, limit], |r| {
242 Ok(SlowBuild { duration_ms: r.get(0)?, count: None, drv_path: r.get(1)?, text: r.get(2)? })
243 })?.filter_map(|r| r.ok()).collect()
244 };
245
246 Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency })
247}
248
// Mann-Whitney U is non-parametric and makes no distributional assumptions,
// which is appropriate for build times that are right-skewed.
pub struct MannWhitneyResult {
    pub p_value: f64,
}

/// Two-tailed Mann-Whitney U test on two duration samples.
///
/// Ties receive average ranks; the variance uses the standard tie correction;
/// the normal approximation applies a continuity correction of 0.5. Returns
/// `None` when either sample is empty. When the variance is zero (all
/// observations identical) the test is uninformative and `p_value` is 1.
///
/// Bug fix: the continuity correction can push z past 0 when U_min is within
/// 0.5 of its mean (e.g. for balanced tied samples such as a == b); the old
/// `assert!(z <= 0)` then panicked. z is now clamped at 0, yielding p = 1.
pub fn mann_whitney_u(a: &[i64], b: &[i64]) -> Option<MannWhitneyResult> {
    if a.is_empty() || b.is_empty() { return None; }

    let n1f = a.len() as f64;
    let n2f = b.len() as f64;

    // Tag each value with its sample (0 = a, 1 = b) and sort jointly.
    let mut combined: Vec<(i64, usize)> = a.iter().map(|&v| (v, 0))
        .chain(b.iter().map(|&v| (v, 1)))
        .collect();
    combined.sort_unstable_by_key(|&(v, _)| v);

    let n = combined.len();

    // Walk runs of equal values: each member of a tie group gets the average
    // of the 1-based ranks the group spans. Accumulate sample a's rank sum
    // and the tie-correction term Σ(t³ − t).
    let mut rank_sum_a = 0.0f64;
    let mut tie_correction = 0.0f64;
    let mut i = 0;
    while i < n {
        let mut j = i;
        while j < n && combined[j].0 == combined[i].0 { j += 1; }
        let avg_rank = (i as f64 + 1.0 + j as f64) / 2.0;
        for k in i..j {
            if combined[k].1 == 0 { rank_sum_a += avg_rank; }
        }
        let t = (j - i) as f64;
        if t > 1.0 { tie_correction += t * t * t - t; }
        i = j;
    }

    assert!(rank_sum_a >= 0.0);

    let u_a = rank_sum_a - n1f * (n1f + 1.0) / 2.0;
    let u_b = n1f * n2f - u_a;

    assert!(u_a >= 0.0);
    assert!(u_b >= 0.0);
    assert!((u_a + u_b - n1f * n2f).abs() < 1e-6, "U_A + U_B must equal n1*n2");

    // Tie-corrected variance of U under H0.
    let nf = n as f64;
    let variance = (n1f * n2f / 12.0) * ((nf + 1.0) - tie_correction / (nf * (nf - 1.0)));

    let p_value = if variance <= 0.0 {
        // All observations identical: no information, fail to reject H0.
        1.0
    } else {
        let u_min = u_a.min(u_b);
        let mean_u = n1f * n2f / 2.0;
        // Continuity correction moves U_min toward the mean by 0.5 but must
        // not cross it; clamp z at 0 so 2·Φ(z) caps at exactly 1.
        let z = ((u_min - mean_u + 0.5) / variance.sqrt()).min(0.0);
        (2.0 * normal_cdf(z)).min(1.0)
    };

    assert!(p_value >= 0.0);
    assert!(p_value <= 1.0);

    Some(MannWhitneyResult { p_value })
}

/// Standard normal CDF via the erf approximation below.
fn normal_cdf(z: f64) -> f64 {
    0.5 * (1.0 + erf_approx(z / std::f64::consts::SQRT_2))
}

/// Abramowitz & Stegun 7.1.26 rational approximation of erf
/// (max absolute error ≈ 1.5e-7), extended to negative x by odd symmetry.
fn erf_approx(x: f64) -> f64 {
    let t = 1.0 / (1.0 + 0.3275911 * x.abs());
    let poly = t * (0.254829592
        + t * (-0.284496736
        + t * (1.421413741
        + t * (-1.453152027
        + t * 1.061405429))));
    let result = 1.0 - poly * (-x * x).exp();
    if x >= 0.0 { result } else { -result }
}
332
/// Median of an already-sorted slice. Panics on empty input.
pub fn median_sorted(sorted: &[i64]) -> f64 {
    assert!(!sorted.is_empty());
    let mid = sorted.len() / 2;
    if sorted.len() % 2 == 1 {
        // Odd length: the middle element is the median.
        sorted[mid] as f64
    } else {
        // Even length: mean of the two middle elements.
        (sorted[mid - 1] + sorted[mid]) as f64 / 2.0
    }
}
342
/// Human-readable duration: "500ms", "1.5s", or "2m3.4s".
pub fn fmt_ms(ms: i64) -> String {
    match ms {
        m if m < 1000 => format!("{}ms", m),
        m if m < 60_000 => format!("{:.1}s", m as f64 / 1000.0),
        m => format!("{}m{:.1}s", m / 60_000, (m % 60_000) as f64 / 1000.0),
    }
}
352
/// Strip the "/nix/store/" prefix for display; paths without it pass through.
fn drv_name(path: &str) -> &str {
    match path.strip_prefix("/nix/store/") {
        Some(rest) => rest,
        None => path,
    }
}
356
/// Granularity of one trend bucket. CLI-selectable (clap::ValueEnum) and
/// serialized snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, clap::ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BucketSize {
    Hour,
    Day,
    Week,
    Month,
}
365
366impl BucketSize {
367 // Format string must match bucket_label() output exactly so that display
368 // functions and any downstream consumers are consistent.
369 pub fn strftime_fmt(&self) -> &'static str {
370 match self {
371 BucketSize::Hour => "%Y-%m-%dT%H",
372 BucketSize::Day => "%Y-%m-%d",
373 BucketSize::Week => "%Y-W%W",
374 BucketSize::Month => "%Y-%m",
375 }
376 }
377
378 fn col_width(&self) -> usize {
379 match self {
380 BucketSize::Hour => 13,
381 BucketSize::Day => 10,
382 BucketSize::Week => 8,
383 BucketSize::Month => 7,
384 }
385 }
386}
387
388// Compute the display label for the bucket containing ts.
389// Uses the same format strings as strftime_fmt() so labels are consistent
390// with any legacy data or external tooling that used the SQL strftime path.
391fn bucket_label(ts: i64, size: &BucketSize) -> String {
392 let dt = DateTime::from_timestamp(ts, 0)
393 .unwrap_or_default()
394 .with_timezone(&Utc);
395 match size {
396 BucketSize::Hour => dt.format("%Y-%m-%dT%H").to_string(),
397 BucketSize::Day => dt.format("%Y-%m-%d").to_string(),
398 BucketSize::Week => dt.format("%Y-W%W").to_string(),
399 BucketSize::Month => dt.format("%Y-%m").to_string(),
400 }
401}
402
403// Compute the start timestamp of the bucket immediately after the bucket
404// containing ts. Used for integer-comparison bucket detection in the hot loop —
405// avoids a string allocation + comparison per row.
fn bucket_end(ts: i64, size: &BucketSize) -> i64 {
    // Invalid/pre-epoch timestamps fall back to the epoch (unwrap_or_default).
    let dt = DateTime::from_timestamp(ts, 0)
        .unwrap_or_default()
        .with_timezone(&Utc);
    match size {
        BucketSize::Hour => {
            // Truncate to the top of the hour, then add one hour.
            Utc.with_ymd_and_hms(dt.year(), dt.month(), dt.day(), dt.hour(), 0, 0)
                .unwrap()
                .checked_add_signed(chrono::Duration::hours(1))
                .unwrap()
                .timestamp()
        }
        BucketSize::Day => {
            // Truncate to midnight UTC, then add one day.
            Utc.with_ymd_and_hms(dt.year(), dt.month(), dt.day(), 0, 0, 0)
                .unwrap()
                .checked_add_signed(chrono::Duration::days(1))
                .unwrap()
                .timestamp()
        }
        BucketSize::Week => {
            // Monday-based weeks, matching strftime '%W'.
            let days_since_monday = dt.weekday().num_days_from_monday() as i64;
            let monday = dt.date_naive() - chrono::Duration::days(days_since_monday);
            Utc.from_utc_datetime(&monday.and_hms_opt(0, 0, 0).unwrap())
                .checked_add_signed(chrono::Duration::weeks(1))
                .unwrap()
                .timestamp()
        }
        BucketSize::Month => {
            // First day of the following month; December rolls over the year.
            let (year, month) = if dt.month() == 12 { (dt.year() + 1, 1u32) } else { (dt.year(), dt.month() + 1) };
            Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap().timestamp()
        }
    }
}
440
/// Aggregates for one time bucket of the trend view.
#[derive(Debug, Serialize, Deserialize)]
pub struct TrendBucket {
    // Display label, formatted per BucketSize::strftime_fmt().
    pub bucket: String,
    pub build_count: i64,
    pub build_total_ms: i64,
    pub subst_count: i64,
    pub subst_total_ms: i64,
    pub download_bytes: i64,
    // Only populated when full duration data is requested (--output test).
    #[serde(default)]
    pub build_durations: Vec<i64>,
    #[serde(default)]
    pub subst_durations: Vec<i64>,
}
455
/// Result of collect_trend: time-ordered buckets plus the parameters that
/// shaped them.
#[derive(Debug, Serialize, Deserialize)]
pub struct Trend {
    pub buckets: Vec<TrendBucket>,
    pub bucket_size: BucketSize,
    // drv substring filter the buckets were computed under, if any.
    pub drv_filter: Option<String>,
}
462
463// Events-table scan. Returns buckets with individual duration samples populated.
464// Required when individual samples are needed (--output test / Mann-Whitney) or
465// when a drv filter or hour granularity rules out the daily_stats path.
466//
467// Raw start_time integers from SQLite — bucket boundaries detected with a single
468// integer comparison per row (ts >= next_bucket) so string allocs happen only
469// once per bucket, not once per row.
fn trend_from_events(
    conn: &Connection,
    since: Option<i64>,
    bucket: &BucketSize,
    drv: Option<&str>,
) -> Result<Vec<TrendBucket>> {
    // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter.
    // INDEXED BY forces the covering start_time-first index so all four projected columns
    // (start_time, event_type, duration_ms, total_bytes) are served without table lookups.
    let mut stmt = conn.prepare(
        "SELECT start_time, event_type, duration_ms, total_bytes
         FROM events INDEXED BY idx_events_start_cover
         WHERE event_type IN (101, 105, 108)
         AND (?1 IS NULL OR start_time >= ?1)
         AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
         ORDER BY start_time ASC",
    ).context("Failed to prepare trend query")?;

    let mut buckets: Vec<TrendBucket> = vec![];
    // Start timestamp of the bucket AFTER the one currently being filled;
    // 0 forces a fresh bucket on the first row.
    let mut next_bucket: i64 = 0;

    // NOTE(review): row-mapping errors are silently dropped by filter_map —
    // consistent with the rest of the file, but malformed rows vanish.
    for row in stmt.query_map(rusqlite::params![since, drv], |r| {
        Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?))
    })?.filter_map(|r| r.ok()) {
        let (ts, etype, dur, bytes) = row;
        assert!(ts >= 0);

        // Rows arrive in ascending start_time order, so a single integer
        // comparison detects the bucket boundary (see comment above the fn).
        if ts >= next_bucket {
            buckets.push(TrendBucket {
                bucket: bucket_label(ts, bucket),
                build_count: 0, build_total_ms: 0,
                subst_count: 0, subst_total_ms: 0,
                download_bytes: 0,
                build_durations: vec![],
                subst_durations: vec![],
            });
            next_bucket = bucket_end(ts, bucket);
        }

        let last = buckets.last_mut().unwrap();
        // 105 = build, 108 = substitution, 101 = file transfer.
        match etype {
            105 => { assert!(dur >= 0); last.build_count += 1; last.build_total_ms += dur; last.build_durations.push(dur); }
            108 => { assert!(dur >= 0); last.subst_count += 1; last.subst_total_ms += dur; last.subst_durations.push(dur); }
            101 => { assert!(bytes >= 0); last.download_bytes += bytes; }
            _ => {}
        }
    }

    Ok(buckets)
}
520
521// daily_stats query. O(days) — does not populate build_durations/subst_durations.
522// Closed days come from the table; today's partial data is merged from the in-memory
523// snapshot so the current day is always included when running via the daemon.
524fn trend_from_daily_stats(
525 conn: &Connection,
526 since: Option<i64>,
527 bucket: &BucketSize,
528 today: Option<TodaySummary>,
529) -> Result<Vec<TrendBucket>> {
530 let since_day: Option<i64> = since.map(|ts| ts / 86400);
531 let today_day = today.as_ref().map(|t| t.day)
532 .unwrap_or_else(|| Utc::now().timestamp() / 86400);
533
534 let mut stmt = conn.prepare(
535 "SELECT day * 86400, event_type, count, total_ms, total_bytes
536 FROM daily_stats
537 WHERE event_type IN (101, 105, 108)
538 AND (?1 IS NULL OR day >= ?1)
539 AND day < ?2
540 ORDER BY day ASC",
541 ).context("Failed to prepare trend query")?;
542
543 let mut buckets: Vec<TrendBucket> = vec![];
544 let mut next_bucket: i64 = 0;
545
546 for row in stmt.query_map(rusqlite::params![since_day, today_day], |r| {
547 Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?, r.get::<_, i64>(4)?))
548 })?.filter_map(|r| r.ok()) {
549 let (ts, etype, count, total_ms, total_bytes) = row;
550 assert!(ts >= 0);
551 assert!(count >= 0);
552
553 if ts >= next_bucket {
554 buckets.push(TrendBucket {
555 bucket: bucket_label(ts, bucket),
556 build_count: 0, build_total_ms: 0,
557 subst_count: 0, subst_total_ms: 0,
558 download_bytes: 0,
559 build_durations: vec![],
560 subst_durations: vec![],
561 });
562 next_bucket = bucket_end(ts, bucket);
563 }
564
565 let last = buckets.last_mut().unwrap();
566 match etype {
567 105 => { last.build_count += count; last.build_total_ms += total_ms; }
568 108 => { last.subst_count += count; last.subst_total_ms += total_ms; }
569 101 => { last.download_bytes += total_bytes; }
570 _ => {}
571 }
572 }
573
574 if let Some(t) = today {
575 if t.build_count > 0 || t.subst_count > 0 || t.download_bytes > 0 {
576 let today_ts = t.day * 86400;
577 if since.map_or(true, |s| today_ts >= s) {
578 if today_ts >= next_bucket {
579 buckets.push(TrendBucket {
580 bucket: bucket_label(today_ts, bucket),
581 build_count: t.build_count,
582 build_total_ms: t.build_total_ms,
583 subst_count: t.subst_count,
584 subst_total_ms: t.subst_total_ms,
585 download_bytes: t.download_bytes,
586 build_durations: vec![],
587 subst_durations: vec![],
588 });
589 } else if let Some(last) = buckets.last_mut() {
590 // Today falls in the same week/month bucket as the last historical day.
591 last.build_count += t.build_count;
592 last.build_total_ms += t.build_total_ms;
593 last.subst_count += t.subst_count;
594 last.subst_total_ms += t.subst_total_ms;
595 last.download_bytes += t.download_bytes;
596 }
597 }
598 }
599 }
600
601 Ok(buckets)
602}
603
604pub fn collect_trend(
605 db: &Mutex<Connection>,
606 since: Option<i64>,
607 bucket: BucketSize,
608 drv: Option<String>,
609 today: Option<TodaySummary>,
610 full: bool,
611) -> Result<Trend> {
612 if let Some(ref d) = drv {
613 assert!(!d.is_empty(), "drv filter must not be empty");
614 }
615
616 let conn = db.lock().unwrap();
617
618 // daily_stats path when individual samples are not needed, no drv filter is active
619 // (daily_stats has no per-drv breakdown), and bucket granularity is at least a day
620 // (daily_stats has no intra-day resolution).
621 let buckets = if !full && drv.is_none() && !matches!(bucket, BucketSize::Hour) {
622 trend_from_daily_stats(&conn, since, &bucket, today)?
623 } else {
624 trend_from_events(&conn, since, &bucket, drv.as_deref())?
625 };
626
627 for i in 1..buckets.len() {
628 assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending");
629 }
630
631 Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv })
632}
633
634pub fn display_trend(trend: &Trend) {
635 let has_downloads = trend.drv_filter.is_none()
636 && trend.buckets.iter().any(|b| b.download_bytes > 0);
637 let bw = trend.bucket_size.col_width();
638
639 if let Some(ref drv) = trend.drv_filter {
640 println!("filter: {}", drv);
641 }
642
643 print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", "period", "builds", "build avg", "subst", "subst avg");
644 if has_downloads { print!(" {:>8}", "dl (MB)"); }
645 println!();
646
647 if trend.buckets.is_empty() {
648 println!("(no data)");
649 return;
650 }
651
652 for b in &trend.buckets {
653 let build_avg = if b.build_count > 0 { b.build_total_ms / b.build_count } else { 0 };
654 let subst_avg = if b.subst_count > 0 { b.subst_total_ms / b.subst_count } else { 0 };
655
656 print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}",
657 b.bucket, b.build_count, fmt_ms(build_avg),
658 b.subst_count, fmt_ms(subst_avg));
659 if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); }
660 println!();
661 }
662}
663
// Print one section (builds or substitutions) of the significance table.
// `get_durs` selects which duration vector of a bucket this section reports;
// `bw` is the period-label column width. A section with no data is skipped.
fn print_test_section<F>(label: &str, buckets: &[TrendBucket], get_durs: F, bw: usize)
where
    F: Fn(&TrendBucket) -> &[i64],
{
    let any_data = buckets.iter().any(|b| !get_durs(b).is_empty());
    if !any_data { return; }

    println!("{}", label);
    println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}", "period", "n", "median", "Δ", "p-value");

    for (i, b) in buckets.iter().enumerate() {
        let durs = get_durs(b);
        // median_sorted requires sorted, non-empty input.
        let mut sorted = durs.to_vec(); sorted.sort_unstable();
        let med = if sorted.is_empty() { 0.0 } else { median_sorted(&sorted) };

        // Δ and p compare against the previous bucket; the first bucket (or
        // one following an empty bucket) has no baseline, so both stay blank.
        let (delta_str, p_str) = if i == 0 || get_durs(&buckets[i - 1]).is_empty() {
            (String::new(), String::new())
        } else {
            let prev = get_durs(&buckets[i - 1]);
            let mut prev_sorted = prev.to_vec(); prev_sorted.sort_unstable();
            let prev_med = median_sorted(&prev_sorted);

            // Percentage change of the median vs the previous bucket;
            // blank when the previous median is 0 (avoid division by zero).
            let delta = if prev_med > 0.0 {
                let pct = (med - prev_med) / prev_med * 100.0;
                format!("{}{:.0}%", if pct >= 0.0 { "+" } else { "" }, pct)
            } else {
                String::new()
            };

            // Two-tailed Mann-Whitney U against the previous bucket's samples.
            let p = match mann_whitney_u(prev, durs) {
                Some(r) => format!("{:.3}", r.p_value),
                None => String::new(),
            };

            (delta, p)
        };

        println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}",
            b.bucket, durs.len(), fmt_ms(med as i64), delta_str, p_str);
    }
    println!();
}
706
707pub fn display_trend_test(trend: &Trend) {
708 let bw = trend.bucket_size.col_width();
709
710 if let Some(ref drv) = trend.drv_filter {
711 println!("filter: {}", drv);
712 }
713
714 print_test_section("builds", &trend.buckets, |b| &b.build_durations, bw);
715 print_test_section("substitutions", &trend.buckets, |b| &b.subst_durations, bw);
716
717 println!("Mann-Whitney U (two-tailed). H0: adjacent periods have the same duration distribution.");
718}
719
720pub fn output_csv_trend(trend: &Trend) {
721 println!("period,build_count,build_avg_ms,subst_count,subst_avg_ms,download_bytes");
722 for b in &trend.buckets {
723 let build_avg = if b.build_count > 0 { b.build_total_ms / b.build_count } else { 0 };
724 let subst_avg = if b.subst_count > 0 { b.subst_total_ms / b.subst_count } else { 0 };
725 assert!(build_avg >= 0);
726 assert!(subst_avg >= 0);
727 println!("{},{},{},{},{},{}", b.bucket, b.build_count, build_avg, b.subst_count, subst_avg, b.download_bytes);
728 }
729}
730
731pub fn display_stats(stats: Stats) {
732 let build_avg = if stats.build_count > 0 { stats.build_total_ms / stats.build_count } else { 0 };
733 let subst_avg = if stats.subst_count > 0 { stats.subst_total_ms / stats.subst_count } else { 0 };
734 let mb = stats.download_bytes as f64 / 1_048_576.0;
735 let dl_speed = if stats.download_ms > 0 { mb / (stats.download_ms as f64 / 1000.0) } else { 0.0 };
736
737 println!("{:<14} {:>6} total {:>9} avg {:>9}", "built", stats.build_count, fmt_ms(stats.build_total_ms), fmt_ms(build_avg));
738 println!("{:<14} {:>6} total {:>9} avg {:>9}", "substituted", stats.subst_count, fmt_ms(stats.subst_total_ms), fmt_ms(subst_avg));
739 println!("{:<14} {:>8.1} MB avg {:>6.1} MB/s", "downloaded", mb, dl_speed);
740
741 if !stats.slowest_builds.is_empty() {
742 let grouped = stats.slowest_builds.first().map(|b| b.count.is_some()).unwrap_or(false);
743 println!();
744 for row in &stats.slowest_builds {
745 let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?");
746 if grouped {
747 println!("{:>9} ({:>3}x) {}", fmt_ms(row.duration_ms), row.count.unwrap_or(0), drv_name(path));
748 } else {
749 println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path));
750 }
751 }
752 }
753
754 if !stats.cache_latency.is_empty() {
755 let url_w = stats.cache_latency.iter().map(|r| r.cache_url.len()).max().unwrap_or(0);
756 println!();
757 for row in &stats.cache_latency {
758 println!("{:<width$} avg {:>7} {:>6} queries", row.cache_url, fmt_ms(row.avg_ms as i64), row.count, width = url_w);
759 }
760 }
761}