Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

more perf

+162 -236
+28 -27
benches/queries.rs
··· 3 3 use rusqlite::Connection; 4 4 use std::sync::Mutex; 5 5 6 - // Seed an in-memory database with n rows spread evenly across one year. 6 + const BASE_TIME: i64 = 1_700_000_000; // 2023-11-14 7 + const YEAR_SPAN: i64 = 365 * 86400; 8 + 9 + // Seed n rows spread evenly across one year. 7 10 // Row mix: 60% builds (105), 30% substitutions (108), 10% downloads (101). 8 - // Uses a single transaction and a prepared statement for speed. 9 11 fn seed(conn: &Connection, n: usize) { 10 12 conn.execute_batch(" 11 13 CREATE TABLE events ( 12 - id INTEGER PRIMARY KEY AUTOINCREMENT, 13 - nix_id INTEGER, 14 - parent_id INTEGER, 15 - event_type INTEGER, 16 - text TEXT, 17 - drv_path TEXT, 18 - cache_url TEXT, 19 - start_time INTEGER, 20 - end_time INTEGER, 21 - duration_ms INTEGER, 22 - total_bytes INTEGER 14 + id INTEGER PRIMARY KEY AUTOINCREMENT, 15 + nix_id INTEGER, parent_id INTEGER, event_type INTEGER, 16 + text TEXT, drv_path TEXT, cache_url TEXT, 17 + start_time INTEGER, end_time INTEGER, 18 + duration_ms INTEGER, total_bytes INTEGER 23 19 ); 24 20 CREATE INDEX idx_events_type_start ON events(event_type, start_time); 25 21 CREATE INDEX idx_events_start_time ON events(start_time); 26 22 PRAGMA journal_mode = WAL; 27 23 PRAGMA synchronous = NORMAL; 28 24 ").unwrap(); 29 - 30 - let base: i64 = 1_700_000_000; // 2023-11-14 31 - let span: i64 = 365 * 86400; 32 25 33 26 conn.execute_batch("BEGIN").unwrap(); 34 27 let mut stmt = conn.prepare( ··· 38 31 39 32 for i in 0..n { 40 33 let event_type: i64 = if i % 10 < 6 { 105 } else if i % 10 < 9 { 108 } else { 101 }; 41 - let start = base + (i as i64 * span / n as i64); 42 - // Varied durations 1ms–10min. wrapping_mul avoids overflow; abs() ensures positive. 34 + let start = BASE_TIME + (i as i64 * YEAR_SPAN / n as i64); 43 35 let duration_ms: i64 = 1 + (i as i64).wrapping_mul(6364136223846793005).abs() % 600_000; 44 36 let total_bytes: i64 = if event_type == 101 { (i as i64).wrapping_mul(104729).abs() % 500_000_000 } else { 0 }; 45 37 let drv_path: Option<String> = if event_type != 101 { ··· 67 59 seed(&conn, n); 68 60 let db = Mutex::new(conn); 69 61 70 - group.bench_with_input(BenchmarkId::new("no_filter", n), &n, |b, _| { 62 + // All-time: no index benefit on start_time, exercises full scan path. 63 + group.bench_with_input(BenchmarkId::new("all_time", n), &n, |b, _| { 71 64 b.iter(|| collect_stats(&db, None, None, SortField::Duration, 10, false).unwrap()) 65 + }); 66 + 67 + // Recent 10%: composite index (event_type, start_time) prunes 90% of rows. 68 + let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10); 69 + group.bench_with_input(BenchmarkId::new("recent_10pct", n), &n, |b, _| { 70 + b.iter(|| collect_stats(&db, since, None, SortField::Duration, 10, false).unwrap()) 72 71 }); 73 72 74 73 group.bench_with_input(BenchmarkId::new("grouped", n), &n, |b, _| { ··· 87 86 seed(&conn, n); 88 87 let db = Mutex::new(conn); 89 88 90 - // aggregate (raw=false): SQL window function median — the default code path. 91 - group.bench_with_input(BenchmarkId::new("aggregate/month", n), &n, |b, _| { 92 - b.iter(|| collect_trend(&db, None, BucketSize::Month, None, false).unwrap()) 89 + // All-time monthly: exercises full scan + Rust-side bucketing. 90 + group.bench_with_input(BenchmarkId::new("all_time/month", n), &n, |b, _| { 91 + b.iter(|| collect_trend(&db, None, BucketSize::Month, None).unwrap()) 93 92 }); 94 93 95 - group.bench_with_input(BenchmarkId::new("aggregate/day", n), &n, |b, _| { 96 - b.iter(|| collect_trend(&db, None, BucketSize::Day, None, false).unwrap()) 94 + // All-time daily: more bucket transitions, otherwise identical scan. 95 + group.bench_with_input(BenchmarkId::new("all_time/day", n), &n, |b, _| { 96 + b.iter(|| collect_trend(&db, None, BucketSize::Day, None).unwrap()) 97 97 }); 98 98 99 - // raw (raw=true): sends all durations for Mann-Whitney — the memory-heavy path. 100 - group.bench_with_input(BenchmarkId::new("raw/month", n), &n, |b, _| { 101 - b.iter(|| collect_trend(&db, None, BucketSize::Month, None, true).unwrap()) 99 + // Recent 10%: start_time index reduces rows scanned by 90%. 100 + let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10); 101 + group.bench_with_input(BenchmarkId::new("recent_10pct/day", n), &n, |b, _| { 102 + b.iter(|| collect_trend(&db, since, BucketSize::Day, None).unwrap()) 102 103 }); 103 104 } 104 105
+2 -4
src/daemon.rs
··· 206 206 since: Option<i64>, 207 207 bucket: BucketSize, 208 208 drv: Option<String>, 209 - #[serde(default)] 210 - raw: bool, 211 209 }, 212 210 Clean { 213 211 // Unix timestamp; None means delete everything. ··· 390 388 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 391 389 break; 392 390 } 393 - Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv, raw })) => { 391 + Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => { 394 392 let db = Arc::clone(&db); 395 - let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv, raw)) 393 + let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv)) 396 394 .await??; 397 395 writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?; 398 396 break;
+1 -5
src/main.rs
··· 273 273 assert!(limit > 0); 274 274 275 275 if let Some(bucket_size) = bucket { 276 - // raw=true only when Mann-Whitney test output is requested — sends all durations. 277 - let raw = matches!(output, OutputFormat::Test); 278 276 let cmd = serde_json::json!({ 279 277 "action": "get_trend", 280 278 "since": since, 281 279 "bucket": bucket_size, 282 280 "drv": drv, 283 - "raw": raw, 284 281 }); 285 282 stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 286 283 ··· 326 323 let conn = Mutex::new(conn); 327 324 328 325 if let Some(bucket_size) = bucket { 329 - let raw = matches!(output, OutputFormat::Test); 330 - let trend = collect_trend(&conn, since, bucket_size, drv, raw)?; 326 + let trend = collect_trend(&conn, since, bucket_size, drv)?; 331 327 display_trend_output(&trend, output); 332 328 } else { 333 329 let s = nod::stats::collect_stats(&conn, since, drv.as_deref(), sort, limit, group)?;
+131 -200
src/stats.rs
··· 1 1 use anyhow::{Context, Result}; 2 + use chrono::prelude::*; 2 3 use rusqlite::Connection; 3 4 use serde::{Deserialize, Serialize}; 4 5 use std::sync::Mutex; ··· 39 40 pub count: i64, 40 41 } 41 42 43 + // Three separate queries — one per event type — so the composite index 44 + // (event_type, start_time) can be used for each rather than doing a full 45 + // table scan with per-aggregate FILTER conditions. 42 46 pub fn collect_stats( 43 47 db: &Mutex<Connection>, 44 48 since: Option<i64>, ··· 51 55 52 56 let conn = db.lock().unwrap(); 53 57 54 - let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) = 55 - conn.query_row( 56 - "SELECT 57 - COUNT(*) FILTER (WHERE event_type = 105), 58 - COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 105), 0), 59 - COUNT(*) FILTER (WHERE event_type = 108), 60 - COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0), 61 - COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0), 62 - COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0) 63 - FROM events 64 - WHERE (?1 IS NULL OR start_time >= ?1) 65 - AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')", 66 - rusqlite::params![since, drv], 67 - |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, 68 - r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)), 69 - ).context("Failed to query summary stats")?; 58 + let (build_count, build_total_ms) = conn.query_row( 59 + "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0) 60 + FROM events WHERE event_type = 105 61 + AND (?1 IS NULL OR start_time >= ?1) 62 + AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')", 63 + rusqlite::params![since, drv], 64 + |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)), 65 + ).context("Failed to query build summary")?; 66 + 67 + let (subst_count, subst_total_ms) = conn.query_row( 68 + "SELECT COUNT(*), COALESCE(SUM(duration_ms), 0) 69 + FROM events WHERE event_type = 108 70 + AND (?1 IS NULL OR start_time >= ?1) 71 + AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')", 72 + rusqlite::params![since, drv], 73 + |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)), 74 + ).context("Failed to query substitution summary")?; 75 + 76 + let (download_bytes, download_ms) = conn.query_row( 77 + "SELECT COALESCE(SUM(total_bytes), 0), COALESCE(SUM(duration_ms), 0) 78 + FROM events WHERE event_type = 101 79 + AND (?1 IS NULL OR start_time >= ?1)", 80 + rusqlite::params![since], 81 + |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?)), 82 + ).context("Failed to query download summary")?; 83 + 84 + assert!(build_count >= 0); 85 + assert!(subst_count >= 0); 86 + assert!(download_bytes >= 0); 70 87 71 88 let slowest_builds: Vec<SlowBuild> = if group { 72 89 let sort_col = match sort { ··· 74 91 SortField::Count => "cnt", 75 92 SortField::Name => "drv_path", 76 93 }; 77 - // sort_col is an internal identifier, not user input — safe to interpolate. 78 94 let sql = format!( 79 95 "SELECT drv_path, CAST(ROUND(AVG(duration_ms)) AS INTEGER) as avg_ms, COUNT(*) as cnt 80 96 FROM events ··· 87 103 ); 88 104 let mut stmt = conn.prepare(&sql).context("Failed to prepare grouped builds query")?; 89 105 stmt.query_map(rusqlite::params![since, drv, limit], |r| { 90 - Ok(SlowBuild { 91 - drv_path: r.get(0)?, 92 - duration_ms: r.get(1)?, 93 - count: Some(r.get(2)?), 94 - text: None, 95 - }) 106 + Ok(SlowBuild { drv_path: r.get(0)?, duration_ms: r.get(1)?, count: Some(r.get(2)?), text: None }) 96 107 })?.filter_map(|r| r.ok()).collect() 97 108 } else { 98 109 let sort_col = match sort { ··· 110 121 ); 111 122 let mut stmt = conn.prepare(&sql).context("Failed to prepare slowest builds query")?; 112 123 stmt.query_map(rusqlite::params![since, drv, limit], |r| { 113 - Ok(SlowBuild { 114 - duration_ms: r.get(0)?, 115 - count: None, 116 - drv_path: r.get(1)?, 117 - text: r.get(2)?, 118 - }) 124 + Ok(SlowBuild { duration_ms: r.get(0)?, count: None, drv_path: r.get(1)?, text: r.get(2)? }) 119 125 })?.filter_map(|r| r.ok()).collect() 120 126 }; 121 127 ··· 129 135 GROUP BY cache_url ORDER BY AVG(duration_ms) DESC", 130 136 ).context("Failed to prepare cache latency query")?; 131 137 let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![since], |r| { 132 - Ok(CacheStat { 133 - cache_url: r.get(0)?, 134 - avg_ms: r.get(1)?, 135 - count: r.get(2)?, 136 - }) 138 + Ok(CacheStat { cache_url: r.get(0)?, avg_ms: r.get(1)?, count: r.get(2)? }) 137 139 })?.filter_map(|r| r.ok()).collect(); 138 140 139 141 Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency }) ··· 160 162 161 163 let n = combined.len(); 162 164 163 - // Average ranks within tie groups and accumulate the tie-correction term Σ(t³ - t). 164 165 let mut rank_sum_a = 0.0f64; 165 166 let mut tie_correction = 0.0f64; 166 167 let mut i = 0; ··· 190 191 assert!(cliffs_delta <= 1.0 + 1e-9); 191 192 let _ = cliffs_delta; 192 193 193 - // Var[U] = n1*n2/12 * [(n+1) - Σ(t³-t)/(n*(n-1))] 194 194 let nf = n as f64; 195 195 let variance = (n1f * n2f / 12.0) * ((nf + 1.0) - tie_correction / (nf * (nf - 1.0))); 196 196 ··· 199 199 } else { 200 200 let u_min = u_a.min(u_b); 201 201 let mean_u = n1f * n2f / 2.0; 202 - // Continuity correction: +0.5 shifts U_min toward the mean, making z conservative. 203 202 let z = (u_min - mean_u + 0.5) / variance.sqrt(); 204 203 assert!(z <= 0.0 + 1e-9, "z must be non-positive for U_min ≤ mean_U"); 205 204 (2.0 * normal_cdf(z)).min(1.0) ··· 211 210 Some(MannWhitneyResult { p_value }) 212 211 } 213 212 214 - // Abramowitz & Stegun 7.1.26, max error ≈ 1.5×10⁻⁷. 215 213 fn normal_cdf(z: f64) -> f64 { 216 214 0.5 * (1.0 + erf_approx(z / std::f64::consts::SQRT_2)) 217 215 } ··· 261 259 } 262 260 263 261 impl BucketSize { 262 + // Format string must match bucket_label() output exactly so that display 263 + // functions and any downstream consumers are consistent. 264 264 pub fn strftime_fmt(&self) -> &'static str { 265 265 match self { 266 266 BucketSize::Hour => "%Y-%m-%dT%H", ··· 280 280 } 281 281 } 282 282 283 + // Compute the display label for the bucket containing ts. 284 + // Uses the same format strings as strftime_fmt() so labels are consistent 285 + // with any legacy data or external tooling that used the SQL strftime path. 286 + fn bucket_label(ts: i64, size: &BucketSize) -> String { 287 + let dt = DateTime::from_timestamp(ts, 0) 288 + .unwrap_or_default() 289 + .with_timezone(&Utc); 290 + match size { 291 + BucketSize::Hour => dt.format("%Y-%m-%dT%H").to_string(), 292 + BucketSize::Day => dt.format("%Y-%m-%d").to_string(), 293 + BucketSize::Week => dt.format("%Y-W%W").to_string(), 294 + BucketSize::Month => dt.format("%Y-%m").to_string(), 295 + } 296 + } 297 + 298 + // Compute the start timestamp of the bucket immediately after the bucket 299 + // containing ts. Used for integer-comparison bucket detection in the hot loop — 300 + // avoids a string allocation + comparison per row. 301 + fn bucket_end(ts: i64, size: &BucketSize) -> i64 { 302 + let dt = DateTime::from_timestamp(ts, 0) 303 + .unwrap_or_default() 304 + .with_timezone(&Utc); 305 + match size { 306 + BucketSize::Hour => { 307 + Utc.with_ymd_and_hms(dt.year(), dt.month(), dt.day(), dt.hour(), 0, 0) 308 + .unwrap() 309 + .checked_add_signed(chrono::Duration::hours(1)) 310 + .unwrap() 311 + .timestamp() 312 + } 313 + BucketSize::Day => { 314 + Utc.with_ymd_and_hms(dt.year(), dt.month(), dt.day(), 0, 0, 0) 315 + .unwrap() 316 + .checked_add_signed(chrono::Duration::days(1)) 317 + .unwrap() 318 + .timestamp() 319 + } 320 + BucketSize::Week => { 321 + // Monday-based weeks, matching strftime '%W'. 322 + let days_since_monday = dt.weekday().num_days_from_monday() as i64; 323 + let monday = dt.date_naive() - chrono::Duration::days(days_since_monday); 324 + Utc.from_utc_datetime(&monday.and_hms_opt(0, 0, 0).unwrap()) 325 + .checked_add_signed(chrono::Duration::weeks(1)) 326 + .unwrap() 327 + .timestamp() 328 + } 329 + BucketSize::Month => { 330 + let (year, month) = if dt.month() == 12 { (dt.year() + 1, 1u32) } else { (dt.year(), dt.month() + 1) }; 331 + Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap().timestamp() 332 + } 333 + } 334 + } 335 + 283 336 #[derive(Debug, Serialize, Deserialize)] 284 337 pub struct TrendBucket { 285 338 pub bucket: String, 286 - // Precomputed aggregates — always populated; computed client-side in raw mode, 287 - // computed SQL-side in aggregate mode. 288 - #[serde(default)] 289 - pub build_count: i64, 290 - #[serde(default)] 291 - pub build_median_ms: i64, 292 - #[serde(default)] 293 - pub subst_count: i64, 294 - #[serde(default)] 295 - pub subst_median_ms: i64, 296 - pub download_bytes: i64, 297 - // Raw durations — only populated when raw=true (needed for Mann-Whitney). 298 - #[serde(default)] 299 339 pub build_durations: Vec<i64>, 300 - #[serde(default)] 301 340 pub subst_durations: Vec<i64>, 341 + pub download_bytes: i64, 302 342 } 303 343 304 344 #[derive(Debug, Serialize, Deserialize)] ··· 308 348 pub drv_filter: Option<String>, 309 349 } 310 350 311 - // raw=false: compute per-bucket median in SQL via window functions — no raw durations 312 - // sent over the socket. Use this for all display modes except Mann-Whitney. 313 - // raw=true: return all durations for Mann-Whitney (--output test). Memory-intensive 314 - // for large datasets; only use when genuinely needed. 351 + // Query returns raw start_time integers — strftime is computed in Rust via 352 + // bucket_label()/bucket_end() to avoid N SQLite string allocations. Bucket 353 + // boundaries are detected with a single integer comparison per row (ts >= next_bucket) 354 + // so string allocs happen only once per bucket, not once per row. 315 355 pub fn collect_trend( 316 356 db: &Mutex<Connection>, 317 357 since: Option<i64>, 318 358 bucket: BucketSize, 319 359 drv: Option<String>, 320 - raw: bool, 321 360 ) -> Result<Trend> { 322 361 if let Some(ref d) = drv { 323 362 assert!(!d.is_empty(), "drv filter must not be empty"); ··· 325 364 326 365 let conn = db.lock().unwrap(); 327 366 let drv_ref = drv.as_deref(); 328 - let fmt = bucket.strftime_fmt(); 329 367 330 - let buckets = if raw { 331 - collect_trend_raw(&conn, since, drv_ref, fmt)? 332 - } else { 333 - collect_trend_aggregate(&conn, since, drv_ref, fmt)? 334 - }; 335 - 336 - for i in 1..buckets.len() { 337 - assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending"); 338 - } 339 - 340 - Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv }) 341 - } 342 - 343 - // Fetches all raw durations. ORDER BY start_time (not strftime) so the index 344 - // (event_type, start_time) can be used for ordering — strftime is monotone in 345 - // start_time so bucket grouping is preserved. 346 - fn collect_trend_raw( 347 - conn: &Connection, 348 - since: Option<i64>, 349 - drv: Option<&str>, 350 - fmt: &str, 351 - ) -> Result<Vec<TrendBucket>> { 352 368 // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter. 353 369 let mut stmt = conn.prepare( 354 - "SELECT strftime(?3, start_time, 'unixepoch'), event_type, duration_ms, total_bytes 370 + "SELECT start_time, event_type, duration_ms, total_bytes 355 371 FROM events 356 372 WHERE event_type IN (101, 105, 108) 357 373 AND (?1 IS NULL OR start_time >= ?1) 358 374 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 359 375 ORDER BY start_time ASC", 360 - ).context("Failed to prepare raw trend query")?; 376 + ).context("Failed to prepare trend query")?; 361 377 362 378 let mut buckets: Vec<TrendBucket> = vec![]; 363 - for row in stmt.query_map(rusqlite::params![since, drv, fmt], |r| { 364 - Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) 379 + let mut next_bucket: i64 = 0; 380 + 381 + for row in stmt.query_map(rusqlite::params![since, drv_ref], |r| { 382 + Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) 365 383 })?.filter_map(|r| r.ok()) { 366 - let (b, etype, dur, bytes) = row; 367 - if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) { 384 + let (ts, etype, dur, bytes) = row; 385 + assert!(ts >= 0); 386 + 387 + if ts >= next_bucket { 368 388 buckets.push(TrendBucket { 369 - bucket: b, build_count: 0, build_median_ms: 0, 370 - subst_count: 0, subst_median_ms: 0, download_bytes: 0, 371 - build_durations: vec![], subst_durations: vec![], 389 + bucket: bucket_label(ts, &bucket), 390 + build_durations: vec![], 391 + subst_durations: vec![], 392 + download_bytes: 0, 372 393 }); 394 + next_bucket = bucket_end(ts, &bucket); 373 395 } 396 + 374 397 let last = buckets.last_mut().unwrap(); 375 398 match etype { 376 399 105 => { assert!(dur >= 0); last.build_durations.push(dur); } ··· 380 403 } 381 404 } 382 405 383 - // Compute aggregates from raw durations so display functions can use them uniformly. 384 - for b in &mut buckets { 385 - if !b.build_durations.is_empty() { 386 - let mut s = b.build_durations.clone(); s.sort_unstable(); 387 - b.build_count = s.len() as i64; 388 - b.build_median_ms = median_sorted(&s) as i64; 389 - } 390 - if !b.subst_durations.is_empty() { 391 - let mut s = b.subst_durations.clone(); s.sort_unstable(); 392 - b.subst_count = s.len() as i64; 393 - b.subst_median_ms = median_sorted(&s) as i64; 394 - } 406 + for i in 1..buckets.len() { 407 + assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending"); 395 408 } 396 409 397 - Ok(buckets) 398 - } 399 - 400 - // Computes per-bucket median entirely in SQL using window functions. Returns no 401 - // raw durations — only counts and medians. Memory usage is proportional to the 402 - // number of distinct buckets, not the number of rows. 403 - fn collect_trend_aggregate( 404 - conn: &Connection, 405 - since: Option<i64>, 406 - drv: Option<&str>, 407 - fmt: &str, 408 - ) -> Result<Vec<TrendBucket>> { 409 - // Window function median: ROW_NUMBER orders rows by duration within each 410 - // (bucket, event_type) partition; we take the one or two middle rows and AVG them. 411 - // Integer division: odd n → single middle row; even n → average of two middle rows. 412 - let mut stmt = conn.prepare( 413 - "SELECT bucket, event_type, 414 - CAST(AVG(duration_ms) AS INTEGER) AS median_ms, 415 - MAX(n) AS cnt 416 - FROM ( 417 - SELECT strftime(?3, start_time, 'unixepoch') AS bucket, 418 - event_type, duration_ms, 419 - ROW_NUMBER() OVER ( 420 - PARTITION BY strftime(?3, start_time, 'unixepoch'), event_type 421 - ORDER BY duration_ms 422 - ) AS rn, 423 - COUNT(*) OVER ( 424 - PARTITION BY strftime(?3, start_time, 'unixepoch'), event_type 425 - ) AS n 426 - FROM events 427 - WHERE event_type IN (105, 108) 428 - AND (?1 IS NULL OR start_time >= ?1) 429 - AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 430 - ) 431 - WHERE rn IN ((n + 1) / 2, (n + 2) / 2) 432 - GROUP BY bucket, event_type 433 - ORDER BY bucket ASC", 434 - ).context("Failed to prepare aggregate trend query")?; 435 - 436 - let mut buckets: Vec<TrendBucket> = vec![]; 437 - for row in stmt.query_map(rusqlite::params![since, drv, fmt], |r| { 438 - Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) 439 - })?.filter_map(|r| r.ok()) { 440 - let (b, etype, median_ms, cnt) = row; 441 - assert!(median_ms >= 0); 442 - assert!(cnt > 0); 443 - if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) { 444 - buckets.push(TrendBucket { 445 - bucket: b, build_count: 0, build_median_ms: 0, 446 - subst_count: 0, subst_median_ms: 0, download_bytes: 0, 447 - build_durations: vec![], subst_durations: vec![], 448 - }); 449 - } 450 - let last = buckets.last_mut().unwrap(); 451 - match etype { 452 - 105 => { last.build_median_ms = median_ms; last.build_count = cnt; } 453 - 108 => { last.subst_median_ms = median_ms; last.subst_count = cnt; } 454 - _ => {} 455 - } 456 - } 457 - 458 - // Separate query for download bytes — FileTransfer has no meaningful duration median. 459 - let mut dl_stmt = conn.prepare( 460 - "SELECT strftime(?2, start_time, 'unixepoch') AS bucket, SUM(total_bytes) 461 - FROM events 462 - WHERE event_type = 101 463 - AND (?1 IS NULL OR start_time >= ?1) 464 - GROUP BY bucket 465 - ORDER BY bucket ASC", 466 - ).context("Failed to prepare download bytes query")?; 467 - 468 - for row in dl_stmt.query_map(rusqlite::params![since, fmt], |r| { 469 - Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)) 470 - })?.filter_map(|r| r.ok()) { 471 - let (b, bytes) = row; 472 - assert!(bytes >= 0); 473 - // Merge into existing bucket or create a download-only bucket. 474 - if let Some(existing) = buckets.iter_mut().find(|x| x.bucket == b) { 475 - existing.download_bytes = bytes; 476 - } else { 477 - buckets.push(TrendBucket { 478 - bucket: b, build_count: 0, build_median_ms: 0, 479 - subst_count: 0, subst_median_ms: 0, download_bytes: bytes, 480 - build_durations: vec![], subst_durations: vec![], 481 - }); 482 - } 483 - } 484 - 485 - buckets.sort_unstable_by(|a, b| a.bucket.cmp(&b.bucket)); 486 - Ok(buckets) 410 + Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv }) 487 411 } 488 412 489 413 pub fn display_trend(trend: &Trend) { ··· 505 429 } 506 430 507 431 for b in &trend.buckets { 432 + let mut bs = b.build_durations.clone(); bs.sort_unstable(); 433 + let mut ss = b.subst_durations.clone(); ss.sort_unstable(); 434 + let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; 435 + let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; 436 + 508 437 print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", 509 - b.bucket, b.build_count, fmt_ms(b.build_median_ms), 510 - b.subst_count, fmt_ms(b.subst_median_ms)); 438 + b.bucket, b.build_durations.len(), fmt_ms(build_med), 439 + b.subst_durations.len(), fmt_ms(subst_med)); 511 440 if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); } 512 441 println!(); 513 442 } ··· 537 466 538 467 let delta = if prev_med > 0.0 { 539 468 let pct = (med - prev_med) / prev_med * 100.0; 540 - let sign = if pct >= 0.0 { "+" } else { "" }; 541 - format!("{}{:.0}%", sign, pct) 469 + format!("{}{:.0}%", if pct >= 0.0 { "+" } else { "" }, pct) 542 470 } else { 543 471 String::new() 544 472 }; ··· 573 501 pub fn output_csv_trend(trend: &Trend) { 574 502 println!("period,build_count,build_median_ms,subst_count,subst_median_ms,download_bytes"); 575 503 for b in &trend.buckets { 576 - assert!(b.build_median_ms >= 0); 577 - assert!(b.subst_median_ms >= 0); 578 - println!("{},{},{},{},{},{}", b.bucket, b.build_count, b.build_median_ms, b.subst_count, b.subst_median_ms, b.download_bytes); 504 + let mut bs = b.build_durations.clone(); bs.sort_unstable(); 505 + let mut ss = b.subst_durations.clone(); ss.sort_unstable(); 506 + let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; 507 + let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; 508 + assert!(build_med >= 0); 509 + assert!(subst_med >= 0); 510 + println!("{},{},{},{},{},{}", b.bucket, b.build_durations.len(), build_med, b.subst_durations.len(), subst_med, b.download_bytes); 579 511 } 580 512 } 581 513 ··· 595 527 for row in &stats.slowest_builds { 596 528 let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?"); 597 529 if grouped { 598 - let cnt = row.count.unwrap_or(0); 599 - println!("{:>9} ({:>3}x) {}", fmt_ms(row.duration_ms), cnt, drv_name(path)); 530 + println!("{:>9} ({:>3}x) {}", fmt_ms(row.duration_ms), row.count.unwrap_or(0), drv_name(path)); 600 531 } else { 601 532 println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path)); 602 533 }