Nix Observability Daemon
use anyhow::{Context, Result};
use chrono::prelude::*;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Debug, Clone, Default, Serialize, Deserialize, clap::ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum SortField {
    #[default]
    Duration,
    Count,
    Name,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Stats {
    pub build_count: i64,
    pub build_total_ms: i64,
    pub subst_count: i64,
    pub subst_total_ms: i64,
    pub download_bytes: i64,
    pub download_ms: i64,
    pub slowest_builds: Vec<SlowBuild>,
    pub cache_latency: Vec<CacheStat>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SlowBuild {
    pub duration_ms: i64,
    pub count: Option<i64>,
    pub drv_path: Option<String>,
    pub text: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheStat {
    pub cache_url: String,
    pub avg_ms: f64,
    pub count: i64,
}

// Snapshot of the current day's in-memory aggregates, passed from the daemon
// to collect_stats so the summary path never needs to scan the events table.
// cache: substituter URL → (total_ms, count)
#[derive(Clone, Default)]
pub struct TodaySummary {
    pub day: i64,
    pub build_count: i64,
    pub build_total_ms: i64,
    pub subst_count: i64,
    pub subst_total_ms: i64,
    pub download_count: i64,
    pub download_bytes: i64,
    pub download_ms: i64,
    pub cache: HashMap<String, (i64, i64)>,
}

// Fast path: query daily_stats for closed days, merge today's in-memory snapshot.
// Returns (build_count, build_ms, subst_count, subst_ms, dl_bytes, dl_ms, cache_latency).
fn summary_from_cache(
    conn: &Connection,
    since: Option<i64>,
    today: &TodaySummary,
) -> Result<(i64, i64, i64, i64, i64, i64, Vec<CacheStat>)> {
    assert!(today.day > 0);
    let since_day: Option<i64> = since.map(|ts| ts / 86400);
    let today_in_range = since_day.map_or(true, |sd| today.day >= sd);

    let (hbc, hbms) = conn.query_row(
        "SELECT COALESCE(SUM(count),0), COALESCE(SUM(total_ms),0)
         FROM daily_stats WHERE event_type = 105
         AND (?1 IS NULL OR day >= ?1) AND day < ?2",
        rusqlite::params![since_day, today.day],
        |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
    ).context("Failed to query build summary from daily_stats")?;

    let (hsc, hsms) = conn.query_row(
        "SELECT COALESCE(SUM(count),0), COALESCE(SUM(total_ms),0)
         FROM daily_stats WHERE event_type = 108
         AND (?1 IS NULL OR day >= ?1) AND day < ?2",
        rusqlite::params![since_day, today.day],
        |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
    ).context("Failed to query subst summary from daily_stats")?;

    let (hdb, hdms) = conn.query_row(
        "SELECT COALESCE(SUM(total_bytes),0), COALESCE(SUM(total_ms),0)
         FROM daily_stats WHERE event_type = 101
         AND (?1 IS NULL OR day >= ?1) AND day < ?2",
        rusqlite::params![since_day, today.day],
        |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
    ).context("Failed to query download summary from daily_stats")?;

    let build_count = hbc + if today_in_range { today.build_count } else { 0 };
    let build_total_ms = hbms + if today_in_range { today.build_total_ms } else { 0 };
    let subst_count = hsc + if today_in_range { today.subst_count } else { 0 };
    let subst_total_ms = hsms + if today_in_range { today.subst_total_ms } else { 0 };
    let download_bytes = hdb + if today_in_range { today.download_bytes } else { 0 };
    let download_ms = hdms + if today_in_range { today.download_ms } else { 0 };

    // Cache latency: closed days from daily_cache_stats, today from memory.
    let mut cache_map: HashMap<String, (i64, i64)> = HashMap::new();
    let mut stmt = conn.prepare(
        "SELECT cache_url, SUM(total_ms), SUM(count)
         FROM daily_cache_stats
         WHERE (?1 IS NULL OR day >= ?1) AND day < ?2
         GROUP BY cache_url",
    ).context("Failed to prepare cache latency query")?;
    for row in stmt.query_map(rusqlite::params![since_day, today.day], |r| {
        Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?))
    })?.filter_map(|r| r.ok()) {
        let (url, ms, cnt) = row;
        let e = cache_map.entry(url).or_insert((0, 0));
        e.0 += ms; e.1 += cnt;
    }
    if today_in_range {
        for (url, &(ms, cnt)) in &today.cache {
            let e = cache_map.entry(url.clone()).or_insert((0, 0));
            e.0 += ms; e.1 += cnt;
        }
    }
    let mut cache_latency: Vec<CacheStat> = cache_map.into_iter()
        .filter(|(_, (_, cnt))| *cnt > 0)
        .map(|(url, (ms, cnt))| CacheStat { cache_url: url, avg_ms: ms as f64 / cnt as f64, count: cnt })
        .collect();
    cache_latency.sort_by(|a, b| b.avg_ms.partial_cmp(&a.avg_ms).unwrap_or(std::cmp::Ordering::Equal));

    Ok((build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, cache_latency))
}
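// For reference, a minimal sketch of the aggregate tables the fast path reads.
// This shape is inferred from the queries above and is an assumption for
// illustration — the daemon's migration code owns the authoritative schema:
//
//   CREATE TABLE daily_stats (
//       day         INTEGER NOT NULL,  -- unix_timestamp / 86400
//       event_type  INTEGER NOT NULL,  -- 101 download, 105 build, 108 substitution
//       count       INTEGER NOT NULL,
//       total_ms    INTEGER NOT NULL,
//       total_bytes INTEGER NOT NULL
//   );
//   CREATE TABLE daily_cache_stats (
//       day       INTEGER NOT NULL,
//       cache_url TEXT NOT NULL,
//       total_ms  INTEGER NOT NULL,
//       count     INTEGER NOT NULL
//   );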
// Slow path: full events table scan (used in direct mode or when drv filter is active).
fn summary_from_events(
    conn: &Connection,
    since: Option<i64>,
    drv: Option<&str>,
) -> Result<(i64, i64, i64, i64, i64, i64, Vec<CacheStat>)> {
    let (build_count, build_total_ms) = conn.query_row(
        "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0)
         FROM events INDEXED BY idx_events_type_start WHERE event_type = 105
         AND (?1 IS NULL OR start_time >= ?1)
         AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')",
        rusqlite::params![since, drv],
        |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
    ).context("Failed to query build summary")?;

    let (subst_count, subst_total_ms) = conn.query_row(
        "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0)
         FROM events INDEXED BY idx_events_type_start WHERE event_type = 108
         AND (?1 IS NULL OR start_time >= ?1)
         AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')",
        rusqlite::params![since, drv],
        |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
    ).context("Failed to query substitution summary")?;

    let (download_bytes, download_ms) = conn.query_row(
        "SELECT COALESCE(SUM(total_bytes), 0), COALESCE(SUM(duration_ms), 0)
         FROM events INDEXED BY idx_events_type_start WHERE event_type = 101
         AND (?1 IS NULL OR start_time >= ?1)",
        rusqlite::params![since],
        |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)),
    ).context("Failed to query download summary")?;

    let mut stmt = conn.prepare(
        "SELECT cache_url, AVG(duration_ms), COUNT(*)
         FROM events INDEXED BY idx_events_type_start
         WHERE event_type = 108 AND cache_url IS NOT NULL
         AND (?1 IS NULL OR start_time >= ?1)
         GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
    ).context("Failed to prepare cache latency query")?;
    let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![since], |r| {
        Ok(CacheStat { cache_url: r.get(0)?, avg_ms: r.get(1)?, count: r.get(2)? })
    })?.filter_map(|r| r.ok()).collect();

    Ok((build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, cache_latency))
}
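// The INDEXED BY hints above and in the trend path assume indexes along these
// lines. Column order is inferred from the index names, the query shapes, and
// the covering-index comment on trend_from_events below — an illustrative
// sketch, not the daemon's actual DDL:
//
//   CREATE INDEX idx_events_type_start ON events (event_type, start_time);
//   CREATE INDEX idx_events_start_cover
//       ON events (start_time, event_type, duration_ms, total_bytes);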
pub fn collect_stats(
    db: &Mutex<Connection>,
    since: Option<i64>,
    drv: Option<&str>,
    sort: SortField,
    limit: u32,
    group: bool,
    today: Option<TodaySummary>,
) -> Result<Stats> {
    assert!(limit > 0, "limit must be > 0");

    let conn = db.lock().unwrap();

    // Fast path: daily_stats + today memory — O(days) instead of O(events).
    // Falls back to events scan when a drv filter is active (drv_path not in daily_stats)
    // or when running in direct mode (no daemon, today is None).
    let (build_count, build_total_ms, subst_count, subst_total_ms,
         download_bytes, download_ms, cache_latency) =
        if let (Some(t), None) = (today.as_ref(), drv) {
            summary_from_cache(&conn, since, t)?
        } else {
            summary_from_events(&conn, since, drv)?
        };

    assert!(build_count >= 0);
    assert!(subst_count >= 0);
    assert!(download_bytes >= 0);

    let slowest_builds: Vec<SlowBuild> = if group {
        let sort_col = match sort {
            SortField::Duration => "avg_ms",
            SortField::Count => "cnt",
            SortField::Name => "drv_path",
        };
        let sql = format!(
            "SELECT drv_path, CAST(ROUND(AVG(duration_ms)) AS INTEGER) as avg_ms, COUNT(*) as cnt
             FROM events INDEXED BY idx_events_type_start
             WHERE event_type = 105
             AND (?1 IS NULL OR start_time >= ?1)
             AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
             GROUP BY drv_path
             ORDER BY {sort_col} DESC
             LIMIT ?3"
        );
        let mut stmt = conn.prepare(&sql).context("Failed to prepare grouped builds query")?;
        stmt.query_map(rusqlite::params![since, drv, limit], |r| {
            Ok(SlowBuild { drv_path: r.get(0)?, duration_ms: r.get(1)?, count: Some(r.get(2)?), text: None })
        })?.filter_map(|r| r.ok()).collect()
    } else {
        let sort_col = match sort {
            SortField::Duration | SortField::Count => "duration_ms",
            SortField::Name => "drv_path",
        };
        let sql = format!(
            "SELECT duration_ms, drv_path, text
             FROM events INDEXED BY idx_events_type_start
             WHERE event_type = 105
             AND (?1 IS NULL OR start_time >= ?1)
             AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
             ORDER BY {sort_col} DESC
             LIMIT ?3"
        );
        let mut stmt = conn.prepare(&sql).context("Failed to prepare slowest builds query")?;
        stmt.query_map(rusqlite::params![since, drv, limit], |r| {
            Ok(SlowBuild { duration_ms: r.get(0)?, count: None, drv_path: r.get(1)?, text: r.get(2)? })
        })?.filter_map(|r| r.ok()).collect()
    };

    Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency })
}
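// Illustrative call site (argument values are hypothetical): a "last 7 days,
// top 10 slowest builds" query as a CLI handler might issue it.
//
//   let since = Some(Utc::now().timestamp() - 7 * 86400);
//   let stats = collect_stats(&db, since, None, SortField::Duration, 10, false, today_snapshot)?;
//   display_stats(stats);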
// Mann-Whitney U is non-parametric and makes no distributional assumptions,
// which is appropriate for build times that are right-skewed.
pub struct MannWhitneyResult {
    pub p_value: f64,
}

pub fn mann_whitney_u(a: &[i64], b: &[i64]) -> Option<MannWhitneyResult> {
    if a.is_empty() || b.is_empty() { return None; }

    let n1 = a.len();
    let n2 = b.len();
    let n1f = n1 as f64;
    let n2f = n2 as f64;

    let mut combined: Vec<(i64, usize)> = a.iter().map(|&v| (v, 0))
        .chain(b.iter().map(|&v| (v, 1)))
        .collect();
    combined.sort_unstable_by_key(|&(v, _)| v);

    let n = combined.len();

    let mut rank_sum_a = 0.0f64;
    let mut tie_correction = 0.0f64;
    let mut i = 0;
    while i < n {
        let mut j = i;
        while j < n && combined[j].0 == combined[i].0 { j += 1; }
        let avg_rank = (i as f64 + 1.0 + j as f64) / 2.0;
        for k in i..j {
            if combined[k].1 == 0 { rank_sum_a += avg_rank; }
        }
        let t = (j - i) as f64;
        if t > 1.0 { tie_correction += t * t * t - t; }
        i = j;
    }

    assert!(rank_sum_a >= 0.0);

    let u_a = rank_sum_a - n1f * (n1f + 1.0) / 2.0;
    let u_b = n1f * n2f - u_a;

    assert!(u_a >= 0.0);
    assert!(u_b >= 0.0);
    assert!((u_a + u_b - n1f * n2f).abs() < 1e-6, "U_A + U_B must equal n1*n2");

    let cliffs_delta = (u_a - u_b) / (n1f * n2f);
    assert!(cliffs_delta >= -1.0 - 1e-9);
    assert!(cliffs_delta <= 1.0 + 1e-9);
    let _ = cliffs_delta;

    let nf = n as f64;
    let variance = (n1f * n2f / 12.0) * ((nf + 1.0) - tie_correction / (nf * (nf - 1.0)));

    let p_value = if variance <= 0.0 {
        1.0
    } else {
        let u_min = u_a.min(u_b);
        let mean_u = n1f * n2f / 2.0;
        // Continuity correction, clamped at zero: when U_min == mean_U (e.g.
        // a=[1,2] vs b=[1,2]) the +0.5 would otherwise push z positive, trip
        // the assert below, and misreport the p-value.
        let z = ((u_min - mean_u + 0.5) / variance.sqrt()).min(0.0);
        assert!(z <= 0.0 + 1e-9, "z must be non-positive for U_min ≤ mean_U");
        (2.0 * normal_cdf(z)).min(1.0)
    };

    assert!(p_value >= 0.0);
    assert!(p_value <= 1.0);

    Some(MannWhitneyResult { p_value })
}

fn normal_cdf(z: f64) -> f64 {
    0.5 * (1.0 + erf_approx(z / std::f64::consts::SQRT_2))
}

// Abramowitz & Stegun 7.1.26 polynomial approximation (max error ~2.5e-5).
fn erf_approx(x: f64) -> f64 {
    let t = 1.0 / (1.0 + 0.3275911 * x.abs());
    let poly = t * (0.254829592
        + t * (-0.284496736
        + t * (1.421413741
        + t * (-1.453152027
        + t * 1.061405429))));
    let result = 1.0 - poly * (-x * x).exp();
    if x >= 0.0 { result } else { -result }
}

pub fn median_sorted(sorted: &[i64]) -> f64 {
    assert!(!sorted.is_empty());
    let n = sorted.len();
    if n % 2 == 0 {
        (sorted[n / 2 - 1] + sorted[n / 2]) as f64 / 2.0
    } else {
        sorted[n / 2] as f64
    }
}

pub fn fmt_ms(ms: i64) -> String {
    if ms < 1000 {
        format!("{}ms", ms)
    } else if ms < 60_000 {
        format!("{:.1}s", ms as f64 / 1000.0)
    } else {
        format!("{}m{:.1}s", ms / 60_000, (ms % 60_000) as f64 / 1000.0)
    }
}

fn drv_name(path: &str) -> &str {
    path.strip_prefix("/nix/store/").unwrap_or(path)
}
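// A minimal sanity-check sketch for the statistics helpers above. Expected
// values are hand-computed from the formulas; the bound on the approximated
// p-value is deliberately loose.
#[cfg(test)]
mod stat_tests {
    use super::*;

    #[test]
    fn mann_whitney_separated_samples() {
        // Fully separated samples: U_min = 0, z ≈ -1.75, two-tailed p ≈ 0.08.
        let p = mann_whitney_u(&[1, 2, 3], &[4, 5, 6]).unwrap().p_value;
        assert!(p > 0.05 && p < 0.12, "p = {p}");
    }

    #[test]
    fn mann_whitney_identical_samples() {
        // All values tied: the tie correction zeroes the variance, so p = 1.
        let p = mann_whitney_u(&[5, 5, 5], &[5, 5, 5]).unwrap().p_value;
        assert_eq!(p, 1.0);
    }

    #[test]
    fn mann_whitney_empty_input() {
        assert!(mann_whitney_u(&[], &[1]).is_none());
    }

    #[test]
    fn median_and_formatting() {
        assert_eq!(median_sorted(&[1, 2, 3, 4]), 2.5);
        assert_eq!(median_sorted(&[7]), 7.0);
        assert_eq!(fmt_ms(950), "950ms");
        assert_eq!(fmt_ms(1500), "1.5s");
        assert_eq!(fmt_ms(65_500), "1m5.5s");
        assert_eq!(drv_name("/nix/store/abc-hello.drv"), "abc-hello.drv");
    }
}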
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, clap::ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BucketSize {
    Hour,
    Day,
    Week,
    Month,
}

impl BucketSize {
    // Format string must match bucket_label() output exactly so that display
    // functions and any downstream consumers are consistent.
    pub fn strftime_fmt(&self) -> &'static str {
        match self {
            BucketSize::Hour => "%Y-%m-%dT%H",
            BucketSize::Day => "%Y-%m-%d",
            BucketSize::Week => "%Y-W%W",
            BucketSize::Month => "%Y-%m",
        }
    }

    fn col_width(&self) -> usize {
        match self {
            BucketSize::Hour => 13,
            BucketSize::Day => 10,
            BucketSize::Week => 8,
            BucketSize::Month => 7,
        }
    }
}

// Compute the display label for the bucket containing ts.
// Uses the same format strings as strftime_fmt() so labels are consistent
// with any legacy data or external tooling that used the SQL strftime path.
fn bucket_label(ts: i64, size: &BucketSize) -> String {
    let dt = DateTime::from_timestamp(ts, 0)
        .unwrap_or_default()
        .with_timezone(&Utc);
    match size {
        BucketSize::Hour => dt.format("%Y-%m-%dT%H").to_string(),
        BucketSize::Day => dt.format("%Y-%m-%d").to_string(),
        BucketSize::Week => dt.format("%Y-W%W").to_string(),
        BucketSize::Month => dt.format("%Y-%m").to_string(),
    }
}

// Compute the start timestamp of the bucket immediately after the bucket
// containing ts. Used for integer-comparison bucket detection in the hot loop —
// avoids a string allocation + comparison per row.
fn bucket_end(ts: i64, size: &BucketSize) -> i64 {
    let dt = DateTime::from_timestamp(ts, 0)
        .unwrap_or_default()
        .with_timezone(&Utc);
    match size {
        BucketSize::Hour => {
            Utc.with_ymd_and_hms(dt.year(), dt.month(), dt.day(), dt.hour(), 0, 0)
                .unwrap()
                .checked_add_signed(chrono::Duration::hours(1))
                .unwrap()
                .timestamp()
        }
        BucketSize::Day => {
            Utc.with_ymd_and_hms(dt.year(), dt.month(), dt.day(), 0, 0, 0)
                .unwrap()
                .checked_add_signed(chrono::Duration::days(1))
                .unwrap()
                .timestamp()
        }
        BucketSize::Week => {
            // Monday-based weeks, matching strftime '%W'.
            let days_since_monday = dt.weekday().num_days_from_monday() as i64;
            let monday = dt.date_naive() - chrono::Duration::days(days_since_monday);
            Utc.from_utc_datetime(&monday.and_hms_opt(0, 0, 0).unwrap())
                .checked_add_signed(chrono::Duration::weeks(1))
                .unwrap()
                .timestamp()
        }
        BucketSize::Month => {
            let (year, month) = if dt.month() == 12 { (dt.year() + 1, 1u32) } else { (dt.year(), dt.month() + 1) };
            Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap().timestamp()
        }
    }
}
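// Sketch of boundary checks for the bucket helpers, using hand-computed
// timestamps (1970-01-01 was a Thursday, so its Monday-based week ends at
// 1970-01-05 = 4 * 86400).
#[cfg(test)]
mod bucket_tests {
    use super::*;

    #[test]
    fn bucket_end_boundaries() {
        let ts = 10 * 3600 + 123; // 1970-01-01T10:02:03 UTC
        assert_eq!(bucket_end(ts, &BucketSize::Hour), 11 * 3600);
        assert_eq!(bucket_end(ts, &BucketSize::Day), 86400);
        assert_eq!(bucket_end(ts, &BucketSize::Week), 4 * 86400);
        assert_eq!(bucket_end(ts, &BucketSize::Month), 31 * 86400);
    }

    #[test]
    fn label_stable_within_bucket() {
        // Every timestamp inside a bucket must share the label of its start,
        // otherwise the integer-comparison fast path and labels would disagree.
        for size in [BucketSize::Hour, BucketSize::Day, BucketSize::Week, BucketSize::Month] {
            let ts = 123_456_789;
            let end = bucket_end(ts, &size);
            assert_eq!(bucket_label(ts, &size), bucket_label(end - 1, &size));
            assert_ne!(bucket_label(ts, &size), bucket_label(end, &size));
        }
    }
}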
#[derive(Debug, Serialize, Deserialize)]
pub struct TrendBucket {
    pub bucket: String,
    pub build_count: i64,
    pub build_total_ms: i64,
    pub subst_count: i64,
    pub subst_total_ms: i64,
    pub download_bytes: i64,
    // Only populated when full duration data is requested (--output test).
    #[serde(default)]
    pub build_durations: Vec<i64>,
    #[serde(default)]
    pub subst_durations: Vec<i64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Trend {
    pub buckets: Vec<TrendBucket>,
    pub bucket_size: BucketSize,
    pub drv_filter: Option<String>,
}

// Events-table scan. Returns buckets with individual duration samples populated.
// Required when individual samples are needed (--output test / Mann-Whitney) or
// when a drv filter or hour granularity rules out the daily_stats path.
//
// Raw start_time integers from SQLite — bucket boundaries detected with a single
// integer comparison per row (ts >= next_bucket) so string allocs happen only
// once per bucket, not once per row.
fn trend_from_events(
    conn: &Connection,
    since: Option<i64>,
    bucket: &BucketSize,
    drv: Option<&str>,
) -> Result<Vec<TrendBucket>> {
    // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter.
    // INDEXED BY forces the covering start_time-first index so all four projected columns
    // (start_time, event_type, duration_ms, total_bytes) are served without table lookups.
    let mut stmt = conn.prepare(
        "SELECT start_time, event_type, duration_ms, total_bytes
         FROM events INDEXED BY idx_events_start_cover
         WHERE event_type IN (101, 105, 108)
         AND (?1 IS NULL OR start_time >= ?1)
         AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
         ORDER BY start_time ASC",
    ).context("Failed to prepare trend query")?;

    let mut buckets: Vec<TrendBucket> = vec![];
    let mut next_bucket: i64 = 0;

    for row in stmt.query_map(rusqlite::params![since, drv], |r| {
        Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?))
    })?.filter_map(|r| r.ok()) {
        let (ts, etype, dur, bytes) = row;
        assert!(ts >= 0);

        if ts >= next_bucket {
            buckets.push(TrendBucket {
                bucket: bucket_label(ts, bucket),
                build_count: 0, build_total_ms: 0,
                subst_count: 0, subst_total_ms: 0,
                download_bytes: 0,
                build_durations: vec![],
                subst_durations: vec![],
            });
            next_bucket = bucket_end(ts, bucket);
        }

        let last = buckets.last_mut().unwrap();
        match etype {
            105 => { assert!(dur >= 0); last.build_count += 1; last.build_total_ms += dur; last.build_durations.push(dur); }
            108 => { assert!(dur >= 0); last.subst_count += 1; last.subst_total_ms += dur; last.subst_durations.push(dur); }
            101 => { assert!(bytes >= 0); last.download_bytes += bytes; }
            _ => {}
        }
    }

    Ok(buckets)
}
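// A sketch of exercising trend_from_events against an in-memory database.
// The events schema and index DDL here are assumptions for illustration —
// the daemon's real migrations define the authoritative shapes; INDEXED BY
// requires the index name to exist, so the sketch creates it.
#[cfg(test)]
mod trend_events_tests {
    use super::*;

    #[test]
    fn buckets_builds_by_day() {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE events (
                 id INTEGER PRIMARY KEY,
                 event_type INTEGER NOT NULL,
                 start_time INTEGER NOT NULL,
                 duration_ms INTEGER NOT NULL DEFAULT 0,
                 total_bytes INTEGER NOT NULL DEFAULT 0,
                 drv_path TEXT,
                 cache_url TEXT,
                 text TEXT
             );
             CREATE INDEX idx_events_start_cover
                 ON events(start_time, event_type, duration_ms, total_bytes);
             -- Two builds on day 0, one build on day 1.
             INSERT INTO events (event_type, start_time, duration_ms) VALUES
                 (105, 100, 1000), (105, 200, 3000), (105, 86500, 2000);",
        ).unwrap();

        let buckets = trend_from_events(&conn, None, &BucketSize::Day, None).unwrap();
        assert_eq!(buckets.len(), 2);
        assert_eq!(buckets[0].build_count, 2);
        assert_eq!(buckets[0].build_total_ms, 4000);
        assert_eq!(buckets[0].build_durations, vec![1000, 3000]);
        assert_eq!(buckets[1].build_count, 1);
    }
}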
// daily_stats query. O(days) — does not populate build_durations/subst_durations.
// Closed days come from the table; today's partial data is merged from the in-memory
// snapshot so the current day is always included when running via the daemon.
fn trend_from_daily_stats(
    conn: &Connection,
    since: Option<i64>,
    bucket: &BucketSize,
    today: Option<TodaySummary>,
) -> Result<Vec<TrendBucket>> {
    let since_day: Option<i64> = since.map(|ts| ts / 86400);
    let today_day = today.as_ref().map(|t| t.day)
        .unwrap_or_else(|| Utc::now().timestamp() / 86400);

    let mut stmt = conn.prepare(
        "SELECT day * 86400, event_type, count, total_ms, total_bytes
         FROM daily_stats
         WHERE event_type IN (101, 105, 108)
         AND (?1 IS NULL OR day >= ?1)
         AND day < ?2
         ORDER BY day ASC",
    ).context("Failed to prepare trend query")?;

    let mut buckets: Vec<TrendBucket> = vec![];
    let mut next_bucket: i64 = 0;

    for row in stmt.query_map(rusqlite::params![since_day, today_day], |r| {
        Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?, r.get::<_, i64>(4)?))
    })?.filter_map(|r| r.ok()) {
        let (ts, etype, count, total_ms, total_bytes) = row;
        assert!(ts >= 0);
        assert!(count >= 0);

        if ts >= next_bucket {
            buckets.push(TrendBucket {
                bucket: bucket_label(ts, bucket),
                build_count: 0, build_total_ms: 0,
                subst_count: 0, subst_total_ms: 0,
                download_bytes: 0,
                build_durations: vec![],
                subst_durations: vec![],
            });
            next_bucket = bucket_end(ts, bucket);
        }

        let last = buckets.last_mut().unwrap();
        match etype {
            105 => { last.build_count += count; last.build_total_ms += total_ms; }
            108 => { last.subst_count += count; last.subst_total_ms += total_ms; }
            101 => { last.download_bytes += total_bytes; }
            _ => {}
        }
    }

    if let Some(t) = today {
        if t.build_count > 0 || t.subst_count > 0 || t.download_bytes > 0 {
            let today_ts = t.day * 86400;
            if since.map_or(true, |s| today_ts >= s) {
                if today_ts >= next_bucket {
                    buckets.push(TrendBucket {
                        bucket: bucket_label(today_ts, bucket),
                        build_count: t.build_count,
                        build_total_ms: t.build_total_ms,
                        subst_count: t.subst_count,
                        subst_total_ms: t.subst_total_ms,
                        download_bytes: t.download_bytes,
                        build_durations: vec![],
                        subst_durations: vec![],
                    });
                } else if let Some(last) = buckets.last_mut() {
                    // Today falls in the same week/month bucket as the last historical day.
                    last.build_count += t.build_count;
                    last.build_total_ms += t.build_total_ms;
                    last.subst_count += t.subst_count;
                    last.subst_total_ms += t.subst_total_ms;
                    last.download_bytes += t.download_bytes;
                }
            }
        }
    }

    Ok(buckets)
}
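// A sketch of the today-merge behaviour, reusing the assumed daily_stats shape
// from the comment near summary_from_cache. Day 0 is a closed day in the
// table; day 1 arrives only via the in-memory snapshot.
#[cfg(test)]
mod trend_daily_tests {
    use super::*;

    #[test]
    fn today_snapshot_appends_bucket() {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE daily_stats (
                 day INTEGER NOT NULL,
                 event_type INTEGER NOT NULL,
                 count INTEGER NOT NULL,
                 total_ms INTEGER NOT NULL,
                 total_bytes INTEGER NOT NULL
             );
             INSERT INTO daily_stats VALUES (0, 105, 2, 4000, 0);",
        ).unwrap();

        let today = TodaySummary { day: 1, build_count: 1, build_total_ms: 500, ..Default::default() };
        let buckets = trend_from_daily_stats(&conn, None, &BucketSize::Day, Some(today)).unwrap();

        assert_eq!(buckets.len(), 2);
        assert_eq!(buckets[0].bucket, "1970-01-01");
        assert_eq!(buckets[0].build_count, 2);
        assert_eq!(buckets[1].bucket, "1970-01-02");
        assert_eq!(buckets[1].build_count, 1);
    }
}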
pub fn collect_trend(
    db: &Mutex<Connection>,
    since: Option<i64>,
    bucket: BucketSize,
    drv: Option<String>,
    today: Option<TodaySummary>,
    full: bool,
) -> Result<Trend> {
    if let Some(ref d) = drv {
        assert!(!d.is_empty(), "drv filter must not be empty");
    }

    let conn = db.lock().unwrap();

    // daily_stats path when individual samples are not needed, no drv filter is active
    // (daily_stats has no per-drv breakdown), and bucket granularity is at least a day
    // (daily_stats has no intra-day resolution).
    let buckets = if !full && drv.is_none() && !matches!(bucket, BucketSize::Hour) {
        trend_from_daily_stats(&conn, since, &bucket, today)?
    } else {
        trend_from_events(&conn, since, &bucket, drv.as_deref())?
    };

    for i in 1..buckets.len() {
        assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending");
    }

    Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv })
}

pub fn display_trend(trend: &Trend) {
    let has_downloads = trend.drv_filter.is_none()
        && trend.buckets.iter().any(|b| b.download_bytes > 0);
    let bw = trend.bucket_size.col_width();

    if let Some(ref drv) = trend.drv_filter {
        println!("filter: {}", drv);
    }

    print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", "period", "builds", "build avg", "subst", "subst avg");
    if has_downloads { print!(" {:>8}", "dl (MB)"); }
    println!();

    if trend.buckets.is_empty() {
        println!("(no data)");
        return;
    }

    for b in &trend.buckets {
        let build_avg = if b.build_count > 0 { b.build_total_ms / b.build_count } else { 0 };
        let subst_avg = if b.subst_count > 0 { b.subst_total_ms / b.subst_count } else { 0 };

        print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}",
            b.bucket, b.build_count, fmt_ms(build_avg),
            b.subst_count, fmt_ms(subst_avg));
        if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); }
        println!();
    }
}

fn print_test_section<F>(label: &str, buckets: &[TrendBucket], get_durs: F, bw: usize)
where
    F: Fn(&TrendBucket) -> &[i64],
{
    let any_data = buckets.iter().any(|b| !get_durs(b).is_empty());
    if !any_data { return; }

    println!("{}", label);
    println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}", "period", "n", "median", "Δ", "p-value");

    for (i, b) in buckets.iter().enumerate() {
        let durs = get_durs(b);
        let mut sorted = durs.to_vec(); sorted.sort_unstable();
        let med = if sorted.is_empty() { 0.0 } else { median_sorted(&sorted) };

        let (delta_str, p_str) = if i == 0 || get_durs(&buckets[i - 1]).is_empty() {
            (String::new(), String::new())
        } else {
            let prev = get_durs(&buckets[i - 1]);
            let mut prev_sorted = prev.to_vec(); prev_sorted.sort_unstable();
            let prev_med = median_sorted(&prev_sorted);

            let delta = if prev_med > 0.0 {
                let pct = (med - prev_med) / prev_med * 100.0;
                format!("{}{:.0}%", if pct >= 0.0 { "+" } else { "" }, pct)
            } else {
                String::new()
            };

            let p = match mann_whitney_u(prev, durs) {
                Some(r) => format!("{:.3}", r.p_value),
                None => String::new(),
            };

            (delta, p)
        };

        println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}",
            b.bucket, durs.len(), fmt_ms(med as i64), delta_str, p_str);
    }
    println!();
}

pub fn display_trend_test(trend: &Trend) {
    let bw = trend.bucket_size.col_width();

    if let Some(ref drv) = trend.drv_filter {
        println!("filter: {}", drv);
    }

    print_test_section("builds", &trend.buckets, |b| &b.build_durations, bw);
    print_test_section("substitutions", &trend.buckets, |b| &b.subst_durations, bw);

    println!("Mann-Whitney U (two-tailed). H0: adjacent periods have the same duration distribution.");
}
pub fn output_csv_trend(trend: &Trend) {
    println!("period,build_count,build_avg_ms,subst_count,subst_avg_ms,download_bytes");
    for b in &trend.buckets {
        let build_avg = if b.build_count > 0 { b.build_total_ms / b.build_count } else { 0 };
        let subst_avg = if b.subst_count > 0 { b.subst_total_ms / b.subst_count } else { 0 };
        assert!(build_avg >= 0);
        assert!(subst_avg >= 0);
        println!("{},{},{},{},{},{}", b.bucket, b.build_count, build_avg, b.subst_count, subst_avg, b.download_bytes);
    }
}

pub fn display_stats(stats: Stats) {
    let build_avg = if stats.build_count > 0 { stats.build_total_ms / stats.build_count } else { 0 };
    let subst_avg = if stats.subst_count > 0 { stats.subst_total_ms / stats.subst_count } else { 0 };
    let mb = stats.download_bytes as f64 / 1_048_576.0;
    let dl_speed = if stats.download_ms > 0 { mb / (stats.download_ms as f64 / 1000.0) } else { 0.0 };

    println!("{:<14} {:>6} total {:>9} avg {:>9}", "built", stats.build_count, fmt_ms(stats.build_total_ms), fmt_ms(build_avg));
    println!("{:<14} {:>6} total {:>9} avg {:>9}", "substituted", stats.subst_count, fmt_ms(stats.subst_total_ms), fmt_ms(subst_avg));
    println!("{:<14} {:>8.1} MB avg {:>6.1} MB/s", "downloaded", mb, dl_speed);

    if !stats.slowest_builds.is_empty() {
        let grouped = stats.slowest_builds.first().map(|b| b.count.is_some()).unwrap_or(false);
        println!();
        for row in &stats.slowest_builds {
            let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?");
            if grouped {
                println!("{:>9} ({:>3}x) {}", fmt_ms(row.duration_ms), row.count.unwrap_or(0), drv_name(path));
            } else {
                println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path));
            }
        }
    }

    if !stats.cache_latency.is_empty() {
        let url_w = stats.cache_latency.iter().map(|r| r.cache_url.len()).max().unwrap_or(0);
        println!();
        for row in &stats.cache_latency {
            println!("{:<width$} avg {:>7} {:>6} queries", row.cache_url, fmt_ms(row.avg_ms as i64), row.count, width = url_w);
        }
    }
}
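// Illustrative end-to-end flow for the trend path (names and values are
// hypothetical; the clap/daemon wiring lives outside this module):
//
//   let trend = collect_trend(&db, None, BucketSize::Week, None, today_snapshot, false)?;
//   display_trend(&trend);       // human-readable table
//   output_csv_trend(&trend);    // machine-readable CSV
//   // With full = true, buckets carry duration samples and
//   // display_trend_test(&trend) adds medians, deltas, and p-values.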