···11use std::sync::Arc;
22use std::sync::atomic::Ordering;
3344-use miette::{IntoDiagnostic, Result};
44+use miette::{Context, IntoDiagnostic, Result};
55use tokio::sync::watch;
66use tracing::{error, info};
77use url::Url;
···98989999 /// reset the stored cursor for the given relay URL.
100100 ///
101101- /// clears the `firehose_cursor|{url}` entry from the cursors keyspace and zeroes the
102102- /// in-memory cursor. the next connection will tail live events from the current head.
101101+ /// clears the `firehose_cursor|{host}|{scheme}` entry from the cursors keyspace and zeroes
102102+ /// the in-memory cursor. the next connection will tail live events from the current head.
103103 pub async fn reset_cursor(&self, url: &str) -> Result<()> {
104104+ let relay_url = Url::parse(url)
105105+ .into_diagnostic()
106106+ .wrap_err_with(|| format!("invalid relay url: {url:?}"))?;
107107+ let key = keys::firehose_cursor_key_from_url(&relay_url);
104108 let db = self.state.db.clone();
105105- let key = keys::firehose_cursor_key(url);
106109 tokio::task::spawn_blocking(move || db.cursors.remove(key).into_diagnostic())
107110 .await
108111 .into_diagnostic()??;
109112110110- if let Ok(relay_url) = Url::parse(url) {
111111- self.state.relay_cursors.peek_with(&relay_url, |_, c| {
112112- c.store(0, Ordering::SeqCst);
113113- });
114114- }
113113+ self.state.relay_cursors.peek_with(&relay_url, |_, c| {
114114+ c.store(0, Ordering::SeqCst);
115115+ });
115116 Ok(())
116117 }
117118
+96-2
src/control/mod.rs
···88pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
99pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
1010pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl};
1111+use smol_str::{SmolStr, ToSmolStr};
11121213use std::collections::BTreeMap;
1314use std::future::Future;
···1718use std::task::{Context, Poll};
18191920use futures::{FutureExt, Stream};
2020-use miette::{IntoDiagnostic, Result};
2121+use miette::{IntoDiagnostic, Result, WrapErr};
2122use tokio::sync::{mpsc, watch};
2223use tracing::{debug, error, info};
23242425use crate::backfill::BackfillWorker;
2526use crate::config::{Config, SignatureVerification};
2627use crate::db::{
2727- self, filter as db_filter, load_persisted_crawler_sources, load_persisted_firehose_sources,
2828+ self, filter as db_filter, keys, load_persisted_crawler_sources,
2929+ load_persisted_firehose_sources,
2830};
2931use crate::filter::FilterMode;
3032use crate::ingest::worker::FirehoseWorker;
···3436use crawler::{CrawlerShared, spawn_crawler_producer};
3537use firehose::{FirehoseShared, spawn_firehose_ingestor};
3638use stream::event_stream_thread;
3939+4040+/// infromation about a host hydrant is consuming from.
4141+pub struct Host {
4242+ pub name: SmolStr,
4343+ pub seq: i64,
4444+}
37453846/// an event emitted by the hydrant event stream.
3947///
···655663 pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> {
656664 let state = self.state.clone();
657665 async move { crate::api::serve_debug(state, port).await }
666666+ }
667667+668668+ /// get the status of a (firehose) host we are consuming from.
669669+ ///
670670+ /// returns the seq we are on for this host.
671671+ pub async fn get_host_status(&self, hostname: &str) -> Result<Option<Host>> {
672672+ let db = self.state.db.clone();
673673+ let hostname = hostname.to_smolstr();
674674+675675+ tokio::task::spawn_blocking(move || {
676676+ let key = keys::firehose_cursor_key(&hostname);
677677+ let Some(seq) = db.cursors.get(&key).into_diagnostic()? else {
678678+ return Ok(None);
679679+ };
680680+ let seq = i64::from_be_bytes(
681681+ seq.as_ref()
682682+ .try_into()
683683+ .into_diagnostic()
684684+ .wrap_err("cursor value is not 8 bytes")?,
685685+ );
686686+687687+ Ok(Some(Host {
688688+ name: hostname.into(),
689689+ seq,
690690+ }))
691691+ })
692692+ .await
693693+ .into_diagnostic()?
694694+ }
695695+696696+ /// enumerates all hosts hydrant is consuming from.
697697+ ///
698698+ /// returns hosts enumerated in this pagination and the cursor to paginate from.
699699+ pub async fn list_hosts(
700700+ &self,
701701+ cursor: Option<&str>,
702702+ limit: usize,
703703+ ) -> Result<(Vec<Host>, Option<SmolStr>)> {
704704+ let db = self.state.db.clone();
705705+ let cursor = cursor.map(str::to_string);
706706+707707+ tokio::task::spawn_blocking(move || {
708708+ let prefix_end = {
709709+ let mut end = keys::FIREHOSE_CURSOR_PREFIX.to_vec();
710710+ *end.last_mut().unwrap() += 1;
711711+ end
712712+ };
713713+ let start_bound = match cursor.as_deref() {
714714+ Some(host) => std::ops::Bound::Excluded(keys::firehose_cursor_key(host)),
715715+ None => std::ops::Bound::Included(keys::FIREHOSE_CURSOR_PREFIX.to_vec()),
716716+ };
717717+718718+ // fetch one extra item to detect whether there is a next page
719719+ let mut hosts: Vec<Host> = Vec::with_capacity(limit + 1);
720720+ for item in db
721721+ .cursors
722722+ .range((start_bound, std::ops::Bound::Excluded(prefix_end)))
723723+ .take(limit + 1)
724724+ {
725725+ let (k, v) = item.into_inner().into_diagnostic()?;
726726+ let hostname = std::str::from_utf8(&k[keys::FIREHOSE_CURSOR_PREFIX.len()..])
727727+ .into_diagnostic()
728728+ .wrap_err("firehose cursor key contains non-utf8 hostname")?;
729729+ let seq = i64::from_be_bytes(
730730+ v.as_ref()
731731+ .try_into()
732732+ .into_diagnostic()
733733+ .wrap_err("cursor value is not 8 bytes")?,
734734+ );
735735+ hosts.push(Host {
736736+ name: hostname.into(),
737737+ seq,
738738+ });
739739+ }
740740+741741+ let next_cursor = if hosts.len() > limit {
742742+ hosts.pop();
743743+ hosts.last().map(|h| h.name.clone())
744744+ } else {
745745+ None
746746+ };
747747+748748+ Ok((hosts, next_cursor))
749749+ })
750750+ .await
751751+ .into_diagnostic()?
658752 }
659753}
660754
···11+use fjall::OwnedWriteBatch;
22+use miette::{Context, IntoDiagnostic, Result};
33+44+use crate::db::Db;
55+use crate::db::keys::VERSIONING_KEY;
66+77+mod v1;
/// a single migration step: reads from `db` and stages its writes into `batch`.
type MigrationFn = fn(&Db, &mut OwnedWriteBatch) -> Result<()>;

/// ordered list of migrations. migration at index `i` upgrades the schema from version `i` to `i+1`.
const MIGRATIONS: &[(&str, MigrationFn)] =
    &[("stable_firehose_cursors", v1::stable_firehose_cursors)];
1414+1515+fn read_version(db: &Db) -> Result<u64> {
1616+ db.counts
1717+ .get(VERSIONING_KEY)
1818+ .into_diagnostic()?
1919+ .map(|r| {
2020+ r.as_ref()
2121+ .try_into()
2222+ .into_diagnostic()
2323+ .wrap_err("db version key expected to be 8 bytes")
2424+ .map(u64::from_be_bytes)
2525+ })
2626+ .transpose()
2727+ .map(|v| v.unwrap_or(0))
2828+}
2929+3030+/// run all pending database migrations in order.
3131+///
3232+/// each migration and its version bump are committed atomically. safe to run on a fresh
3333+/// database (all migrations are no-ops when no relevant data exists). called during [`Db::open`].
3434+pub(super) fn run(db: &Db) -> Result<()> {
3535+ let version = read_version(db)? as usize;
3636+ for (i, (name, migration)) in MIGRATIONS.iter().enumerate().skip(version) {
3737+ tracing::info!("db: running migration {name} (v{i} -> v{})", i + 1);
3838+ let mut batch = db.inner.batch();
3939+ migration(db, &mut batch)?;
4040+ let new_version = (i + 1) as u64;
4141+ batch.insert(&db.counts, VERSIONING_KEY, new_version.to_be_bytes());
4242+ batch.commit().into_diagnostic()?;
4343+ tracing::info!("db: migration {name} complete");
4444+ }
4545+ Ok(())
4646+}
+40
src/db/migration/v1.rs
···11+use std::str::FromStr;
22+33+use fjall::OwnedWriteBatch;
44+use miette::{Context, IntoDiagnostic, Result};
55+use url::Url;
66+77+use crate::db::{Db, keys};
88+99+/// migrates firehose cursors from `firehose_cursor|{url}` to `firehose_cursor|{host}`.
1010+pub(super) fn stable_firehose_cursors(db: &Db, batch: &mut OwnedWriteBatch) -> Result<()> {
1111+ let entries: Vec<(Vec<u8>, Vec<u8>)> = db
1212+ .cursors
1313+ .prefix(keys::FIREHOSE_CURSOR_PREFIX)
1414+ .map(|item| {
1515+ let (k, v) = item.into_inner().into_diagnostic()?;
1616+ Ok((k.to_vec(), v.to_vec()))
1717+ })
1818+ .collect::<Result<_>>()?;
1919+2020+ for (old_key, value) in entries {
2121+ let suffix = &old_key[keys::FIREHOSE_CURSOR_PREFIX.len()..];
2222+ // old-format: suffix is a full URL containing "://" (e.g. "wss://bsky.network")
2323+ // new-format (v1): suffix is just a hostname, no "://"
2424+ if !suffix.windows(3).any(|w| w == b"://") {
2525+ continue; // already in new format
2626+ }
2727+ let url_str = std::str::from_utf8(suffix)
2828+ .into_diagnostic()
2929+ .wrap_err("firehose cursor key contains non-utf8 url")?;
3030+ let url = Url::from_str(url_str)
3131+ .into_diagnostic()
3232+ .wrap_err_with(|| format!("firehose cursor key contains invalid url {url_str:?}"))?;
3333+3434+ let new_key = keys::v1::firehose_cursor_key_from_url(&url);
3535+ batch.insert(&db.cursors, &new_key, &value);
3636+ batch.remove(&db.cursors, &old_key);
3737+ }
3838+3939+ Ok(())
4040+}
+36-52
src/db/mod.rs
···2424pub mod ephemeral;
2525pub mod filter;
2626pub mod keys;
2727+pub mod migration;
2728pub mod types;
28292930use tokio::sync::broadcast;
···376377 // when adding new keyspaces, make sure to add them to the /stats endpoint
377378 // and also update any relevant /debug/* endpoints
378379380380+ let (event_tx, _) = broadcast::channel(10000);
381381+382382+ let this = Self {
383383+ inner: db,
384384+ path: cfg.database_path.clone(),
385385+ repos,
386386+ records,
387387+ blocks,
388388+ cursors,
389389+ pending,
390390+ resync,
391391+ resync_buffer,
392392+ events,
393393+ counts,
394394+ filter,
395395+ crawler,
396396+ #[cfg(feature = "backlinks")]
397397+ backlinks,
398398+ event_tx,
399399+ counts_map: HashMap::new(),
400400+ next_event_id: Arc::new(AtomicU64::new(0)),
401401+ };
402402+403403+ migration::run(&this)?;
404404+379405 let mut last_id = 0;
380380- if let Some(guard) = events.iter().next_back() {
406406+ if let Some(guard) = this.events.iter().next_back() {
381407 let k = guard.key().into_diagnostic()?;
382408 last_id = u64::from_be_bytes(
383409 k.as_ref()
···386412 .wrap_err("expected to be id (8 bytes)")?,
387413 );
388414 }
415415+ // relaxed is fine since we are just initializing the db
416416+ this.next_event_id
417417+ .store(last_id + 1, std::sync::atomic::Ordering::Relaxed);
389418390419 // load counts into memory
391391- let counts_map = HashMap::new();
392392- for guard in counts.prefix(keys::COUNT_KS_PREFIX) {
420420+ for guard in this.counts.prefix(keys::COUNT_KS_PREFIX) {
393421 let (k, v) = guard.into_inner().into_diagnostic()?;
394422 let name = std::str::from_utf8(&k[keys::COUNT_KS_PREFIX.len()..])
395423 .into_diagnostic()
396424 .wrap_err("expected valid utf8 for ks count key")?;
397397- let _ = counts_map.insert_sync(
425425+ let _ = this.counts_map.insert_sync(
398426 SmolStr::new(name),
399427 u64::from_be_bytes(v.as_ref().try_into().unwrap()),
400428 );
401429 }
402402- // ensure critical counts are initialized
403403- for ks_name in ["repos", "pending", "resync"] {
404404- let _ = counts_map
405405- .entry_sync(SmolStr::new(ks_name))
406406- .or_insert_with(|| {
407407- let ks = match ks_name {
408408- "repos" => &repos,
409409- "pending" => &pending,
410410- "resync" => &resync,
411411- _ => unreachable!(),
412412- };
413413- ks.iter().count() as u64
414414- });
415415- }
416430417417- let (event_tx, _) = broadcast::channel(10000);
418418-419419- Ok(Self {
420420- inner: db,
421421- path: cfg.database_path.clone(),
422422- repos,
423423- records,
424424- blocks,
425425- cursors,
426426- pending,
427427- resync,
428428- resync_buffer,
429429- events,
430430- counts,
431431- filter,
432432- crawler,
433433- #[cfg(feature = "backlinks")]
434434- backlinks,
435435- event_tx,
436436- counts_map,
437437- next_event_id: Arc::new(AtomicU64::new(last_id + 1)),
438438- })
431431+ Ok(this)
439432 }
440433441434 pub fn train_dict(&self, ks_name: &str) -> Result<()> {
···751744pub fn set_firehose_cursor(db: &Db, relay: &Url, cursor: i64) -> Result<()> {
752745 db.cursors
753746 .insert(
754754- keys::firehose_cursor_key(relay.as_str()),
747747+ keys::firehose_cursor_key_from_url(relay),
755748 cursor.to_be_bytes(),
756749 )
757750 .into_diagnostic()
758751}
759752760753pub async fn get_firehose_cursor(db: &Db, relay: &Url) -> Result<Option<i64>> {
761761- let per_relay_key = keys::firehose_cursor_key(relay.as_str());
762762- if let Some(v) = Db::get(db.cursors.clone(), per_relay_key).await? {
763763- return Ok(Some(i64::from_be_bytes(
764764- v.as_ref()
765765- .try_into()
766766- .into_diagnostic()
767767- .wrap_err("cursor is not 8 bytes")?,
768768- )));
769769- }
770770-771771- Db::get(db.cursors.clone(), keys::CURSOR_KEY)
754754+ let key = keys::firehose_cursor_key_from_url(relay);
755755+ Db::get(db.cursors.clone(), key)
772756 .await?
773757 .map(|v| {
774758 Ok(i64::from_be_bytes(
-3
src/state.rs
···4444 let filter = new_handle(filter_config);
45454646 let relay_cursors = scc::HashIndex::new();
4747- for url in &config.relays {
4848- let _ = relay_cursors.insert_sync(url.clone(), AtomicI64::new(0));
4949- }
50475148 let (crawler_enabled, _) = watch::channel(crawler_default);
5249 let (firehose_enabled, _) = watch::channel(config.enable_firehose);