very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 55 lines 1.9 kB view raw
1use fjall::OwnedWriteBatch; 2use miette::{Context, IntoDiagnostic, Result}; 3 4use crate::db::Db; 5use crate::db::keys::VERSIONING_KEY; 6 7mod v1; 8mod v2; 9mod v3; 10mod v4; 11mod v5; 12 13type MigrationFn = fn(&Db, &mut OwnedWriteBatch) -> Result<()>; 14 15/// ordered list of migrations. migration at index `i` upgrades the schema from version `i` to `i+1`. 16const MIGRATIONS: &[(&str, MigrationFn)] = &[ 17 ("stable_firehose_cursors", v1::stable_firehose_cursors), 18 ("repo_state_root_commit", v2::repo_state_root_commit), 19 ("firehose_source_is_pds", v3::firehose_source_is_pds), 20 ("repo_state_active", v4::repo_state_active), 21 ("pds_meta_layout", v5::pds_meta_layout), 22]; 23 24fn read_version(db: &Db) -> Result<u64> { 25 db.counts 26 .get(VERSIONING_KEY) 27 .into_diagnostic()? 28 .map(|r| { 29 r.as_ref() 30 .try_into() 31 .into_diagnostic() 32 .wrap_err("db version key expected to be 8 bytes") 33 .map(u64::from_be_bytes) 34 }) 35 .transpose() 36 .map(|v| v.unwrap_or(0)) 37} 38 39/// run all pending database migrations in order. 40/// 41/// each migration and its version bump are committed atomically. safe to run on a fresh 42/// database (all migrations are no-ops when no relevant data exists). called during [`Db::open`]. 43pub(super) fn run(db: &Db) -> Result<()> { 44 let version = read_version(db)? as usize; 45 for (i, (name, migration)) in MIGRATIONS.iter().enumerate().skip(version) { 46 tracing::info!("db: running migration {name} (v{i} -> v{})", i + 1); 47 let mut batch = db.inner.batch(); 48 migration(db, &mut batch)?; 49 let new_version = (i + 1) as u64; 50 batch.insert(&db.counts, VERSIONING_KEY, new_version.to_be_bytes()); 51 batch.commit().into_diagnostic()?; 52 tracing::info!("db: migration {name} complete"); 53 } 54 Ok(()) 55}