Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add trend command

+404 -9
+9 -1
src/daemon.rs
··· 10 10 use tokio::net::{UnixListener, UnixStream}; 11 11 use tracing::{error, info}; 12 12 13 - use crate::stats::collect_stats; 13 + use crate::stats::{collect_stats, collect_trend, BucketSize}; 14 14 15 15 const fn schema_hash(s: &[u8]) -> u32 { 16 16 let mut h: u32 = 2166136261; ··· 158 158 #[serde(tag = "action", rename_all = "snake_case")] 159 159 enum ClientCommand { 160 160 GetStats { since: Option<i64> }, 161 + GetTrend { since: Option<i64>, bucket: BucketSize, drv: Option<String> }, 161 162 Clean, 162 163 } 163 164 ··· 258 259 let stats = tokio::task::spawn_blocking(move || collect_stats(&db, since)) 259 260 .await??; 260 261 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 262 + break; 263 + } 264 + Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => { 265 + let db = state.lock().unwrap().db.clone(); 266 + let trend = tokio::task::spawn_blocking(move || collect_trend(&db, since, bucket, drv)) 267 + .await??; 268 + writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?; 261 269 break; 262 270 } 263 271 Ok(SocketMessage::Command(ClientCommand::Clean)) => {
+92 -2
src/main.rs
··· 12 12 use tracing::{error, info}; 13 13 14 14 use daemon::{open_db, run_daemon}; 15 - use stats::{display_stats, Stats}; 15 + use stats::{display_stats, display_trend, display_trend_test, output_csv_trend, BucketSize, Stats, Trend}; 16 + 17 + #[derive(Clone, clap::ValueEnum)] 18 + enum BucketArg { 19 + Hour, 20 + Day, 21 + Week, 22 + Month, 23 + } 16 24 17 25 #[derive(Parser)] 18 26 struct Cli { ··· 46 54 #[arg(short = 'y', long)] 47 55 years: Option<u32>, 48 56 }, 57 + /// Show how builds, substitutions, and downloads trend over time 58 + Trend { 59 + /// Path to the Unix socket 60 + #[arg(short, long, env = "NOD_SOCKET")] 61 + socket: Option<PathBuf>, 62 + /// Limit to last N days 63 + #[arg(short = 'd', long)] 64 + days: Option<u32>, 65 + /// Limit to last N months 66 + #[arg(short = 'm', long)] 67 + months: Option<u32>, 68 + /// Limit to last N years 69 + #[arg(short = 'y', long)] 70 + years: Option<u32>, 71 + /// Time bucket granularity (auto-detected from period if omitted) 72 + #[arg(short = 'b', long, value_enum)] 73 + bucket: Option<BucketArg>, 74 + /// Filter builds and substitutions to derivations matching this substring 75 + #[arg(long)] 76 + drv: Option<String>, 77 + /// Run Mann-Whitney U test between adjacent periods instead of showing the plain table 78 + #[arg(long)] 79 + test: bool, 80 + /// Output as CSV 81 + #[arg(long)] 82 + csv: bool, 83 + }, 49 84 /// Clear all data from the database 50 85 Clean { 51 86 /// Path to the Unix socket ··· 61 96 62 97 let project_dirs = ProjectDirs::from("org", "nixos", "nod"); 63 98 64 - let command = cli.command.unwrap_or(Commands::Daemon { db: None, socket: None }); 99 + let command = cli.command.unwrap_or(Commands::Stats { socket: None, days: None, months: None, years: None }); 65 100 66 101 match command { 67 102 Commands::Daemon { db, socket } => { ··· 92 127 } 93 128 } 94 129 130 + // Check if a daemon is already listening before we remove and rebind the socket. 
131 + // A successful connect means another process owns it — refuse to start. 132 + if UnixStream::connect(&socket_path).await.is_ok() { 133 + anyhow::bail!("Daemon already running at {}", socket_path.display()); 134 + } 135 + 95 136 let conn = open_db(&db_path)?; 96 137 run_daemon(socket_path, Arc::new(Mutex::new(conn))).await.context("Daemon failed")? 97 138 } ··· 125 166 reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 126 167 let stats: Stats = serde_json::from_str(&line).context("Invalid response from daemon")?; 127 168 display_stats(stats); 169 + } 170 + Commands::Trend { socket, days, months, years, bucket, drv, test, csv } => { 171 + let socket_path = socket.unwrap_or_else(|| { 172 + project_dirs.as_ref() 173 + .and_then(|d| d.runtime_dir()) 174 + .map(|d| d.join("nod.sock")) 175 + .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 176 + }); 177 + 178 + let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() { 179 + let mut t = Utc::now(); 180 + if let Some(y) = years { t = t - chrono::Months::new(y * 12); } 181 + if let Some(m) = months { t = t - chrono::Months::new(m); } 182 + if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } 183 + Some(t.timestamp()) 184 + } else { 185 + None 186 + }; 187 + 188 + // Auto-detect bucket granularity from the period length when not specified. 
189 + let bucket_size = match bucket { 190 + Some(BucketArg::Hour) => BucketSize::Hour, 191 + Some(BucketArg::Day) => BucketSize::Day, 192 + Some(BucketArg::Week) => BucketSize::Week, 193 + Some(BucketArg::Month) => BucketSize::Month, 194 + None => match since { 195 + None => BucketSize::Month, 196 + Some(ts) => { 197 + let days_span = (Utc::now().timestamp() - ts) / 86400; 198 + if days_span <= 2 { BucketSize::Hour } 199 + else if days_span <= 60 { BucketSize::Day } 200 + else if days_span <= 365 { BucketSize::Week } 201 + else { BucketSize::Month } 202 + } 203 + }, 204 + }; 205 + 206 + let mut stream = UnixStream::connect(&socket_path) 207 + .await 208 + .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 209 + 210 + let cmd = serde_json::json!({"action": "get_trend", "since": since, "bucket": bucket_size, "drv": drv}); 211 + stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 212 + 213 + let mut reader = BufReader::new(stream); 214 + let mut line = String::new(); 215 + reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 216 + let trend: Trend = serde_json::from_str(&line).context("Invalid response from daemon")?; 217 + if csv { output_csv_trend(&trend); } else if test { display_trend_test(&trend); } else { display_trend(&trend); } 128 218 } 129 219 Commands::Clean { socket } => { 130 220 let socket_path = socket.unwrap_or_else(|| {
+303 -6
src/stats.rs
··· 3 3 use serde::{Deserialize, Serialize}; 4 4 use std::sync::{Arc, Mutex}; 5 5 6 - // Sent over the socket as JSON - all computed values, no raw types 7 6 #[derive(Debug, Serialize, Deserialize)] 8 7 pub struct Stats { 9 8 pub build_count: i64, ··· 33 32 pub fn collect_stats(db: &Arc<Mutex<Connection>>, since: Option<i64>) -> Result<Stats> { 34 33 let conn = db.lock().unwrap(); 35 34 36 - // Convert unix timestamp to RFC3339 for comparison with stored start_time strings. 37 - // None means no filter - SQL NULL makes the condition vacuously true. 35 + // SQL NULL makes the WHERE condition vacuously true, giving us "no filter". 38 36 let since_str: Option<String> = since 39 37 .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) 40 38 .map(|dt| dt.to_rfc3339()); ··· 68 66 }) 69 67 })?.filter_map(|r| r.ok()).collect(); 70 68 71 - // Use Substitute (108) events for cache latency — these measure full download time 72 - // per substituter, not just metadata query time (QueryPathInfo). 69 + // Substitute (108) measures full substitution time per cache; QueryPathInfo (109) 70 + // only measures metadata lookup and would undercount latency. 73 71 let mut stmt = conn.prepare( 74 72 "SELECT cache_url, AVG(duration_ms), COUNT(*) 75 73 FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND (?1 IS NULL OR start_time >= ?1) ··· 86 84 Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency }) 87 85 } 88 86 87 + // Mann-Whitney U is non-parametric and makes no distributional assumptions, 88 + // which is appropriate for build times that are right-skewed. 
/// Outcome of a two-sided Mann-Whitney U test.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MannWhitneyResult {
    /// Two-tailed p-value from the normal approximation, always within `[0, 1]`.
    pub p_value: f64,
}

/// Runs a two-sided Mann-Whitney U test on samples `a` and `b`.
///
/// Ties receive their average rank and the variance uses the standard tie
/// correction. The p-value comes from the normal approximation with a
/// continuity correction.
///
/// Returns `None` when either sample is empty. When every observation is
/// tied (zero variance) the p-value is `1.0`.
pub fn mann_whitney_u(a: &[i64], b: &[i64]) -> Option<MannWhitneyResult> {
    if a.is_empty() || b.is_empty() {
        return None;
    }

    let n1f = a.len() as f64;
    let n2f = b.len() as f64;

    // Tag each observation with whether it came from `a`, then sort by value.
    let mut combined: Vec<(i64, bool)> = a
        .iter()
        .map(|&v| (v, true))
        .chain(b.iter().map(|&v| (v, false)))
        .collect();
    combined.sort_unstable_by_key(|&(v, _)| v);
    let n = combined.len();

    // Average ranks within tie groups and accumulate the tie-correction term Σ(t³ - t).
    let mut rank_sum_a = 0.0f64;
    let mut tie_correction = 0.0f64;
    let mut start = 0;
    while start < n {
        let value = combined[start].0;
        let mut end = start;
        while end < n && combined[end].0 == value {
            end += 1;
        }
        // The group occupies 1-based ranks start+1 ..= end, whose mean is (start + 1 + end) / 2.
        let avg_rank = (start as f64 + 1.0 + end as f64) / 2.0;
        let from_a = combined[start..end].iter().filter(|&&(_, is_a)| is_a).count();
        rank_sum_a += avg_rank * from_a as f64;

        let t = (end - start) as f64;
        if t > 1.0 {
            tie_correction += t * t * t - t;
        }
        start = end;
    }

    let u_a = rank_sum_a - n1f * (n1f + 1.0) / 2.0;
    let u_b = n1f * n2f - u_a;
    debug_assert!(u_a >= 0.0 && u_b >= 0.0, "U statistics must be non-negative");

    // Var[U] = n1*n2/12 * [(n+1) - Σ(t³-t)/(n*(n-1))]
    let nf = n as f64;
    let variance = (n1f * n2f / 12.0) * ((nf + 1.0) - tie_correction / (nf * (nf - 1.0)));

    let p_value = if variance <= 0.0 {
        1.0
    } else {
        let mean_u = n1f * n2f / 2.0;
        let u_min = u_a.min(u_b);
        // Continuity correction: +0.5 shifts U_min toward the mean, making z conservative.
        // When U_min already equals the mean the shifted value would overshoot to the
        // positive side, so it is clamped at zero (which yields p = 1).
        let z = (u_min - mean_u + 0.5).min(0.0) / variance.sqrt();
        (2.0 * normal_cdf(z)).min(1.0)
    };
    debug_assert!((0.0..=1.0).contains(&p_value));

    Some(MannWhitneyResult { p_value })
}

/// Standard normal cumulative distribution function, computed via [`erf_approx`].
fn normal_cdf(z: f64) -> f64 {
    0.5 * (1.0 + erf_approx(z / std::f64::consts::SQRT_2))
}

/// Error function approximation (Abramowitz & Stegun 7.1.26, max error ≈ 1.5×10⁻⁷).
///
/// The polynomial is evaluated on `|x|` and the sign is restored afterwards,
/// since erf is odd.
fn erf_approx(x: f64) -> f64 {
    let t = 1.0 / (1.0 + 0.3275911 * x.abs());
    let poly = t * (0.254829592
        + t * (-0.284496736
        + t * (1.421413741
        + t * (-1.453152027
        + t * 1.061405429))));
    let result = 1.0 - poly * (-x * x).exp();
    if x >= 0.0 { result } else { -result }
}

/// Median of an already-sorted, non-empty slice.
///
/// For an even length this is the mean of the two middle elements.
///
/// # Panics
/// Panics if `sorted` is empty.
fn median_sorted(sorted: &[i64]) -> f64 {
    assert!(!sorted.is_empty());
    let n = sorted.len();
    if n % 2 == 0 {
        // Convert before adding so two large values cannot overflow i64.
        (sorted[n / 2 - 1] as f64 + sorted[n / 2] as f64) / 2.0
    } else {
        sorted[n / 2] as f64
    }
}

// NOTE(review): `fn fmt_ms(ms: i64) -> String` sits here unchanged. This view elides
// most of its body (only the `ms < 1000` => "{}ms" branch is visible), so it is
// intentionally not reproduced in this block.
100 195 fn drv_name(path: &str) -> &str { 101 196 path.strip_prefix("/nix/store/").unwrap_or(path) 197 + } 198 + 199 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 200 + #[serde(rename_all = "snake_case")] 201 + pub enum BucketSize { 202 + Hour, 203 + Day, 204 + Week, 205 + Month, 206 + } 207 + 208 + impl BucketSize { 209 + fn strftime_fmt(&self) -> &'static str { 210 + match self { 211 + BucketSize::Hour => "%Y-%m-%dT%H", 212 + BucketSize::Day => "%Y-%m-%d", 213 + BucketSize::Week => "%Y-W%W", 214 + BucketSize::Month => "%Y-%m", 215 + } 216 + } 217 + 218 + fn col_width(&self) -> usize { 219 + match self { 220 + BucketSize::Hour => 13, 221 + BucketSize::Day => 10, 222 + BucketSize::Week => 8, 223 + BucketSize::Month => 7, 224 + } 225 + } 226 + } 227 + 228 + // Raw durations are carried per-bucket so adjacent buckets can be compared 229 + // without a second round-trip to the daemon. 230 + #[derive(Debug, Serialize, Deserialize)] 231 + pub struct TrendBucket { 232 + pub bucket: String, 233 + pub build_durations: Vec<i64>, 234 + pub subst_durations: Vec<i64>, 235 + pub download_bytes: i64, 236 + } 237 + 238 + #[derive(Debug, Serialize, Deserialize)] 239 + pub struct Trend { 240 + pub buckets: Vec<TrendBucket>, 241 + pub bucket_size: BucketSize, 242 + pub drv_filter: Option<String>, 243 + } 244 + 245 + pub fn collect_trend( 246 + db: &Arc<Mutex<Connection>>, 247 + since: Option<i64>, 248 + bucket: BucketSize, 249 + drv: Option<String>, 250 + ) -> Result<Trend> { 251 + if let Some(ref d) = drv { 252 + assert!(!d.is_empty(), "drv filter must not be empty"); 253 + } 254 + 255 + let conn = db.lock().unwrap(); 256 + 257 + let since_str: Option<String> = since 258 + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) 259 + .map(|dt| dt.to_rfc3339()); 260 + let p = since_str.as_deref(); 261 + let drv_ref = drv.as_deref(); 262 + let fmt = bucket.strftime_fmt(); 263 + 264 + // Rows come out ordered by bucket then time, so grouping by sequential scan is 
valid. 265 + // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter. 266 + let mut stmt = conn.prepare( 267 + "SELECT strftime(?3, start_time), event_type, duration_ms, total_bytes 268 + FROM events 269 + WHERE event_type IN (101, 105, 108) 270 + AND (?1 IS NULL OR start_time >= ?1) 271 + AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 272 + ORDER BY strftime(?3, start_time) ASC, start_time ASC", 273 + ).context("Failed to prepare trend query")?; 274 + 275 + let mut buckets: Vec<TrendBucket> = vec![]; 276 + for row in stmt.query_map(rusqlite::params![p, drv_ref, fmt], |r| { 277 + Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) 278 + })?.filter_map(|r| r.ok()) { 279 + let (b, etype, dur, bytes) = row; 280 + if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) { 281 + buckets.push(TrendBucket { bucket: b, build_durations: vec![], subst_durations: vec![], download_bytes: 0 }); 282 + } 283 + let last = buckets.last_mut().unwrap(); 284 + match etype { 285 + 105 => { assert!(dur >= 0); last.build_durations.push(dur); } 286 + 108 => { assert!(dur >= 0); last.subst_durations.push(dur); } 287 + 101 => { assert!(bytes >= 0); last.download_bytes += bytes; } 288 + _ => {} 289 + } 290 + } 291 + 292 + for i in 1..buckets.len() { 293 + assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending"); 294 + } 295 + 296 + Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv }) 297 + } 298 + 299 + pub fn display_trend(trend: &Trend) { 300 + let has_downloads = trend.drv_filter.is_none() 301 + && trend.buckets.iter().any(|b| b.download_bytes > 0); 302 + let bw = trend.bucket_size.col_width(); 303 + 304 + if let Some(ref drv) = trend.drv_filter { 305 + println!("filter: {}", drv); 306 + } 307 + 308 + print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", "period", "builds", "build med", "subst", "subst med"); 309 + if has_downloads { print!(" {:>8}", "dl (MB)"); } 310 + 
println!(); 311 + 312 + if trend.buckets.is_empty() { 313 + println!("(no data)"); 314 + return; 315 + } 316 + 317 + for b in &trend.buckets { 318 + let mut bs = b.build_durations.clone(); bs.sort_unstable(); 319 + let mut ss = b.subst_durations.clone(); ss.sort_unstable(); 320 + let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; 321 + let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; 322 + 323 + print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", 324 + b.bucket, b.build_durations.len(), fmt_ms(build_med), 325 + b.subst_durations.len(), fmt_ms(subst_med)); 326 + if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); } 327 + println!(); 328 + } 329 + } 330 + 331 + fn print_test_section<F>(label: &str, buckets: &[TrendBucket], get_durs: F, bw: usize) 332 + where 333 + F: Fn(&TrendBucket) -> &[i64], 334 + { 335 + let any_data = buckets.iter().any(|b| !get_durs(b).is_empty()); 336 + if !any_data { return; } 337 + 338 + println!("{}", label); 339 + println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}", "period", "n", "median", "Δ", "p-value"); 340 + 341 + for (i, b) in buckets.iter().enumerate() { 342 + let durs = get_durs(b); 343 + let mut sorted = durs.to_vec(); sorted.sort_unstable(); 344 + let med = if sorted.is_empty() { 0.0 } else { median_sorted(&sorted) }; 345 + 346 + let (delta_str, p_str) = if i == 0 || get_durs(&buckets[i - 1]).is_empty() { 347 + (String::new(), String::new()) 348 + } else { 349 + let prev = get_durs(&buckets[i - 1]); 350 + let mut prev_sorted = prev.to_vec(); prev_sorted.sort_unstable(); 351 + let prev_med = median_sorted(&prev_sorted); 352 + 353 + let delta = if prev_med > 0.0 { 354 + let pct = (med - prev_med) / prev_med * 100.0; 355 + let sign = if pct >= 0.0 { "+" } else { "" }; 356 + format!("{}{:.0}%", sign, pct) 357 + } else { 358 + String::new() 359 + }; 360 + 361 + let p = match mann_whitney_u(prev, durs) { 362 + Some(r) => format!("{:.3}", r.p_value), 363 + None => 
String::new(), 364 + }; 365 + 366 + (delta, p) 367 + }; 368 + 369 + println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}", 370 + b.bucket, durs.len(), fmt_ms(med as i64), delta_str, p_str); 371 + } 372 + println!(); 373 + } 374 + 375 + pub fn display_trend_test(trend: &Trend) { 376 + let bw = trend.bucket_size.col_width(); 377 + 378 + if let Some(ref drv) = trend.drv_filter { 379 + println!("filter: {}", drv); 380 + } 381 + 382 + print_test_section("builds", &trend.buckets, |b| &b.build_durations, bw); 383 + print_test_section("substitutions", &trend.buckets, |b| &b.subst_durations, bw); 384 + 385 + println!("Mann-Whitney U (two-tailed). H0: adjacent periods have the same duration distribution."); 386 + } 387 + 388 + pub fn output_csv_trend(trend: &Trend) { 389 + println!("period,build_count,build_median_ms,subst_count,subst_median_ms,download_bytes"); 390 + for b in &trend.buckets { 391 + let mut bs = b.build_durations.clone(); bs.sort_unstable(); 392 + let mut ss = b.subst_durations.clone(); ss.sort_unstable(); 393 + let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; 394 + let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; 395 + assert!(build_med >= 0); 396 + assert!(subst_med >= 0); 397 + println!("{},{},{},{},{},{}", b.bucket, b.build_durations.len(), build_med, b.subst_durations.len(), subst_med, b.download_bytes); 398 + } 102 399 } 103 400 104 401 pub fn display_stats(stats: Stats) {