Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf: dates -> int

+75 -30
+71 -18
src/daemon.rs
··· 19 19 h 20 20 } 21 21 22 + // When SCHEMA changes: (1) append a new hash to SCHEMA_HASHES, (2) add a migration to MIGRATIONS. 22 23 const SCHEMA: &str = " 23 24 CREATE TABLE IF NOT EXISTS events ( 24 25 id INTEGER PRIMARY KEY AUTOINCREMENT, ··· 28 29 text TEXT, 29 30 drv_path TEXT, 30 31 cache_url TEXT, 31 - start_time TEXT, 32 - end_time TEXT, 32 + start_time INTEGER, 33 + end_time INTEGER, 33 34 duration_ms INTEGER, 34 35 total_bytes INTEGER 35 36 ); 37 + CREATE INDEX IF NOT EXISTS idx_events_type_start ON events(event_type, start_time); 36 38 "; 37 39 38 - // Append a new hash entry when the schema changes - SCHEMA_VERSION auto-increments. 39 40 const SCHEMA_HASHES: &[u32] = &[ 40 - 0x9bc94a70, // v1 41 + 0x9bc94a70, // v1: TEXT timestamps, no indexes 42 + 0xee061d32, // v2: INTEGER timestamps (Unix seconds), idx_events_type_start 41 43 ]; 42 44 const SCHEMA_VERSION: u32 = SCHEMA_HASHES.len() as u32; 43 45 const _: () = assert!( 44 46 schema_hash(SCHEMA.as_bytes()) == SCHEMA_HASHES[SCHEMA_VERSION as usize - 1], 45 - "schema changed - append new hash to SCHEMA_HASHES" 47 + "schema changed - append new hash to SCHEMA_HASHES and add a migration in MIGRATIONS" 46 48 ); 49 + 50 + // (target_version, sql). Table rebuild because SQLite does not support ALTER COLUMN. 51 + const MIGRATIONS: &[(u32, &str)] = &[ 52 + (2, " 53 + CREATE TABLE events_new ( 54 + id INTEGER PRIMARY KEY AUTOINCREMENT, 55 + nix_id INTEGER, 56 + parent_id INTEGER, 57 + event_type INTEGER, 58 + text TEXT, 59 + drv_path TEXT, 60 + cache_url TEXT, 61 + start_time INTEGER, 62 + end_time INTEGER, 63 + duration_ms INTEGER, 64 + total_bytes INTEGER 65 + ); 66 + INSERT INTO events_new 67 + SELECT id, nix_id, parent_id, event_type, text, drv_path, cache_url, 68 + CAST(strftime('%s', start_time) AS INTEGER), 69 + CAST(strftime('%s', end_time) AS INTEGER), 70 + duration_ms, total_bytes 71 + FROM events; 72 + DROP TABLE events; 73 + ALTER TABLE events_new RENAME TO events; 74 + CREATE INDEX IF NOT EXISTS idx_events_type_start ON events(event_type, start_time); 75 + "), 76 + ]; 47 77 48 78 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 49 79 #[repr(u64)] ··· 194 224 let conn = Connection::open(path) 195 225 .with_context(|| format!("Failed to open database at {}", path.display()))?; 196 226 197 - // Check schema version; reset if mismatched. 198 - let version: u32 = conn 227 + let db_version: u32 = conn 199 228 .query_row("PRAGMA user_version", [], |r| r.get(0)) 200 229 .unwrap_or(0); 201 230 202 - if version != 0 && version != SCHEMA_VERSION { 203 - info!(current = version, expected = SCHEMA_VERSION, "Schema version mismatch, resetting database"); 204 - drop(conn); 205 - std::fs::remove_file(path) 206 - .with_context(|| format!("Failed to remove stale database at {}", path.display()))?; 207 - return open_db(path); 231 + assert!(db_version <= u32::MAX); 232 + 233 + if db_version > SCHEMA_VERSION { 234 + anyhow::bail!( 235 + "Database at {} was created by a newer version of nod \ 236 + (db version {}, this binary knows version {}). \ 237 + Delete it manually to start fresh, or upgrade nod.", 238 + path.display(), db_version, SCHEMA_VERSION 239 + ); 208 240 } 209 241 210 - conn.execute_batch(SCHEMA) 211 - .context("Failed to create schema")?; 212 - conn.execute_batch(&format!("PRAGMA user_version = {}", SCHEMA_VERSION)) 213 - .context("Failed to set schema version")?; 242 + if db_version == 0 { 243 + conn.execute_batch(SCHEMA).context("Failed to create schema")?; 244 + conn.execute_batch(&format!("PRAGMA user_version = {SCHEMA_VERSION}")) 245 + .context("Failed to set schema version")?; 246 + info!(version = SCHEMA_VERSION, "Initialized fresh database"); 247 + } else if db_version < SCHEMA_VERSION { 248 + info!(from = db_version, to = SCHEMA_VERSION, "Migrating database"); 249 + let mut current = db_version; 250 + for &(target, sql) in MIGRATIONS { 251 + if current >= target { continue; } 252 + assert!(target <= SCHEMA_VERSION); 253 + assert!(target == current + 1, "migration gap: {} -> {}", current, target); 254 + info!(from = current, to = target, "Running migration"); 255 + conn.execute_batch("BEGIN").context("Failed to begin migration transaction")?; 256 + conn.execute_batch(sql) 257 + .with_context(|| format!("Migration v{current} -> v{target} failed"))?; 258 + conn.execute_batch("COMMIT").context("Failed to commit migration")?; 259 + conn.execute_batch(&format!("PRAGMA user_version = {target}")) 260 + .context("Failed to update schema version after migration")?; 261 + current = target; 262 + } 263 + assert_eq!(current, SCHEMA_VERSION, "migrations did not reach current schema version"); 264 + info!(version = SCHEMA_VERSION, "Migration complete"); 265 + } 266 + 214 267 conn.execute_batch(" 215 268 PRAGMA journal_mode = WAL; 216 269 PRAGMA synchronous = NORMAL; ··· 362 415 rusqlite::params![ 363 416 act.id as i64, act.parent_id as i64, act.event_type as i64, 364 417 act.text, drv_path, cache_url, 365 - act.start_time.to_rfc3339(), end_time.to_rfc3339(), 418 + act.start_time.timestamp(), end_time.timestamp(), 366 419 duration_ms, act.total_bytes as i64, 367 420 ], 368 421 ).context("Failed to insert event")?;
+4 -12
src/stats.rs
··· 32 32 pub fn collect_stats(db: &Mutex<Connection>, since: Option<i64>) -> Result<Stats> { 33 33 let conn = db.lock().unwrap(); 34 34 35 - // SQL NULL makes the WHERE condition vacuously true, giving us "no filter". 36 - let since_str: Option<String> = since 37 - .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) 38 - .map(|dt| dt.to_rfc3339()); 39 - let p = since_str.as_deref(); 35 + let p: Option<i64> = since; 40 36 41 37 let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) = 42 38 conn.query_row( ··· 254 250 255 251 let conn = db.lock().unwrap(); 256 252 257 - let since_str: Option<String> = since 258 - .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) 259 - .map(|dt| dt.to_rfc3339()); 260 - let p = since_str.as_deref(); 253 + let p: Option<i64> = since; 261 254 let drv_ref = drv.as_deref(); 262 255 let fmt = bucket.strftime_fmt(); 263 256 264 - // Rows come out ordered by bucket then time, so grouping by sequential scan is valid. 265 257 // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter. 266 258 let mut stmt = conn.prepare( 267 - "SELECT strftime(?3, start_time), event_type, duration_ms, total_bytes 259 + "SELECT strftime(?3, start_time, 'unixepoch'), event_type, duration_ms, total_bytes 268 260 FROM events 269 261 WHERE event_type IN (101, 105, 108) 270 262 AND (?1 IS NULL OR start_time >= ?1) 271 263 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 272 - ORDER BY strftime(?3, start_time) ASC, start_time ASC", 264 + ORDER BY strftime(?3, start_time, 'unixepoch') ASC, start_time ASC", 273 265 ).context("Failed to prepare trend query")?; 274 266 275 267 let mut buckets: Vec<TrendBucket> = vec![];