···11+//! Persistent system statistics and cardinality sketches.
22+//!
33+//! Values are held in memory as atomics/mutexes, loaded from storage on startup,
44+//! and persisted periodically (every 60 s) and on clean shutdown. Prometheus
55+//! metrics reset on restart; these stats give a long-running overview across
66+//! restarts without precision requirements.
77+//!
88+//! Keys use the `met` prefix in the default keyspace.
99+1010+use std::collections::hash_map::DefaultHasher;
1111+use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
1212+use std::sync::{Arc, Mutex};
1313+1414+use cardinality_estimator_safe::{Element, Sketch};
1515+use tracing::error;
1616+1717+use super::{DbRef, PREFIX_META, error::StorageResult};
1818+1919+type Sk = Sketch<12, 6>;
2020+2121+pub struct Stats {
2222+ // ── Counters ─────────────────────────────────────────────────────────────
2323+ /// Unix timestamp of the very first startup; set once, never updated.
2424+ pub first_startup_secs: AtomicU64,
2525+ pub startup_count: AtomicU64,
2626+ /// New repos ever enqueued for resync.
2727+ pub repos_queued_total: AtomicU64,
2828+ pub collection_births_total: AtomicU64,
2929+ pub collection_deaths_total: AtomicU64,
3030+ /// Successful full resyncs completed.
3131+ pub resyncs_completed_total: AtomicU64,
3232+ /// Approximate queue depth; may drift slightly after a crash.
3333+ pub resync_queue_depth: AtomicI64,
3434+ /// Approximate count of buffered in-flight events.
3535+ pub resync_buffer_count: AtomicI64,
3636+3737+ // ── Cardinality sketches (~3 KiB each at full HLL) ────────────────────────
3838+ /// Distinct NSID strings ever indexed.
3939+ /// BOOOOoooooo mutexes (they should be ok, we only grab them briefly for writes and serializes)
4040+ pub sketch_collections: Mutex<Sk>,
4141+ /// All DIDs ever seen.
4242+ pub sketch_accounts_all: Mutex<Sk>,
4343+ /// DIDs with ≥1 successful resync.
4444+ pub sketch_accounts_resynced: Mutex<Sk>,
4545+ /// DIDs indexed in sync1.1 strict mode.
4646+ pub sketch_accounts_commit_strict: Mutex<Sk>,
4747+ /// DIDs indexed in sync1.1 lenient mode.
4848+ pub sketch_accounts_commit_lenient: Mutex<Sk>,
4949+ /// DIDs desynchronized by commit failure.
5050+ pub sketch_accounts_desynced: Mutex<Sk>,
5151+ /// Distinct PDS hostnames seen.
5252+ pub sketch_pds_hosts: Mutex<Sk>,
5353+}
5454+5555+impl Default for Stats {
5656+ fn default() -> Self {
5757+ Stats {
5858+ first_startup_secs: AtomicU64::new(0),
5959+ startup_count: AtomicU64::new(0),
6060+ repos_queued_total: AtomicU64::new(0),
6161+ collection_births_total: AtomicU64::new(0),
6262+ collection_deaths_total: AtomicU64::new(0),
6363+ resyncs_completed_total: AtomicU64::new(0),
6464+ resync_queue_depth: AtomicI64::new(0),
6565+ resync_buffer_count: AtomicI64::new(0),
6666+ sketch_collections: Mutex::new(Sk::default()),
6767+ sketch_accounts_all: Mutex::new(Sk::default()),
6868+ sketch_accounts_resynced: Mutex::new(Sk::default()),
6969+ sketch_accounts_commit_strict: Mutex::new(Sk::default()),
7070+ sketch_accounts_commit_lenient: Mutex::new(Sk::default()),
7171+ sketch_accounts_desynced: Mutex::new(Sk::default()),
7272+ sketch_pds_hosts: Mutex::new(Sk::default()),
7373+ }
7474+ }
7575+}
7676+7777+pub type StatsRef = Arc<Stats>;
7878+7979+/// Insert a string into a sketch.
8080+pub fn insert_str(sketch: &Mutex<Sk>, s: &str) {
8181+ sketch
8282+ .lock()
8383+ .unwrap()
8484+ .insert(Element::from_hasher_default::<DefaultHasher>(s));
8585+}
8686+8787+// ---------------------------------------------------------------------------
8888+// Key constants
8989+// ---------------------------------------------------------------------------
// Key suffixes; the storage helpers prepend `PREFIX_META` to form the full key.

// Plain counters, stored as 8-byte big-endian integers.
const K_FIRST_STARTUP: &[u8] = b"first_startup";
const K_STARTUP_COUNT: &[u8] = b"startup_count";
const K_REPOS_QUEUED: &[u8] = b"repos_queued_total";
const K_BIRTHS: &[u8] = b"births_total";
const K_DEATHS: &[u8] = b"deaths_total";
const K_RESYNCS_DONE: &[u8] = b"resyncs_completed_total";
const K_QUEUE_DEPTH: &[u8] = b"queue_depth";
const K_BUFFER_COUNT: &[u8] = b"buffer_count";

// Cardinality sketches, stored as postcard-encoded blobs.
const K_SK_COLLECTIONS: &[u8] = b"sketch_collections";
const K_SK_ACCOUNTS_ALL: &[u8] = b"sketch_accounts_all";
const K_SK_ACCOUNTS_RSYNCD: &[u8] = b"sketch_accounts_resynced";
const K_SK_ACCOUNTS_S11: &[u8] = b"sketch_accounts_commit_strict";
const K_SK_ACCOUNTS_LNT: &[u8] = b"sketch_accounts_commit_lenient";
const K_SK_ACCOUNTS_DSY: &[u8] = b"sketch_accounts_desynced";
const K_SK_PDS_HOSTS: &[u8] = b"sketch_pds_hosts";
106106+107107+// ---------------------------------------------------------------------------
108108+// Storage helpers
109109+// ---------------------------------------------------------------------------
110110+111111+fn full_key(suffix: &[u8]) -> Vec<u8> {
112112+ let mut k = Vec::with_capacity(PREFIX_META.len() + suffix.len());
113113+ k.extend_from_slice(&PREFIX_META);
114114+ k.extend_from_slice(suffix);
115115+ k
116116+}
117117+118118+fn read_u64(ks: &fjall::Keyspace, suffix: &[u8]) -> StorageResult<u64> {
119119+ match ks.get(full_key(suffix))? {
120120+ None => Ok(0),
121121+ Some(v) if v.len() >= 8 => Ok(u64::from_be_bytes(v[..8].try_into().unwrap())),
122122+ Some(_) => Ok(0),
123123+ }
124124+}
125125+126126+fn write_u64(ks: &fjall::Keyspace, suffix: &[u8], val: u64) -> StorageResult<()> {
127127+ ks.insert(full_key(suffix), val.to_be_bytes())?;
128128+ Ok(())
129129+}
130130+131131+fn read_i64(ks: &fjall::Keyspace, suffix: &[u8]) -> StorageResult<i64> {
132132+ Ok(read_u64(ks, suffix)? as i64)
133133+}
134134+135135+fn write_i64(ks: &fjall::Keyspace, suffix: &[u8], val: i64) -> StorageResult<()> {
136136+ write_u64(ks, suffix, val as u64)
137137+}
138138+139139+fn read_sketch(ks: &fjall::Keyspace, suffix: &[u8]) -> Sk {
140140+ let Some(bytes) = ks.get(full_key(suffix)).ok().flatten() else {
141141+ return Sk::default();
142142+ };
143143+ match postcard::from_bytes(bytes.as_ref()) {
144144+ Ok(sk) => sk,
145145+ Err(e) => {
146146+ error!(key = ?suffix, error = %e, "failed to deserialize cardinality sketch; using fresh sketch");
147147+ Sk::default()
148148+ }
149149+ }
150150+}
151151+152152+fn write_sketch(ks: &fjall::Keyspace, suffix: &[u8], sketch: &Mutex<Sk>) -> StorageResult<()> {
153153+ let bytes = postcard::to_stdvec(&*sketch.lock().unwrap()).unwrap_or_default();
154154+ ks.insert(full_key(suffix), bytes)?;
155155+ Ok(())
156156+}
157157+158158+// ---------------------------------------------------------------------------
159159+// Public API
160160+// ---------------------------------------------------------------------------
161161+162162+/// Load stats from storage on startup.
163163+///
164164+/// Missing keys default to zero / empty sketch. After loading:
165165+/// - If `first_startup_secs` is 0, writes the current time immediately.
166166+/// - Increments `startup_count` and writes it immediately.
167167+pub(super) fn load(ks: &fjall::Keyspace) -> StorageResult<StatsRef> {
168168+ let stats = Stats {
169169+ first_startup_secs: AtomicU64::new(read_u64(ks, K_FIRST_STARTUP)?),
170170+ startup_count: AtomicU64::new(read_u64(ks, K_STARTUP_COUNT)?),
171171+ repos_queued_total: AtomicU64::new(read_u64(ks, K_REPOS_QUEUED)?),
172172+ collection_births_total: AtomicU64::new(read_u64(ks, K_BIRTHS)?),
173173+ collection_deaths_total: AtomicU64::new(read_u64(ks, K_DEATHS)?),
174174+ resyncs_completed_total: AtomicU64::new(read_u64(ks, K_RESYNCS_DONE)?),
175175+ resync_queue_depth: AtomicI64::new(read_i64(ks, K_QUEUE_DEPTH)?),
176176+ resync_buffer_count: AtomicI64::new(read_i64(ks, K_BUFFER_COUNT)?),
177177+ sketch_collections: Mutex::new(read_sketch(ks, K_SK_COLLECTIONS)),
178178+ sketch_accounts_all: Mutex::new(read_sketch(ks, K_SK_ACCOUNTS_ALL)),
179179+ sketch_accounts_resynced: Mutex::new(read_sketch(ks, K_SK_ACCOUNTS_RSYNCD)),
180180+ sketch_accounts_commit_strict: Mutex::new(read_sketch(ks, K_SK_ACCOUNTS_S11)),
181181+ sketch_accounts_commit_lenient: Mutex::new(read_sketch(ks, K_SK_ACCOUNTS_LNT)),
182182+ sketch_accounts_desynced: Mutex::new(read_sketch(ks, K_SK_ACCOUNTS_DSY)),
183183+ sketch_pds_hosts: Mutex::new(read_sketch(ks, K_SK_PDS_HOSTS)),
184184+ };
185185+186186+ // Set first_startup_secs if this is the very first startup.
187187+ if stats.first_startup_secs.load(Ordering::Relaxed) == 0 {
188188+ let now = crate::util::unix_now();
189189+ stats.first_startup_secs.store(now, Ordering::Relaxed);
190190+ write_u64(ks, K_FIRST_STARTUP, now)?;
191191+ }
192192+193193+ // Bump startup_count.
194194+ let new_count = stats.startup_count.fetch_add(1, Ordering::Relaxed) + 1;
195195+ write_u64(ks, K_STARTUP_COUNT, new_count)?;
196196+197197+ Ok(Arc::new(stats))
198198+}
199199+200200+/// Persist all stats to storage.
201201+pub fn save(db: &DbRef) -> StorageResult<()> {
202202+ let s = &db.stats;
203203+ let ks = &db.ks;
204204+205205+ write_u64(
206206+ ks,
207207+ K_FIRST_STARTUP,
208208+ s.first_startup_secs.load(Ordering::Relaxed),
209209+ )?;
210210+ write_u64(ks, K_STARTUP_COUNT, s.startup_count.load(Ordering::Relaxed))?;
211211+ write_u64(
212212+ ks,
213213+ K_REPOS_QUEUED,
214214+ s.repos_queued_total.load(Ordering::Relaxed),
215215+ )?;
216216+ write_u64(
217217+ ks,
218218+ K_BIRTHS,
219219+ s.collection_births_total.load(Ordering::Relaxed),
220220+ )?;
221221+ write_u64(
222222+ ks,
223223+ K_DEATHS,
224224+ s.collection_deaths_total.load(Ordering::Relaxed),
225225+ )?;
226226+ write_u64(
227227+ ks,
228228+ K_RESYNCS_DONE,
229229+ s.resyncs_completed_total.load(Ordering::Relaxed),
230230+ )?;
231231+ write_i64(
232232+ ks,
233233+ K_QUEUE_DEPTH,
234234+ s.resync_queue_depth.load(Ordering::Relaxed),
235235+ )?;
236236+ write_i64(
237237+ ks,
238238+ K_BUFFER_COUNT,
239239+ s.resync_buffer_count.load(Ordering::Relaxed),
240240+ )?;
241241+242242+ write_sketch(ks, K_SK_COLLECTIONS, &s.sketch_collections)?;
243243+ write_sketch(ks, K_SK_ACCOUNTS_ALL, &s.sketch_accounts_all)?;
244244+ write_sketch(ks, K_SK_ACCOUNTS_RSYNCD, &s.sketch_accounts_resynced)?;
245245+ write_sketch(ks, K_SK_ACCOUNTS_S11, &s.sketch_accounts_commit_strict)?;
246246+ write_sketch(ks, K_SK_ACCOUNTS_LNT, &s.sketch_accounts_commit_lenient)?;
247247+ write_sketch(ks, K_SK_ACCOUNTS_DSY, &s.sketch_accounts_desynced)?;
248248+ write_sketch(ks, K_SK_PDS_HOSTS, &s.sketch_pds_hosts)?;
249249+250250+ Ok(())
251251+}
252252+253253+// ---------------------------------------------------------------------------
254254+// Tests
255255+// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::open_temporary;

    /// A freshly opened database has a first-startup stamp and one startup.
    #[test]
    fn first_startup_set_on_open() {
        // open_temporary already calls meta::load internally.
        let db = open_temporary().unwrap();
        let stats = &db.stats;

        assert!(stats.first_startup_secs.load(Ordering::Relaxed) > 0);
        assert_eq!(stats.startup_count.load(Ordering::Relaxed), 1);
    }

    /// A second `load` bumps the startup count and keeps the first-startup stamp.
    #[test]
    fn startup_count_increments_on_reload() {
        let db = open_temporary().unwrap();
        let original_stamp = db.stats.first_startup_secs.load(Ordering::Relaxed);
        assert_eq!(db.stats.startup_count.load(Ordering::Relaxed), 1);

        // Simulate a second startup: persist the current stats, then load again.
        save(&db).unwrap();
        let reloaded = load(&db.ks).unwrap();

        assert_eq!(reloaded.startup_count.load(Ordering::Relaxed), 2);
        // The first-startup stamp must survive the reload unchanged.
        assert_eq!(
            reloaded.first_startup_secs.load(Ordering::Relaxed),
            original_stamp
        );
    }

    /// Counter values, including a negative gauge, survive save followed by load.
    #[test]
    fn save_and_load_counters_roundtrip() {
        let db = open_temporary().unwrap();
        let stats = &db.stats;

        stats.repos_queued_total.store(42, Ordering::Relaxed);
        stats.collection_births_total.store(7, Ordering::Relaxed);
        stats.collection_deaths_total.store(3, Ordering::Relaxed);
        stats.resyncs_completed_total.store(100, Ordering::Relaxed);
        stats.resync_queue_depth.store(-5, Ordering::Relaxed);

        save(&db).unwrap();
        let reloaded = load(&db.ks).unwrap();

        assert_eq!(reloaded.repos_queued_total.load(Ordering::Relaxed), 42);
        assert_eq!(reloaded.collection_births_total.load(Ordering::Relaxed), 7);
        assert_eq!(reloaded.collection_deaths_total.load(Ordering::Relaxed), 3);
        assert_eq!(
            reloaded.resyncs_completed_total.load(Ordering::Relaxed),
            100
        );
        assert_eq!(reloaded.resync_queue_depth.load(Ordering::Relaxed), -5);
    }

    /// A sketch's estimate is the same before saving and after reloading.
    #[test]
    fn sketch_roundtrips_through_save_load() {
        let db = open_temporary().unwrap();

        for nsid in ["app.bsky.feed.post", "app.bsky.actor.profile"] {
            insert_str(&db.stats.sketch_collections, nsid);
        }
        let est_before = db.stats.sketch_collections.lock().unwrap().estimate();

        save(&db).unwrap();
        let reloaded = load(&db.ks).unwrap();

        let est_after = reloaded.sketch_collections.lock().unwrap().estimate();
        assert_eq!(est_before, est_after);
    }
}
+8
src/storage/mod.rs
···33pub mod error;
44pub mod firehose_cursor;
55pub mod list_hosts_cursor;
66+pub mod meta;
67pub mod pds_host;
78pub mod repo;
89pub mod resync_buffer;
910pub mod resync_queue;
10111112pub(crate) use error::{StorageError, StorageResult};
1313+pub(crate) use meta::StatsRef;
1214pub(crate) use repo::Account;
13151416// ---------------------------------------------------------------------------
···4244pub(super) const PREFIX_PDS_HOST: KeyPrefix = *b"pdh";
4345/// listHosts walk cursor (per upstream relay host). See [`list_hosts_cursor`].
4446pub(super) const PREFIX_LIST_HOSTS: KeyPrefix = *b"lhs";
4747+/// Persistent system stats and cardinality sketches. See [`meta`].
4848+pub(super) const PREFIX_META: KeyPrefix = *b"met";
45494650use std::path::Path;
4751use std::sync::Arc;
···6064 /// across sequential reads) and Lz4 compression at all levels (higher
6165 /// on-disk density means more data fits in the block cache).
6266 pub(crate) index_ks: fjall::Keyspace,
6767+ /// Persistent system stats and cardinality sketches, loaded on open.
6868+ pub(crate) stats: StatsRef,
6369}
64706571/// Cheaply-cloneable reference to the shared database.
···103109 fjall::CompressionType::Lz4,
104110 ))
105111 })?;
112112+ let stats = meta::load(&ks)?;
106113 Ok(Arc::new(Db {
107114 database,
108115 ks,
109116 index_ks,
117117+ stats,
110118 }))
111119}