//! Nix Observability Daemon (nod) — observability benchmarks.
//!
//! Criterion benchmarks for the `nod` stats queries (`collect_stats`,
//! `collect_trend`) against seeded in-memory SQLite databases.
1use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
2use nod::stats::{collect_stats, collect_trend, BucketSize, SortField, TodaySummary};
3use rusqlite::Connection;
4use std::sync::Mutex;
5
// Fixed epoch anchor so seeded data is deterministic across runs.
const BASE_TIME: i64 = 1_700_000_000; // 2023-11-14
const YEAR_SPAN: i64 = 365 * 86400; // seconds in one non-leap year
8
9// Seed n rows spread evenly across one year.
10// Row mix: 60% builds (105), 30% substitutions (108), 10% downloads (101).
11fn seed(conn: &Connection, n: usize) {
12 conn.execute_batch("
13 CREATE TABLE events (
14 id INTEGER PRIMARY KEY AUTOINCREMENT,
15 nix_id INTEGER, parent_id INTEGER, event_type INTEGER,
16 text TEXT, drv_path TEXT, cache_url TEXT,
17 start_time INTEGER, end_time INTEGER,
18 duration_ms INTEGER, total_bytes INTEGER
19 );
20 CREATE INDEX idx_events_type_start ON events(event_type, start_time, duration_ms, total_bytes);
21 CREATE INDEX idx_events_start_cover ON events(start_time, event_type, duration_ms, total_bytes);
22 CREATE TABLE daily_stats (
23 day INTEGER NOT NULL, event_type INTEGER NOT NULL,
24 count INTEGER NOT NULL DEFAULT 0, total_ms INTEGER NOT NULL DEFAULT 0,
25 total_bytes INTEGER NOT NULL DEFAULT 0,
26 PRIMARY KEY (day, event_type)
27 );
28 CREATE TABLE daily_cache_stats (
29 day INTEGER NOT NULL, cache_url TEXT NOT NULL,
30 count INTEGER NOT NULL DEFAULT 0, total_ms INTEGER NOT NULL DEFAULT 0,
31 PRIMARY KEY (day, cache_url)
32 );
33 PRAGMA journal_mode = WAL;
34 PRAGMA synchronous = NORMAL;
35 ").unwrap();
36
37 conn.execute_batch("BEGIN").unwrap();
38 let mut stmt = conn.prepare(
39 "INSERT INTO events (event_type, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes)
40 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
41 ).unwrap();
42
43 for i in 0..n {
44 let event_type: i64 = if i % 10 < 6 { 105 } else if i % 10 < 9 { 108 } else { 101 };
45 let start = BASE_TIME + (i as i64 * YEAR_SPAN / n as i64);
46 let duration_ms: i64 = 1 + (i as i64).wrapping_mul(6364136223846793005).abs() % 600_000;
47 let total_bytes: i64 = if event_type == 101 { (i as i64).wrapping_mul(104729).abs() % 500_000_000 } else { 0 };
48 let drv_path: Option<String> = if event_type != 101 {
49 Some(format!("/nix/store/{:032x}-pkg-{}.drv", i as u128, i % 200))
50 } else {
51 None
52 };
53 let cache_url: Option<&str> = if event_type == 108 { Some("https://cache.nixos.org") } else { None };
54
55 stmt.execute(rusqlite::params![
56 event_type, drv_path, cache_url,
57 start, start + duration_ms / 1000,
58 duration_ms, total_bytes,
59 ]).unwrap();
60 }
61
62 conn.execute_batch("COMMIT").unwrap();
63
64 // Backfill daily_stats from seeded events (mirrors daemon migration v5).
65 conn.execute_batch("
66 INSERT INTO daily_stats (day, event_type, count, total_ms, total_bytes)
67 SELECT start_time / 86400, event_type,
68 COUNT(*), COALESCE(SUM(duration_ms), 0), COALESCE(SUM(total_bytes), 0)
69 FROM events WHERE event_type IN (101, 105, 108)
70 GROUP BY 1, 2
71 ON CONFLICT (day, event_type) DO UPDATE SET
72 count = count + excluded.count,
73 total_ms = total_ms + excluded.total_ms,
74 total_bytes = total_bytes + excluded.total_bytes;
75 INSERT INTO daily_cache_stats (day, cache_url, count, total_ms)
76 SELECT start_time / 86400, cache_url, COUNT(*), COALESCE(SUM(duration_ms), 0)
77 FROM events WHERE event_type = 108 AND cache_url IS NOT NULL
78 GROUP BY 1, 2
79 ON CONFLICT (day, cache_url) DO UPDATE SET
80 count = count + excluded.count,
81 total_ms = total_ms + excluded.total_ms;
82 ").unwrap();
83}
84
85fn bench_collect_stats(c: &mut Criterion) {
86 let mut group = c.benchmark_group("collect_stats");
87
88 // All bench data is from 2023 (BASE_TIME). Using today's real Unix day means all
89 // seeded rows are closed historical days in daily_stats; today snapshot is empty.
90 // This matches normal daemon usage: summary from daily_stats, slowest_builds from events.
91 let today_day = std::time::SystemTime::now()
92 .duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64 / 86400;
93 let today_snap = TodaySummary { day: today_day, ..Default::default() };
94
95 for n in [10_000usize, 100_000, 1_000_000] {
96 let conn = Connection::open_in_memory().unwrap();
97 seed(&conn, n);
98 let db = Mutex::new(conn);
99
100 let t = today_snap.clone();
101 group.bench_with_input(BenchmarkId::new("all_time", n), &n, |b, _| {
102 b.iter(|| collect_stats(&db, None, None, SortField::Duration, 10, false, Some(t.clone())).unwrap())
103 });
104
105 let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10);
106 let t = today_snap.clone();
107 group.bench_with_input(BenchmarkId::new("recent_10pct", n), &n, |b, _| {
108 b.iter(|| collect_stats(&db, since, None, SortField::Duration, 10, false, Some(t.clone())).unwrap())
109 });
110
111 let t = today_snap.clone();
112 group.bench_with_input(BenchmarkId::new("grouped", n), &n, |b, _| {
113 b.iter(|| collect_stats(&db, None, None, SortField::Count, 10, true, Some(t.clone())).unwrap())
114 });
115 }
116
117 group.finish();
118}
119
120fn bench_collect_trend(c: &mut Criterion) {
121 let mut group = c.benchmark_group("collect_trend");
122
123 let today_day = std::time::SystemTime::now()
124 .duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64 / 86400;
125 let today_snap = TodaySummary { day: today_day, ..Default::default() };
126
127 for n in [10_000usize, 100_000, 1_000_000] {
128 let conn = Connection::open_in_memory().unwrap();
129 seed(&conn, n);
130 let db = Mutex::new(conn);
131
132 // full=false: daily_stats path (table/csv output).
133 let t = today_snap.clone();
134 group.bench_with_input(BenchmarkId::new("agg/all_time/month", n), &n, |b, _| {
135 b.iter(|| collect_trend(&db, None, BucketSize::Month, None, Some(t.clone()), false).unwrap())
136 });
137
138 let t = today_snap.clone();
139 group.bench_with_input(BenchmarkId::new("agg/all_time/day", n), &n, |b, _| {
140 b.iter(|| collect_trend(&db, None, BucketSize::Day, None, Some(t.clone()), false).unwrap())
141 });
142
143 let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10);
144 let t = today_snap.clone();
145 group.bench_with_input(BenchmarkId::new("agg/recent_10pct/day", n), &n, |b, _| {
146 b.iter(|| collect_trend(&db, since, BucketSize::Day, None, Some(t.clone()), false).unwrap())
147 });
148
149 // full=true: events scan (--output test).
150 group.bench_with_input(BenchmarkId::new("full/all_time/month", n), &n, |b, _| {
151 b.iter(|| collect_trend(&db, None, BucketSize::Month, None, None, true).unwrap())
152 });
153
154 let since = Some(BASE_TIME + YEAR_SPAN * 9 / 10);
155 group.bench_with_input(BenchmarkId::new("full/recent_10pct/day", n), &n, |b, _| {
156 b.iter(|| collect_trend(&db, since, BucketSize::Day, None, None, true).unwrap())
157 });
158 }
159
160 group.finish();
161}
162
// Register both benchmark groups and generate the criterion harness `main`.
criterion_group!(benches, bench_collect_stats, bench_collect_trend);
criterion_main!(benches);