Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

better data model

+124 -150
+124 -150
src/main.rs
··· 1 1 use anyhow::{Context, Result}; 2 + use chrono::{DateTime, Utc}; 2 3 use clap::{Parser, Subcommand}; 3 4 use serde::{Deserialize, Serialize}; 5 + use sqlx::{sqlite::SqliteConnectOptions, sqlite::SqlitePoolOptions, Pool, Row, Sqlite}; 4 6 use std::collections::HashMap; 7 + use std::fmt; 5 8 use std::path::PathBuf; 9 + use std::str::FromStr; 6 10 use std::sync::Arc; 7 11 use tokio::io::{AsyncBufReadExt, BufReader}; 8 12 use tokio::net::{UnixListener, UnixStream}; 9 13 use tokio::sync::Mutex; 10 - use tracing::{error, info, debug}; 11 - use sqlx::{sqlite::SqlitePoolOptions, sqlite::SqliteConnectOptions, Pool, Sqlite, Row}; 12 - use chrono::{Utc, DateTime}; 13 - use std::str::FromStr; 14 - use std::fmt; 14 + use tracing::{debug, error, info}; 15 15 16 16 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 17 17 #[repr(u64)] ··· 59 59 } 60 60 } 61 61 62 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 63 + #[repr(u64)] 64 + enum ResultType { 65 + Unknown = 0, 66 + FileLinked = 100, 67 + BuildLogLine = 101, 68 + UntrustedPath = 102, 69 + CorruptedPath = 103, 70 + SetPhase = 104, 71 + Progress = 105, 72 + SetExpected = 106, 73 + PostBuildLogLine = 107, 74 + FetchStatus = 108, 75 + } 76 + 77 + impl From<u64> for ResultType { 78 + fn from(n: u64) -> Self { 79 + match n { 80 + 100 => Self::FileLinked, 81 + 101 => Self::BuildLogLine, 82 + 102 => Self::UntrustedPath, 83 + 103 => Self::CorruptedPath, 84 + 104 => Self::SetPhase, 85 + 105 => Self::Progress, 86 + 106 => Self::SetExpected, 87 + 107 => Self::PostBuildLogLine, 88 + 108 => Self::FetchStatus, 89 + _ => Self::Unknown, 90 + } 91 + } 92 + } 93 + 62 94 #[derive(Parser)] 63 95 struct Cli { 64 96 #[arg(short, long, default_value = "/tmp/nix-observability.sock")] ··· 73 105 74 106 #[derive(Subcommand)] 75 107 enum Commands { 76 - /// Start the observability daemon 77 108 Daemon, 78 - /// Show build and substitution statistics 79 109 Stats, 80 110 } 81 111 ··· 96 126 97 127 struct Activity { 98 128 id: u64, 129 + parent_id: u64, 99 130 event_type: u64, 100 131 text: String, 101 132 start_time: DateTime<Utc>, 102 133 fields: Vec<serde_json::Value>, 103 - result_fields: Vec<serde_json::Value>, 134 + total_bytes: u64, 135 + current_phase: Option<(String, DateTime<Utc>)>, 104 136 } 105 137 106 138 struct State { ··· 121 153 .max_connections(5) 122 154 .connect_with(connection_options) 123 155 .await 124 - .context("Failed to connect to/create database")?; 156 + .context("Failed to connect to database")?; 125 157 126 158 sqlx::query( 127 159 "CREATE TABLE IF NOT EXISTS events ( 128 - id INTEGER PRIMARY KEY, 160 + id INTEGER PRIMARY KEY AUTOINCREMENT, 129 161 nix_id INTEGER, 130 162 parent_id INTEGER, 131 163 event_type INTEGER, 132 164 text TEXT, 165 + drv_path TEXT, 166 + cache_url TEXT, 133 167 start_time TEXT, 134 168 end_time TEXT, 135 169 duration_ms INTEGER, 136 - fields TEXT, 137 - result_fields TEXT, 138 - drv_path TEXT, 139 - cache_url TEXT 140 - )", 170 + total_bytes INTEGER 171 + ); 172 + CREATE TABLE IF NOT EXISTS phases ( 173 + id INTEGER PRIMARY KEY AUTOINCREMENT, 174 + event_nix_id INTEGER, 175 + phase_name TEXT, 176 + duration_ms INTEGER 177 + );", 141 178 ) 142 179 .execute(&pool) 143 180 .await?; ··· 160 197 std::fs::remove_file(&socket_path)?; 161 198 } 162 199 163 - let listener = UnixListener::bind(&socket_path) 164 - .with_context(|| format!("Failed to bind to {}", socket_path.display()))?; 165 - 166 - info!("Observability daemon listening on {}", socket_path.display()); 200 + let listener = UnixListener::bind(&socket_path)?; 201 + info!("Daemon listening on {}", socket_path.display()); 167 202 168 203 loop { 169 204 let (stream, _) = listener.accept().await?; 170 205 let state = Arc::clone(&state); 171 - 172 206 tokio::spawn(async move { 173 207 if let Err(e) = handle_connection(stream, state).await { 174 208 error!("Connection error: {}", e); ··· 180 214 async fn handle_connection(stream: UnixStream, state: Arc<Mutex<State>>) -> Result<()> { 181 215 let mut reader = BufReader::new(stream); 182 216 let mut line = String::new(); 183 - 184 217 loop { 185 218 line.clear(); 186 - let n = reader.read_line(&mut line).await?; 187 - if n == 0 { break; } 188 - 189 - let event: NixEvent = match serde_json::from_str(&line) { 190 - Ok(e) => e, 191 - Err(e) => { 192 - debug!("Failed to parse JSON: {} (line: {})", e, line); 193 - continue; 194 - } 195 - }; 196 - 197 - process_event(event, &state).await?; 219 + if reader.read_line(&mut line).await? == 0 { break; } 220 + if let Ok(event) = serde_json::from_str::<NixEvent>(&line) { 221 + process_event(event, &state).await?; 222 + } 198 223 } 199 - 200 224 Ok(()) 201 225 } 202 226 ··· 206 230 match event.action.as_str() { 207 231 "start" => { 208 232 let act_type = ActivityType::from(event.event_type); 209 - info!(id = event.id, %act_type, "Activity started: {}", event.text); 210 - 233 + info!(id = event.id, %act_type, "Start: {}", event.text); 211 234 s.active_activities.insert(event.id, Activity { 212 235 id: event.id, 236 + parent_id: event.parent, 213 237 event_type: event.event_type, 214 238 text: event.text, 215 239 start_time: Utc::now(), 216 240 fields: event.fields, 217 - result_fields: Vec::new(), 241 + total_bytes: 0, 242 + current_phase: None, 218 243 }); 219 244 } 220 245 "result" => { 221 - if let Some(activity) = s.active_activities.get_mut(&event.id) { 222 - activity.result_fields = event.fields; 246 + let res_type = ResultType::from(event.event_type); 247 + let pool = s.pool.clone(); 248 + 249 + if let Some(act) = s.active_activities.get_mut(&event.id) { 250 + match res_type { 251 + ResultType::SetPhase => { 252 + let phase_name = event.fields.get(0).and_then(|v| v.as_str()).unwrap_or("unknown"); 253 + let now = Utc::now(); 254 + if let Some((name, start)) = act.current_phase.take() { 255 + let duration = now.signed_duration_since(start).num_milliseconds(); 256 + let nid = act.id as i64; 257 + let name = name.clone(); 258 + tokio::spawn(async move { 259 + let _ = sqlx::query("INSERT INTO phases (event_nix_id, phase_name, duration_ms) VALUES (?, ?, ?)") 260 + .bind(nid).bind(name).bind(duration).execute(&pool).await; 261 + }); 262 + } 263 + act.current_phase = Some((phase_name.to_string(), now)); 264 + } 265 + ResultType::Progress => { 266 + if let Some(total) = event.fields.get(1).and_then(|v| v.as_u64()) { 267 + act.total_bytes = total; 268 + } 269 + } 270 + _ => {} 271 + } 223 272 } 224 273 } 225 274 "stop" => { 226 - if let Some(activity) = s.active_activities.remove(&event.id) { 275 + if let Some(act) = s.active_activities.remove(&event.id) { 227 276 let end_time = Utc::now(); 228 - let duration = end_time.signed_duration_since(activity.start_time); 229 - let act_type = ActivityType::from(activity.event_type); 230 - 231 - info!( 232 - id = activity.id, 233 - %act_type, 234 - duration_ms = duration.num_milliseconds(), 235 - "Activity finished: {}", 236 - activity.text 237 - ); 277 + let duration = end_time.signed_duration_since(act.start_time).num_milliseconds(); 278 + let act_type = ActivityType::from(act.event_type); 238 279 239 - let mut drv_path = None; 280 + let mut drv_path = act.fields.get(0).and_then(|v| v.as_str()).map(|s| s.to_string()); 240 281 let mut cache_url = None; 241 - 242 - match act_type { 243 - ActivityType::CopyPath | ActivityType::FileTransfer | 244 - ActivityType::Realise | ActivityType::Builds | ActivityType::Build => { 245 - drv_path = activity.fields.get(0) 246 - .and_then(|v| v.as_str()) 247 - .map(|s| s.to_string()); 248 - } 249 - ActivityType::Substitute => { 250 - drv_path = activity.fields.get(0) 251 - .and_then(|v| v.as_str()) 252 - .map(|s| s.to_string()); 253 - cache_url = activity.fields.get(1) 254 - .and_then(|v| v.as_str()) 255 - .map(|s| s.to_string()); 256 - } 257 - _ => {} 282 + if act_type == ActivityType::Substitute { 283 + cache_url = act.fields.get(1).and_then(|v| v.as_str()).map(|s| s.to_string()); 258 284 } 259 285 260 286 sqlx::query( 261 - "INSERT INTO events (nix_id, parent_id, event_type, text, start_time, end_time, duration_ms, fields, result_fields, drv_path, cache_url) 262 - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" 287 + "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes) 288 + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" 263 289 ) 264 - .bind(activity.id as i64) 265 - .bind(event.parent as i64) 266 - .bind(activity.event_type as i64) 267 - .bind(&activity.text) 268 - .bind(activity.start_time.to_rfc3339()) 269 - .bind(end_time.to_rfc3339()) 270 - .bind(duration.num_milliseconds()) 271 - .bind(serde_json::to_string(&activity.fields)?) 272 - .bind(serde_json::to_string(&activity.result_fields)?) 273 - .bind(drv_path) 274 - .bind(cache_url) 275 - .execute(&s.pool) 276 - .await?; 290 + .bind(act.id as i64).bind(act.parent_id as i64).bind(act.event_type as i64) 291 + .bind(act.text).bind(drv_path).bind(cache_url) 292 + .bind(act.start_time.to_rfc3339()).bind(end_time.to_rfc3339()) 293 + .bind(duration).bind(act.total_bytes as i64) 294 + .execute(&s.pool).await?; 277 295 } 278 296 } 279 - _ => { 280 - debug!(action = %event.action, id = event.id, "Received unknown action"); 281 - } 297 + _ => {} 282 298 } 283 - 284 299 Ok(()) 285 300 } 286 301 287 302 async fn show_stats(pool: Pool<Sqlite>) -> Result<()> { 288 303 let rows = sqlx::query( 289 - "SELECT 290 - event_type, 291 - COUNT(*) as count, 292 - SUM(duration_ms) as total_ms, 293 - AVG(duration_ms) as avg_ms 294 - FROM events 295 - GROUP BY event_type 296 - ORDER BY total_ms DESC" 297 - ) 298 - .fetch_all(&pool) 299 - .await?; 300 - 301 - if rows.is_empty() { 302 - println!("No data available in database."); 303 - return Ok(()); 304 - } 304 + "SELECT event_type, COUNT(*) as count, SUM(duration_ms) as total_ms, AVG(duration_ms) as avg_ms 305 + FROM events GROUP BY event_type ORDER BY total_ms DESC" 306 + ).fetch_all(&pool).await?; 305 307 306 308 println!("{:<20} {:>10} {:>15} {:>15}", "Activity", "Count", "Avg Time", "Total Time"); 307 309 println!("{:-<20} {:->10} {:->15} {:->15}", "", "", "", ""); 308 310 309 311 for row in rows { 310 - let ty_code: i64 = row.get("event_type"); 311 - let count: i64 = row.get("count"); 312 - let avg: f64 = row.get("avg_ms"); 313 - let total: i64 = row.get("total_ms"); 314 - 315 - let label = ActivityType::from(ty_code as u64).to_string(); 316 - 317 - println!( 318 - "{:<20} {:>10} {:>14.2}s {:>14.2}s", 319 - label, 320 - count, 321 - avg / 1000.0, 322 - total as f64 / 1000.0 323 - ); 312 + let label = ActivityType::from(row.get::<i64, _>("event_type") as u64).to_string(); 313 + println!("{:<20} {:>10} {:>14.2}s {:>14.2}s", label, row.get::<i64, _>("count"), 314 + row.get::<f64, _>("avg_ms") / 1000.0, row.get::<i64, _>("total_ms") as f64 / 1000.0); 324 315 } 325 316 326 - println!("\nTop 5 Most Time-Consuming Tasks:"); 327 - println!("{:<12} {:<15} {}", "Duration", "Type", "Resource/Text"); 328 - println!("{:-<12} {:-<15} {:-<30}", "", "", ""); 329 - 330 - let top_events = sqlx::query( 331 - "SELECT event_type, duration_ms, text, drv_path 332 - FROM events 333 - WHERE duration_ms > 0 334 - ORDER BY duration_ms DESC 335 - LIMIT 5" 336 - ) 337 - .fetch_all(&pool) 338 - .await?; 317 + let cache = sqlx::query( 318 + "SELECT COUNT(*) FILTER (WHERE event_type = 108) as hits, 319 + COUNT(*) FILTER (WHERE event_type = 105) as misses, 320 + SUM(total_bytes) as bytes FROM events" 321 + ).fetch_one(&pool).await?; 339 322 340 - for row in top_events { 341 - let dur: i64 = row.get("duration_ms"); 342 - let ty_code: i64 = row.get("event_type"); 343 - let label = ActivityType::from(ty_code as u64).to_string(); 344 - let path: String = row.get::<Option<String>, _>("drv_path") 345 - .or_else(|| row.get::<Option<String>, _>("text")) 346 - .unwrap_or_else(|| "unknown".to_string()); 347 - 348 - let short_path = if path.len() > 60 { 349 - format!("...{}", &path[path.len()-57..]) 350 - } else { 351 - path 352 - }; 323 + let hits: i64 = cache.get("hits"); 324 + let misses: i64 = cache.get("misses"); 325 + let total = hits + misses; 326 + println!("\n--- Cache Performance ---\nHit Rate: {:.1}%\nData Fetched: {:.2} MB", 327 + if total > 0 { (hits as f64 / total as f64) * 100.0 } else { 0.0 }, 328 + cache.get::<i64, _>("bytes") as f64 / 1024.0 / 1024.0); 353 329 354 - println!( 355 - "{:>10.2}s {:<15} {}", 356 - dur as f64 / 1000.0, 357 - label, 358 - short_path 359 - ); 330 + println!("\n--- Top 5 Build Phases ---\n{:<20} {:>15}", "Phase", "Avg Duration"); 331 + let phases = sqlx::query("SELECT phase_name, AVG(duration_ms) as avg FROM phases GROUP BY phase_name ORDER BY avg DESC LIMIT 5") 332 + .fetch_all(&pool).await?; 333 + for p in phases { 334 + println!("{:<20} {:>14.2}s", p.get::<String, _>("phase_name"), p.get::<f64, _>("avg") / 1000.0); 360 335 } 361 - 362 336 Ok(()) 363 337 }