Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore(daemon): use separate reader and writer database connections:

We were seeing stat commands hanging with high amounts of events in
background, this should help.

+39 -51
+28 -45
src/daemon.rs
··· 183 183 184 184 struct State { 185 185 active_activities: HashMap<u64, Activity>, 186 - db: Arc<Mutex<Connection>>, 186 + } 187 + 188 + pub struct DbConnections { 189 + pub writer: Mutex<Connection>, 190 + pub reader: Mutex<Connection>, 187 191 } 188 192 189 193 pub fn open_db(path: &PathBuf) -> Result<Connection> { ··· 219 223 Ok(conn) 220 224 } 221 225 222 - pub async fn run_daemon(socket_path: PathBuf, db: Arc<Mutex<Connection>>) -> Result<()> { 226 + pub async fn run_daemon(socket_path: PathBuf, db: Arc<DbConnections>) -> Result<()> { 223 227 let state = Arc::new(Mutex::new(State { 224 228 active_activities: HashMap::new(), 225 - db, 226 229 })); 227 230 228 231 if socket_path.exists() { ··· 237 240 loop { 238 241 let (stream, _) = listener.accept().await?; 239 242 let state = Arc::clone(&state); 243 + let db = Arc::clone(&db); 240 244 tokio::spawn(async move { 241 - if let Err(e) = handle_connection(stream, state).await { 245 + if let Err(e) = handle_connection(stream, state, db).await { 242 246 error!("Connection error: {}", e); 243 247 } 244 248 }); 245 249 } 246 250 } 247 251 248 - async fn handle_connection(mut stream: UnixStream, state: Arc<Mutex<State>>) -> Result<()> { 252 + async fn handle_connection(mut stream: UnixStream, state: Arc<Mutex<State>>, db: Arc<DbConnections>) -> Result<()> { 249 253 let (reader, mut writer) = stream.split(); 250 254 let mut reader = BufReader::new(reader); 251 255 let mut line = String::new(); ··· 255 259 256 260 match serde_json::from_str::<SocketMessage>(line.trim()) { 257 261 Ok(SocketMessage::Command(ClientCommand::GetStats { since })) => { 258 - let db = state.lock().unwrap().db.clone(); 259 - let stats = tokio::task::spawn_blocking(move || collect_stats(&db, since)) 262 + let db = Arc::clone(&db); 263 + let stats = tokio::task::spawn_blocking(move || collect_stats(&db.reader, since)) 260 264 .await??; 261 265 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 262 266 break; 263 267 } 264 268 Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => { 265 - let db = state.lock().unwrap().db.clone(); 266 - let trend = tokio::task::spawn_blocking(move || collect_trend(&db, since, bucket, drv)) 269 + let db = Arc::clone(&db); 270 + let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv)) 267 271 .await??; 268 272 writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?; 269 273 break; 270 274 } 271 275 Ok(SocketMessage::Command(ClientCommand::Clean)) => { 272 - let db = state.lock().unwrap().db.clone(); 276 + let db = Arc::clone(&db); 273 277 tokio::task::spawn_blocking(move || -> Result<()> { 274 - let conn = db.lock().unwrap(); 278 + let conn = db.writer.lock().unwrap(); 275 279 conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?; 276 280 Ok(()) 277 281 }).await??; ··· 280 284 break; 281 285 } 282 286 Ok(SocketMessage::Event(event)) => { 283 - if let Err(e) = process_event(event, &state) { 287 + if let Err(e) = process_event(event, &state, &db) { 284 288 error!("Failed to process event: {}", e); 285 289 } 286 290 } ··· 290 294 Ok(()) 291 295 } 292 296 293 - fn process_event(event: NixEvent, state: &Arc<Mutex<State>>) -> Result<()> { 297 + fn process_event(event: NixEvent, state: &Arc<Mutex<State>>, db: &Arc<DbConnections>) -> Result<()> { 294 298 let mut s = state.lock().unwrap(); 295 299 296 300 match event { ··· 302 306 fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string() 303 307 }; 304 308 305 - info!( 306 - id, 307 - parent, 308 - act_type = %act_type, 309 - text = %text, 310 - fields = ?fields, 311 - "start" 312 - ); 309 + info!(id, parent, act_type = %act_type, text = %text, fields = ?fields, "start"); 313 310 314 311 s.active_activities.insert(id, Activity { 315 312 id, ··· 325 322 let res_type = ResultType::from(event_type); 326 323 327 324 if res_type != ResultType::BuildLogLine && res_type != ResultType::Progress && res_type != ResultType::SetExpected { 328 - info!( 329 - id, 330 - res_type = %res_type, 331 - fields = ?fields, 332 - "result" 333 - ); 325 + info!(id, res_type = %res_type, fields = ?fields, "result"); 334 326 } 335 327 336 328 if let Some(act) = s.active_activities.get_mut(&id) { ··· 348 340 let duration_ms = end_time.signed_duration_since(act.start_time).num_milliseconds(); 349 341 350 342 info!( 351 - id = act.id, 352 - act_type = %act_type, 353 - duration_ms, 354 - total_bytes = act.total_bytes, 355 - text = %act.text, 356 - fields = ?act.fields, 343 + id = act.id, act_type = %act_type, duration_ms, 344 + total_bytes = act.total_bytes, text = %act.text, fields = ?act.fields, 357 345 "stop" 358 346 ); 359 347 ··· 367 355 _ => None, 368 356 }; 369 357 370 - let conn = s.db.lock().unwrap(); 371 - conn.execute( 358 + drop(s); 359 + db.writer.lock().unwrap().execute( 372 360 "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes) 373 361 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", 374 362 rusqlite::params![ 375 - act.id as i64, 376 - act.parent_id as i64, 377 - act.event_type as i64, 378 - act.text, 379 - drv_path, 380 - cache_url, 381 - act.start_time.to_rfc3339(), 382 - end_time.to_rfc3339(), 383 - duration_ms, 384 - act.total_bytes as i64, 363 + act.id as i64, act.parent_id as i64, act.event_type as i64, 364 + act.text, drv_path, cache_url, 365 + act.start_time.to_rfc3339(), end_time.to_rfc3339(), 366 + duration_ms, act.total_bytes as i64, 385 367 ], 386 368 ).context("Failed to insert event")?; 387 369 } 388 370 } 389 371 } 372 + 390 373 Ok(()) 391 374 }
+8 -3
src/main.rs
··· 11 11 use tokio::net::UnixStream; 12 12 use tracing::{error, info}; 13 13 14 - use daemon::{open_db, run_daemon}; 14 + use daemon::{open_db, run_daemon, DbConnections}; 15 15 use stats::{display_stats, display_trend, display_trend_test, output_csv_trend, BucketSize, Stats, Trend}; 16 16 17 17 #[derive(Clone, clap::ValueEnum)] ··· 133 133 anyhow::bail!("Daemon already running at {}", socket_path.display()); 134 134 } 135 135 136 - let conn = open_db(&db_path)?; 137 - run_daemon(socket_path, Arc::new(Mutex::new(conn))).await.context("Daemon failed")? 136 + let writer = open_db(&db_path)?; 137 + let reader = open_db(&db_path)?; 138 + let db = Arc::new(DbConnections { 139 + writer: Mutex::new(writer), 140 + reader: Mutex::new(reader), 141 + }); 142 + run_daemon(socket_path, db).await.context("Daemon failed")? 138 143 } 139 144 Commands::Stats { socket, days, months, years } => { 140 145 let socket_path = socket.unwrap_or_else(|| {
+3 -3
src/stats.rs
··· 1 1 use anyhow::{Context, Result}; 2 2 use rusqlite::Connection; 3 3 use serde::{Deserialize, Serialize}; 4 - use std::sync::{Arc, Mutex}; 4 + use std::sync::Mutex; 5 5 6 6 #[derive(Debug, Serialize, Deserialize)] 7 7 pub struct Stats { ··· 29 29 pub count: i64, 30 30 } 31 31 32 - pub fn collect_stats(db: &Arc<Mutex<Connection>>, since: Option<i64>) -> Result<Stats> { 32 + pub fn collect_stats(db: &Mutex<Connection>, since: Option<i64>) -> Result<Stats> { 33 33 let conn = db.lock().unwrap(); 34 34 35 35 // SQL NULL makes the WHERE condition vacuously true, giving us "no filter". ··· 243 243 } 244 244 245 245 pub fn collect_trend( 246 - db: &Arc<Mutex<Connection>>, 246 + db: &Mutex<Connection>, 247 247 since: Option<i64>, 248 248 bucket: BucketSize, 249 249 drv: Option<String>,