very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
1use fjall::OwnedWriteBatch;
2use miette::{Context, IntoDiagnostic, Result};
3
4use crate::db::Db;
5use crate::db::keys::VERSIONING_KEY;
6
7mod v1;
8mod v2;
9mod v3;
10mod v4;
11mod v5;
12
13type MigrationFn = fn(&Db, &mut OwnedWriteBatch) -> Result<()>;
14
15/// ordered list of migrations. migration at index `i` upgrades the schema from version `i` to `i+1`.
16const MIGRATIONS: &[(&str, MigrationFn)] = &[
17 ("stable_firehose_cursors", v1::stable_firehose_cursors),
18 ("repo_state_root_commit", v2::repo_state_root_commit),
19 ("firehose_source_is_pds", v3::firehose_source_is_pds),
20 ("repo_state_active", v4::repo_state_active),
21 ("pds_meta_layout", v5::pds_meta_layout),
22];
23
24fn read_version(db: &Db) -> Result<u64> {
25 db.counts
26 .get(VERSIONING_KEY)
27 .into_diagnostic()?
28 .map(|r| {
29 r.as_ref()
30 .try_into()
31 .into_diagnostic()
32 .wrap_err("db version key expected to be 8 bytes")
33 .map(u64::from_be_bytes)
34 })
35 .transpose()
36 .map(|v| v.unwrap_or(0))
37}
38
39/// run all pending database migrations in order.
40///
41/// each migration and its version bump are committed atomically. safe to run on a fresh
42/// database (all migrations are no-ops when no relevant data exists). called during [`Db::open`].
43pub(super) fn run(db: &Db) -> Result<()> {
44 let version = read_version(db)? as usize;
45 for (i, (name, migration)) in MIGRATIONS.iter().enumerate().skip(version) {
46 tracing::info!("db: running migration {name} (v{i} -> v{})", i + 1);
47 let mut batch = db.inner.batch();
48 migration(db, &mut batch)?;
49 let new_version = (i + 1) as u64;
50 batch.insert(&db.counts, VERSIONING_KEY, new_version.to_be_bytes());
51 batch.commit().into_diagnostic()?;
52 tracing::info!("db: migration {name} complete");
53 }
54 Ok(())
55}