Nix Observability Daemon
observability nix
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

improvements!

+604 -46
+290
Cargo.lock
··· 15 15 ] 16 16 17 17 [[package]] 18 + name = "aho-corasick" 19 + version = "1.1.4" 20 + source = "registry+https://github.com/rust-lang/crates.io-index" 21 + checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" 22 + dependencies = [ 23 + "memchr", 24 + ] 25 + 26 + [[package]] 18 27 name = "android_system_properties" 19 28 version = "0.1.5" 20 29 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 22 31 dependencies = [ 23 32 "libc", 24 33 ] 34 + 35 + [[package]] 36 + name = "anes" 37 + version = "0.1.6" 38 + source = "registry+https://github.com/rust-lang/crates.io-index" 39 + checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" 25 40 26 41 [[package]] 27 42 name = "anstream" ··· 104 119 checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" 105 120 106 121 [[package]] 122 + name = "cast" 123 + version = "0.3.0" 124 + source = "registry+https://github.com/rust-lang/crates.io-index" 125 + checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" 126 + 127 + [[package]] 107 128 name = "cc" 108 129 version = "1.2.56" 109 130 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 133 154 ] 134 155 135 156 [[package]] 157 + name = "ciborium" 158 + version = "0.2.2" 159 + source = "registry+https://github.com/rust-lang/crates.io-index" 160 + checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" 161 + dependencies = [ 162 + "ciborium-io", 163 + "ciborium-ll", 164 + "serde", 165 + ] 166 + 167 + [[package]] 168 + name = "ciborium-io" 169 + version = "0.2.2" 170 + source = "registry+https://github.com/rust-lang/crates.io-index" 171 + checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" 172 + 173 + [[package]] 174 + name = "ciborium-ll" 175 + version = "0.2.2" 176 + source = "registry+https://github.com/rust-lang/crates.io-index" 177 + checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" 178 + dependencies = [ 179 + "ciborium-io", 180 + "half", 181 + ] 182 + 183 + [[package]] 136 184 name = "clap" 137 185 version = "4.5.60" 138 186 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 185 233 checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" 186 234 187 235 [[package]] 236 + name = "criterion" 237 + version = "0.5.1" 238 + source = "registry+https://github.com/rust-lang/crates.io-index" 239 + checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" 240 + dependencies = [ 241 + "anes", 242 + "cast", 243 + "ciborium", 244 + "clap", 245 + "criterion-plot", 246 + "is-terminal", 247 + "itertools", 248 + "num-traits", 249 + "once_cell", 250 + "oorandom", 251 + "plotters", 252 + "rayon", 253 + "regex", 254 + "serde", 255 + "serde_derive", 256 + "serde_json", 257 + "tinytemplate", 258 + "walkdir", 259 + ] 260 + 261 + [[package]] 262 + name = "criterion-plot" 263 + version = "0.5.0" 264 + source = "registry+https://github.com/rust-lang/crates.io-index" 265 + checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" 266 + dependencies = [ 267 + "cast", 268 + "itertools", 269 + ] 270 + 271 + [[package]] 272 + name = "crossbeam-deque" 273 + version = "0.8.6" 274 + source = "registry+https://github.com/rust-lang/crates.io-index" 275 + checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" 276 + dependencies = [ 277 + "crossbeam-epoch", 278 + "crossbeam-utils", 279 + ] 280 + 281 + [[package]] 282 + name = "crossbeam-epoch" 283 + version = "0.9.18" 284 + source = "registry+https://github.com/rust-lang/crates.io-index" 285 + checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" 286 + dependencies = [ 287 + "crossbeam-utils", 288 + ] 289 + 290 + [[package]] 291 + name = "crossbeam-utils" 292 + version = "0.8.21" 293 + source = "registry+https://github.com/rust-lang/crates.io-index" 294 + checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" 295 + 296 + [[package]] 297 + name = "crunchy" 298 + version = "0.2.4" 299 + source = "registry+https://github.com/rust-lang/crates.io-index" 300 + checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" 301 + 302 + [[package]] 188 303 name = "directories" 189 304 version = "5.0.1" 190 305 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 204 319 "redox_users", 205 320 "windows-sys 0.48.0", 206 321 ] 322 + 323 + [[package]] 324 + name = "either" 325 + version = "1.15.0" 326 + source = "registry+https://github.com/rust-lang/crates.io-index" 327 + checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 207 328 208 329 [[package]] 209 330 name = "errno" ··· 245 366 ] 246 367 247 368 [[package]] 369 + name = "half" 370 + version = "2.7.1" 371 + source = "registry+https://github.com/rust-lang/crates.io-index" 372 + checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" 373 + dependencies = [ 374 + "cfg-if", 375 + "crunchy", 376 + "zerocopy", 377 + ] 378 + 379 + [[package]] 248 380 name = "hashbrown" 249 381 version = "0.14.5" 250 382 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 269 401 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 270 402 271 403 [[package]] 404 + name = "hermit-abi" 405 + version = "0.5.2" 406 + source = "registry+https://github.com/rust-lang/crates.io-index" 407 + checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" 408 + 409 + [[package]] 272 410 name = "iana-time-zone" 273 411 version = "0.1.65" 274 412 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 293 431 ] 294 432 295 433 [[package]] 434 + name = "is-terminal" 435 + version = "0.4.17" 436 + source = "registry+https://github.com/rust-lang/crates.io-index" 437 + checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" 438 + dependencies = [ 439 + "hermit-abi", 440 + "libc", 441 + "windows-sys 0.61.2", 442 + ] 443 + 444 + [[package]] 296 445 name = "is_terminal_polyfill" 297 446 version = "1.70.2" 298 447 source = "registry+https://github.com/rust-lang/crates.io-index" 299 448 checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" 449 + 450 + [[package]] 451 + name = "itertools" 452 + version = "0.10.5" 453 + source = "registry+https://github.com/rust-lang/crates.io-index" 454 + checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" 455 + dependencies = [ 456 + "either", 457 + ] 300 458 301 459 [[package]] 302 460 name = "itoa" ··· 385 543 "anyhow", 386 544 "chrono", 387 545 "clap", 546 + "criterion", 388 547 "directories", 389 548 "rusqlite", 390 549 "serde", ··· 425 584 checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" 426 585 427 586 [[package]] 587 + name = "oorandom" 588 + version = "11.1.5" 589 + source = "registry+https://github.com/rust-lang/crates.io-index" 590 + checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" 591 + 592 + [[package]] 428 593 name = "option-ext" 429 594 version = "0.2.0" 430 595 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 466 631 checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 467 632 468 633 [[package]] 634 + name = "plotters" 635 + version = "0.3.7" 636 + source = "registry+https://github.com/rust-lang/crates.io-index" 637 + checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" 638 + dependencies = [ 639 + "num-traits", 640 + "plotters-backend", 641 + "plotters-svg", 642 + "wasm-bindgen", 643 + "web-sys", 644 + ] 645 + 646 + [[package]] 647 + name = "plotters-backend" 648 + version = "0.3.7" 649 + source = "registry+https://github.com/rust-lang/crates.io-index" 650 + checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" 651 + 652 + [[package]] 653 + name = "plotters-svg" 654 + version = "0.3.7" 655 + source = "registry+https://github.com/rust-lang/crates.io-index" 656 + checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" 657 + dependencies = [ 658 + "plotters-backend", 659 + ] 660 + 661 + [[package]] 469 662 name = "proc-macro2" 470 663 version = "1.0.106" 471 664 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 484 677 ] 485 678 486 679 [[package]] 680 + name = "rayon" 681 + version = "1.11.0" 682 + source = "registry+https://github.com/rust-lang/crates.io-index" 683 + checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" 684 + dependencies = [ 685 + "either", 686 + "rayon-core", 687 + ] 688 + 689 + [[package]] 690 + name = "rayon-core" 691 + version = "1.13.0" 692 + source = "registry+https://github.com/rust-lang/crates.io-index" 693 + checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" 694 + dependencies = [ 695 + "crossbeam-deque", 696 + "crossbeam-utils", 697 + ] 698 + 699 + [[package]] 487 700 name = "redox_syscall" 488 701 version = "0.5.18" 489 702 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 504 717 ] 505 718 506 719 [[package]] 720 + name = "regex" 721 + version = "1.12.3" 722 + source = "registry+https://github.com/rust-lang/crates.io-index" 723 + checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" 724 + dependencies = [ 725 + "aho-corasick", 726 + "memchr", 727 + "regex-automata", 728 + "regex-syntax", 729 + ] 730 + 731 + [[package]] 732 + name = "regex-automata" 733 + version = "0.4.14" 734 + source = "registry+https://github.com/rust-lang/crates.io-index" 735 + checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" 736 + dependencies = [ 737 + "aho-corasick", 738 + "memchr", 739 + "regex-syntax", 740 + ] 741 + 742 + [[package]] 743 + name = "regex-syntax" 744 + version = "0.8.10" 745 + source = "registry+https://github.com/rust-lang/crates.io-index" 746 + checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" 747 + 748 + [[package]] 507 749 name = "rusqlite" 508 750 version = "0.31.0" 509 751 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 522 764 version = "1.0.22" 523 765 source = "registry+https://github.com/rust-lang/crates.io-index" 524 766 checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" 767 + 768 + [[package]] 769 + name = "same-file" 770 + version = "1.0.6" 771 + source = "registry+https://github.com/rust-lang/crates.io-index" 772 + checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" 773 + dependencies = [ 774 + "winapi-util", 775 + ] 525 776 526 777 [[package]] 527 778 name = "scopeguard" ··· 660 911 ] 661 912 662 913 [[package]] 914 + name = "tinytemplate" 915 + version = "1.2.1" 916 + source = "registry+https://github.com/rust-lang/crates.io-index" 917 + checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" 918 + dependencies = [ 919 + "serde", 920 + "serde_json", 921 + ] 922 + 923 + [[package]] 663 924 name = "tokio" 664 925 version = "1.50.0" 665 926 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 775 1036 checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 776 1037 777 1038 [[package]] 1039 + name = "walkdir" 1040 + version = "2.5.0" 1041 + source = "registry+https://github.com/rust-lang/crates.io-index" 1042 + checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" 1043 + dependencies = [ 1044 + "same-file", 1045 + "winapi-util", 1046 + ] 1047 + 1048 + [[package]] 778 1049 name = "wasi" 779 1050 version = "0.11.1+wasi-snapshot-preview1" 780 1051 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 823 1094 checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" 824 1095 dependencies = [ 825 1096 "unicode-ident", 1097 + ] 1098 + 1099 + [[package]] 1100 + name = "web-sys" 1101 + version = "0.3.91" 1102 + source = "registry+https://github.com/rust-lang/crates.io-index" 1103 + checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" 1104 + dependencies = [ 1105 + "js-sys", 1106 + "wasm-bindgen", 1107 + ] 1108 + 1109 + [[package]] 1110 + name = "winapi-util" 1111 + version = "0.1.11" 1112 + source = "registry+https://github.com/rust-lang/crates.io-index" 1113 + checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" 1114 + dependencies = [ 1115 + "windows-sys 0.61.2", 826 1116 ] 827 1117 828 1118 [[package]]
+15
Cargo.toml
··· 3 3 version = "0.1.0" 4 4 edition = "2024" 5 5 6 + [lib] 7 + name = "nod" 8 + path = "src/lib.rs" 9 + 10 + [[bin]] 11 + name = "nod" 12 + path = "src/main.rs" 13 + 14 + [[bench]] 15 + name = "queries" 16 + harness = false 17 + 6 18 [dependencies] 7 19 tokio = { version = "1", features = ["full"] } 8 20 tracing = "0.1" ··· 14 26 directories = "5.0" 15 27 serde = { version = "1.0", features = ["derive"] } 16 28 serde_json = "1.0" 29 + 30 + [dev-dependencies] 31 + criterion = { version = "0.5", features = ["html_reports"] }
+109
benches/queries.rs
··· 1 + use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; 2 + use nod::stats::{collect_stats, collect_trend, BucketSize, SortField}; 3 + use rusqlite::Connection; 4 + use std::sync::Mutex; 5 + 6 + // Seed an in-memory database with n rows spread evenly across one year. 7 + // Row mix: 60% builds (105), 30% substitutions (108), 10% downloads (101). 8 + // Uses a single transaction and a prepared statement for speed. 9 + fn seed(conn: &Connection, n: usize) { 10 + conn.execute_batch(" 11 + CREATE TABLE events ( 12 + id INTEGER PRIMARY KEY AUTOINCREMENT, 13 + nix_id INTEGER, 14 + parent_id INTEGER, 15 + event_type INTEGER, 16 + text TEXT, 17 + drv_path TEXT, 18 + cache_url TEXT, 19 + start_time INTEGER, 20 + end_time INTEGER, 21 + duration_ms INTEGER, 22 + total_bytes INTEGER 23 + ); 24 + CREATE INDEX idx_events_type_start ON events(event_type, start_time); 25 + CREATE INDEX idx_events_start_time ON events(start_time); 26 + PRAGMA journal_mode = WAL; 27 + PRAGMA synchronous = NORMAL; 28 + ").unwrap(); 29 + 30 + let base: i64 = 1_700_000_000; // 2023-11-14 31 + let span: i64 = 365 * 86400; 32 + 33 + conn.execute_batch("BEGIN").unwrap(); 34 + let mut stmt = conn.prepare( 35 + "INSERT INTO events (event_type, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes) 36 + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", 37 + ).unwrap(); 38 + 39 + for i in 0..n { 40 + let event_type: i64 = if i % 10 < 6 { 105 } else if i % 10 < 9 { 108 } else { 101 }; 41 + let start = base + (i as i64 * span / n as i64); 42 + // Varied durations 1ms–10min. wrapping_mul avoids overflow; abs() ensures positive. 43 + let duration_ms: i64 = 1 + (i as i64).wrapping_mul(6364136223846793005).abs() % 600_000; 44 + let total_bytes: i64 = if event_type == 101 { (i as i64).wrapping_mul(104729).abs() % 500_000_000 } else { 0 }; 45 + let drv_path: Option<String> = if event_type != 101 { 46 + Some(format!("/nix/store/{:032x}-pkg-{}.drv", i as u128, i % 200)) 47 + } else { 48 + None 49 + }; 50 + let cache_url: Option<&str> = if event_type == 108 { Some("https://cache.nixos.org") } else { None }; 51 + 52 + stmt.execute(rusqlite::params![ 53 + event_type, drv_path, cache_url, 54 + start, start + duration_ms / 1000, 55 + duration_ms, total_bytes, 56 + ]).unwrap(); 57 + } 58 + 59 + conn.execute_batch("COMMIT").unwrap(); 60 + } 61 + 62 + fn bench_collect_stats(c: &mut Criterion) { 63 + let mut group = c.benchmark_group("collect_stats"); 64 + 65 + for n in [10_000usize, 100_000, 1_000_000] { 66 + let conn = Connection::open_in_memory().unwrap(); 67 + seed(&conn, n); 68 + let db = Mutex::new(conn); 69 + 70 + group.bench_with_input(BenchmarkId::new("no_filter", n), &n, |b, _| { 71 + b.iter(|| collect_stats(&db, None, None, SortField::Duration, 10, false).unwrap()) 72 + }); 73 + 74 + group.bench_with_input(BenchmarkId::new("grouped", n), &n, |b, _| { 75 + b.iter(|| collect_stats(&db, None, None, SortField::Count, 10, true).unwrap()) 76 + }); 77 + } 78 + 79 + group.finish(); 80 + } 81 + 82 + fn bench_collect_trend(c: &mut Criterion) { 83 + let mut group = c.benchmark_group("collect_trend"); 84 + 85 + for n in [10_000usize, 100_000, 1_000_000] { 86 + let conn = Connection::open_in_memory().unwrap(); 87 + seed(&conn, n); 88 + let db = Mutex::new(conn); 89 + 90 + // aggregate (raw=false): SQL window function median — the default code path. 91 + group.bench_with_input(BenchmarkId::new("aggregate/month", n), &n, |b, _| { 92 + b.iter(|| collect_trend(&db, None, BucketSize::Month, None, false).unwrap()) 93 + }); 94 + 95 + group.bench_with_input(BenchmarkId::new("aggregate/day", n), &n, |b, _| { 96 + b.iter(|| collect_trend(&db, None, BucketSize::Day, None, false).unwrap()) 97 + }); 98 + 99 + // raw (raw=true): sends all durations for Mann-Whitney — the memory-heavy path. 100 + group.bench_with_input(BenchmarkId::new("raw/month", n), &n, |b, _| { 101 + b.iter(|| collect_trend(&db, None, BucketSize::Month, None, true).unwrap()) 102 + }); 103 + } 104 + 105 + group.finish(); 106 + } 107 + 108 + criterion_group!(benches, bench_collect_stats, bench_collect_trend); 109 + criterion_main!(benches);
+21 -11
src/daemon.rs
··· 35 35 total_bytes INTEGER 36 36 ); 37 37 CREATE INDEX IF NOT EXISTS idx_events_type_start ON events(event_type, start_time); 38 + CREATE INDEX IF NOT EXISTS idx_events_start_time ON events(start_time); 38 39 "; 39 40 40 41 const SCHEMA_HASHES: &[u32] = &[ 41 42 0x9bc94a70, // v1: TEXT timestamps, no indexes 42 43 0xee061d32, // v2: INTEGER timestamps (Unix seconds), idx_events_type_start 44 + 0x7c09711e, // v3: idx_events_start_time 43 45 ]; 44 46 const SCHEMA_VERSION: u32 = SCHEMA_HASHES.len() as u32; 45 47 const _: () = assert!( ··· 47 49 "schema changed - append new hash to SCHEMA_HASHES and add a migration in MIGRATIONS" 48 50 ); 49 51 50 - // (target_version, sql). Table rebuild because SQLite does not support ALTER COLUMN. 52 + // (target_version, sql). Must be ordered by target version ascending. 53 + // Table rebuild used for v2 because SQLite does not support ALTER COLUMN. 51 54 const MIGRATIONS: &[(u32, &str)] = &[ 52 55 (2, " 53 56 CREATE TABLE events_new ( ··· 73 76 ALTER TABLE events_new RENAME TO events; 74 77 CREATE INDEX IF NOT EXISTS idx_events_type_start ON events(event_type, start_time); 75 78 "), 79 + (3, "CREATE INDEX IF NOT EXISTS idx_events_start_time ON events(start_time);"), 76 80 ]; 77 81 78 82 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] ··· 202 206 since: Option<i64>, 203 207 bucket: BucketSize, 204 208 drv: Option<String>, 209 + #[serde(default)] 210 + raw: bool, 205 211 }, 206 212 Clean { 207 213 // Unix timestamp; None means delete everything. ··· 285 291 } 286 292 287 293 conn.execute_batch(" 288 - PRAGMA journal_mode = WAL; 289 - PRAGMA synchronous = NORMAL; 290 - PRAGMA temp_store = MEMORY; 291 - PRAGMA mmap_size = 134217728; 292 - PRAGMA cache_size = -8000; 293 - PRAGMA wal_autocheckpoint = 0; 294 + PRAGMA journal_mode = WAL; 295 + PRAGMA synchronous = NORMAL; 296 + PRAGMA temp_store = MEMORY; 297 + PRAGMA mmap_size = 134217728; 298 + PRAGMA cache_size = -8000; 299 + PRAGMA wal_autocheckpoint = 1000; 294 300 ").context("Failed to configure database")?; 295 301 296 302 Ok(conn) ··· 301 307 let cutoff = Utc::now().timestamp() - retain_days as i64 * 86400; 302 308 let deleted = conn.execute("DELETE FROM events WHERE start_time < ?1", [cutoff]) 303 309 .context("Retention DELETE failed")?; 304 - conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)") 305 - .context("Retention checkpoint failed")?; 310 + // TRUNCATE resets and shrinks the WAL file; use RESTART as fallback if readers 311 + // are active (TRUNCATE fails when a reader holds the WAL open). 312 + if conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE)").is_err() { 313 + conn.execute_batch("PRAGMA wal_checkpoint(RESTART)") 314 + .context("Retention checkpoint failed")?; 315 + } 306 316 if deleted > 0 { 307 317 info!(deleted, retain_days, "Retention cleanup removed rows"); 308 318 } ··· 380 390 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 381 391 break; 382 392 } 383 - Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => { 393 + Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv, raw })) => { 384 394 let db = Arc::clone(&db); 385 - let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv)) 395 + let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv, raw)) 386 396 .await??; 387 397 writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?; 388 398 break;
+2
src/lib.rs
··· 1 + pub mod daemon; 2 + pub mod stats;
+8 -7
src/main.rs
··· 1 - mod daemon; 2 - mod stats; 3 - 4 1 use anyhow::{Context, Result}; 5 2 use chrono::Utc; 6 3 use clap::{Parser, Subcommand}; ··· 14 11 use tokio::net::UnixStream; 15 12 use tracing::{error, info}; 16 13 17 - use daemon::{open_db, run_daemon, DbConnections}; 18 - use stats::{ 14 + use nod::daemon::{open_db, run_daemon, DbConnections}; 15 + use nod::stats::{ 19 16 collect_trend, display_stats, display_trend, display_trend_test, output_csv_trend, 20 17 BucketSize, SortField, Stats, Trend, 21 18 }; ··· 276 273 assert!(limit > 0); 277 274 278 275 if let Some(bucket_size) = bucket { 276 + // raw=true only when Mann-Whitney test output is requested — sends all durations. 277 + let raw = matches!(output, OutputFormat::Test); 279 278 let cmd = serde_json::json!({ 280 279 "action": "get_trend", 281 280 "since": since, 282 281 "bucket": bucket_size, 283 282 "drv": drv, 283 + "raw": raw, 284 284 }); 285 285 stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 286 286 ··· 326 326 let conn = Mutex::new(conn); 327 327 328 328 if let Some(bucket_size) = bucket { 329 - let trend = collect_trend(&conn, since, bucket_size, drv)?; 329 + let raw = matches!(output, OutputFormat::Test); 330 + let trend = collect_trend(&conn, since, bucket_size, drv, raw)?; 330 331 display_trend_output(&trend, output); 331 332 } else { 332 - let s = stats::collect_stats(&conn, since, drv.as_deref(), sort, limit, group)?; 333 + let s = nod::stats::collect_stats(&conn, since, drv.as_deref(), sort, limit, group)?; 333 334 display_stats(s); 334 335 } 335 336
+159 -28
src/stats.rs
··· 227 227 if x >= 0.0 { result } else { -result } 228 228 } 229 229 230 - fn median_sorted(sorted: &[i64]) -> f64 { 230 + pub fn median_sorted(sorted: &[i64]) -> f64 { 231 231 assert!(!sorted.is_empty()); 232 232 let n = sorted.len(); 233 233 if n % 2 == 0 { ··· 237 237 } 238 238 } 239 239 240 - fn fmt_ms(ms: i64) -> String { 240 + pub fn fmt_ms(ms: i64) -> String { 241 241 if ms < 1000 { 242 242 format!("{}ms", ms) 243 243 } else if ms < 60_000 { ··· 261 261 } 262 262 263 263 impl BucketSize { 264 - fn strftime_fmt(&self) -> &'static str { 264 + pub fn strftime_fmt(&self) -> &'static str { 265 265 match self { 266 266 BucketSize::Hour => "%Y-%m-%dT%H", 267 267 BucketSize::Day => "%Y-%m-%d", ··· 280 280 } 281 281 } 282 282 283 - // Raw durations are carried per-bucket so adjacent buckets can be compared 284 - // without a second round-trip to the daemon. 285 283 #[derive(Debug, Serialize, Deserialize)] 286 284 pub struct TrendBucket { 287 285 pub bucket: String, 286 + // Precomputed aggregates — always populated; computed client-side in raw mode, 287 + // computed SQL-side in aggregate mode. 288 + #[serde(default)] 289 + pub build_count: i64, 290 + #[serde(default)] 291 + pub build_median_ms: i64, 292 + #[serde(default)] 293 + pub subst_count: i64, 294 + #[serde(default)] 295 + pub subst_median_ms: i64, 296 + pub download_bytes: i64, 297 + // Raw durations — only populated when raw=true (needed for Mann-Whitney). 298 + #[serde(default)] 288 299 pub build_durations: Vec<i64>, 300 + #[serde(default)] 289 301 pub subst_durations: Vec<i64>, 290 - pub download_bytes: i64, 291 302 } 292 303 293 304 #[derive(Debug, Serialize, Deserialize)] ··· 297 308 pub drv_filter: Option<String>, 298 309 } 299 310 311 + // raw=false: compute per-bucket median in SQL via window functions — no raw durations 312 + // sent over the socket. Use this for all display modes except Mann-Whitney. 313 + // raw=true: return all durations for Mann-Whitney (--output test). Memory-intensive 314 + // for large datasets; only use when genuinely needed. 300 315 pub fn collect_trend( 301 316 db: &Mutex<Connection>, 302 317 since: Option<i64>, 303 318 bucket: BucketSize, 304 319 drv: Option<String>, 320 + raw: bool, 305 321 ) -> Result<Trend> { 306 322 if let Some(ref d) = drv { 307 323 assert!(!d.is_empty(), "drv filter must not be empty"); 308 324 } 309 325 310 326 let conn = db.lock().unwrap(); 311 - 312 327 let drv_ref = drv.as_deref(); 313 328 let fmt = bucket.strftime_fmt(); 314 329 330 + let buckets = if raw { 331 + collect_trend_raw(&conn, since, drv_ref, fmt)? 332 + } else { 333 + collect_trend_aggregate(&conn, since, drv_ref, fmt)? 334 + }; 335 + 336 + for i in 1..buckets.len() { 337 + assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending"); 338 + } 339 + 340 + Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv }) 341 + } 342 + 343 + // Fetches all raw durations. ORDER BY start_time (not strftime) so the index 344 + // (event_type, start_time) can be used for ordering — strftime is monotone in 345 + // start_time so bucket grouping is preserved. 346 + fn collect_trend_raw( 347 + conn: &Connection, 348 + since: Option<i64>, 349 + drv: Option<&str>, 350 + fmt: &str, 351 + ) -> Result<Vec<TrendBucket>> { 315 352 // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter. 316 353 let mut stmt = conn.prepare( 317 354 "SELECT strftime(?3, start_time, 'unixepoch'), event_type, duration_ms, total_bytes ··· 319 356 WHERE event_type IN (101, 105, 108) 320 357 AND (?1 IS NULL OR start_time >= ?1) 321 358 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 322 - ORDER BY strftime(?3, start_time, 'unixepoch') ASC, start_time ASC", 323 - ).context("Failed to prepare trend query")?; 359 + ORDER BY start_time ASC", 360 + ).context("Failed to prepare raw trend query")?; 324 361 325 362 let mut buckets: Vec<TrendBucket> = vec![]; 326 - for row in stmt.query_map(rusqlite::params![since, drv_ref, fmt], |r| { 363 + for row in stmt.query_map(rusqlite::params![since, drv, fmt], |r| { 327 364 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) 328 365 })?.filter_map(|r| r.ok()) { 329 366 let (b, etype, dur, bytes) = row; 330 367 if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) { 331 - buckets.push(TrendBucket { bucket: b, build_durations: vec![], subst_durations: vec![], download_bytes: 0 }); 368 + buckets.push(TrendBucket { 369 + bucket: b, build_count: 0, build_median_ms: 0, 370 + subst_count: 0, subst_median_ms: 0, download_bytes: 0, 371 + build_durations: vec![], subst_durations: vec![], 372 + }); 332 373 } 333 374 let last = buckets.last_mut().unwrap(); 334 375 match etype { ··· 339 380 } 340 381 } 341 382 342 - for i in 1..buckets.len() { 343 - assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending"); 383 + // Compute aggregates from raw durations so display functions can use them uniformly. 384 + for b in &mut buckets { 385 + if !b.build_durations.is_empty() { 386 + let mut s = b.build_durations.clone(); s.sort_unstable(); 387 + b.build_count = s.len() as i64; 388 + b.build_median_ms = median_sorted(&s) as i64; 389 + } 390 + if !b.subst_durations.is_empty() { 391 + let mut s = b.subst_durations.clone(); s.sort_unstable(); 392 + b.subst_count = s.len() as i64; 393 + b.subst_median_ms = median_sorted(&s) as i64; 394 + } 395 + } 396 + 397 + Ok(buckets) 398 + } 399 + 400 + // Computes per-bucket median entirely in SQL using window functions. Returns no 401 + // raw durations — only counts and medians. Memory usage is proportional to the 402 + // number of distinct buckets, not the number of rows. 403 + fn collect_trend_aggregate( 404 + conn: &Connection, 405 + since: Option<i64>, 406 + drv: Option<&str>, 407 + fmt: &str, 408 + ) -> Result<Vec<TrendBucket>> { 409 + // Window function median: ROW_NUMBER orders rows by duration within each 410 + // (bucket, event_type) partition; we take the one or two middle rows and AVG them. 411 + // Integer division: odd n → single middle row; even n → average of two middle rows. 412 + let mut stmt = conn.prepare( 413 + "SELECT bucket, event_type, 414 + CAST(AVG(duration_ms) AS INTEGER) AS median_ms, 415 + MAX(n) AS cnt 416 + FROM ( 417 + SELECT strftime(?3, start_time, 'unixepoch') AS bucket, 418 + event_type, duration_ms, 419 + ROW_NUMBER() OVER ( 420 + PARTITION BY strftime(?3, start_time, 'unixepoch'), event_type 421 + ORDER BY duration_ms 422 + ) AS rn, 423 + COUNT(*) OVER ( 424 + PARTITION BY strftime(?3, start_time, 'unixepoch'), event_type 425 + ) AS n 426 + FROM events 427 + WHERE event_type IN (105, 108) 428 + AND (?1 IS NULL OR start_time >= ?1) 429 + AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') 430 + ) 431 + WHERE rn IN ((n + 1) / 2, (n + 2) / 2) 432 + GROUP BY bucket, event_type 433 + ORDER BY bucket ASC", 434 + ).context("Failed to prepare aggregate trend query")?; 435 + 436 + let mut buckets: Vec<TrendBucket> = vec![]; 437 + for row in stmt.query_map(rusqlite::params![since, drv, fmt], |r| { 438 + Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) 439 + })?.filter_map(|r| r.ok()) { 440 + let (b, etype, median_ms, cnt) = row; 441 + assert!(median_ms >= 0); 442 + assert!(cnt > 0); 443 + if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) { 444 + buckets.push(TrendBucket { 445 + bucket: b, build_count: 0, build_median_ms: 0, 446 + subst_count: 0, subst_median_ms: 0, download_bytes: 0, 447 + build_durations: vec![], subst_durations: vec![], 448 + }); 449 + } 450 + let last = buckets.last_mut().unwrap(); 451 + match etype { 452 + 105 => { last.build_median_ms = median_ms; last.build_count = cnt; } 453 + 108 => { last.subst_median_ms = median_ms; last.subst_count = cnt; } 454 + _ => {} 455 + } 456 + } 457 + 458 + // Separate query for download bytes — FileTransfer has no meaningful duration median. 459 + let mut dl_stmt = conn.prepare( 460 + "SELECT strftime(?2, start_time, 'unixepoch') AS bucket, SUM(total_bytes) 461 + FROM events 462 + WHERE event_type = 101 463 + AND (?1 IS NULL OR start_time >= ?1) 464 + GROUP BY bucket 465 + ORDER BY bucket ASC", 466 + ).context("Failed to prepare download bytes query")?; 467 + 468 + for row in dl_stmt.query_map(rusqlite::params![since, fmt], |r| { 469 + Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)) 470 + })?.filter_map(|r| r.ok()) { 471 + let (b, bytes) = row; 472 + assert!(bytes >= 0); 473 + // Merge into existing bucket or create a download-only bucket. 474 + if let Some(existing) = buckets.iter_mut().find(|x| x.bucket == b) { 475 + existing.download_bytes = bytes; 476 + } else { 477 + buckets.push(TrendBucket { 478 + bucket: b, build_count: 0, build_median_ms: 0, 479 + subst_count: 0, subst_median_ms: 0, download_bytes: bytes, 480 + build_durations: vec![], subst_durations: vec![], 481 + }); 482 + } 344 483 } 345 484 346 - Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv }) 485 + buckets.sort_unstable_by(|a, b| a.bucket.cmp(&b.bucket)); 486 + Ok(buckets) 347 487 } 348 488 349 489 pub fn display_trend(trend: &Trend) { ··· 365 505 } 366 506 367 507 for b in &trend.buckets { 368 - let mut bs = b.build_durations.clone(); bs.sort_unstable(); 369 - let mut ss = b.subst_durations.clone(); ss.sort_unstable(); 370 - let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; 371 - let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; 372 - 373 508 print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", 374 - b.bucket, b.build_durations.len(), fmt_ms(build_med), 375 - b.subst_durations.len(), fmt_ms(subst_med)); 509 + b.bucket, b.build_count, fmt_ms(b.build_median_ms), 510 + b.subst_count, fmt_ms(b.subst_median_ms)); 376 511 if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); } 377 512 println!(); 378 513 } ··· 438 573 pub fn output_csv_trend(trend: &Trend) { 439 574 println!("period,build_count,build_median_ms,subst_count,subst_median_ms,download_bytes"); 440 575 for b in &trend.buckets { 441 - let mut bs = b.build_durations.clone(); bs.sort_unstable(); 442 - let mut ss = b.subst_durations.clone(); ss.sort_unstable(); 443 - let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; 444 - let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; 445 - assert!(build_med >= 0); 446 - assert!(subst_med >= 0); 447 - println!("{},{},{},{},{},{}", b.bucket, b.build_durations.len(), build_med, b.subst_durations.len(), subst_med, b.download_bytes); 576 + assert!(b.build_median_ms >= 0); 577 + assert!(b.subst_median_ms >= 0); 578 + println!("{},{},{},{},{},{}", b.bucket, b.build_count, b.build_median_ms, b.subst_count, b.subst_median_ms, b.download_bytes); 448 579 } 449 580 } 450 581