very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
1#![allow(unused_imports)]
2
3#[cfg(feature = "indexer")]
4pub(crate) mod crawler;
5pub(crate) mod filter;
6pub(crate) mod firehose;
7pub(crate) mod pds;
8pub(crate) mod repos;
9mod seed;
10pub(crate) mod stream;
11
12#[cfg(feature = "indexer")]
13mod indexer;
14#[cfg(feature = "indexer")]
15pub use indexer::*;
16
17#[cfg(feature = "relay")]
18mod relay;
19#[cfg(feature = "relay")]
20pub use relay::*;
21
22pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
23pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
24pub use pds::{PdsControl, PdsTierAssignment, PdsTierDefinition};
25pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl};
26use smol_str::{SmolStr, ToSmolStr};
27
28use std::collections::BTreeMap;
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::task::{Context, Poll};
34
35use futures::{FutureExt, Stream};
36use miette::{IntoDiagnostic, Result, WrapErr};
37use tokio::sync::{mpsc, watch};
38use tracing::{debug, error, info};
39
40#[cfg(feature = "indexer")]
41use crate::backfill::BackfillWorker;
42use crate::config::{Config, SignatureVerification};
43#[cfg(feature = "indexer")]
44use crate::db::load_persisted_crawler_sources;
45use crate::db::{self, filter as db_filter, keys, load_persisted_firehose_sources};
46use crate::filter::FilterMode;
47#[cfg(feature = "indexer")]
48use crate::ingest::indexer::FirehoseWorker;
49use crate::pds_meta::{PdsMeta, PdsMetaHandle};
50use crate::state::AppState;
51#[cfg(feature = "indexer_stream")]
52use crate::types::MarshallableEvt;
53use firehose::FirehoseShared;
54#[cfg(feature = "indexer_stream")]
55use stream::event_stream_thread;
56#[cfg(feature = "relay")]
57use stream::relay_stream_thread;
58use url::Url;
59
#[derive(Debug, Clone)]
/// information about a host hydrant is consuming from.
pub struct Host {
    /// hostname of the host.
    pub name: SmolStr,
    /// latest seq hydrant has processed from this host.
    pub seq: i64,
    /// the number of accounts hydrant has seen from this host.
    pub account_count: u64,
    /// the status of this host in hydrant.
    pub status: crate::pds_meta::HostStatus,
}
72
/// an event emitted by the hydrant event stream.
///
/// three variants are possible depending on the `type` field:
/// - `"record"`: a repo record was created, updated, or deleted. carries a [`RecordEvt`].
/// - `"identity"`: a DID's handle or PDS changed. carries an [`IdentityEvt`]. ephemeral, not replayable.
/// - `"account"`: a repo's active/inactive status changed. carries an [`AccountEvt`]. ephemeral, not replayable.
///
/// the `id` field is a monotonically increasing sequence number usable as a cursor for [`Hydrant::subscribe`].
///
/// only available with the `indexer_stream` feature enabled.
#[cfg(feature = "indexer_stream")]
pub type Event = MarshallableEvt<'static>;
83
/// the top-level handle to a hydrant instance.
///
/// `Hydrant` is cheaply cloneable. all sub-handles share the same underlying state.
/// construct it via [`Hydrant::new`] or [`Hydrant::from_env`], configure the filter
/// and repos as needed, then call [`Hydrant::run`] to start all background components.
///
/// # example
///
/// ```rust,no_run
/// use hydrant::control::Hydrant;
///
/// #[tokio::main]
/// async fn main() -> miette::Result<()> {
///     let hydrant = Hydrant::from_env().await?;
///
///     tokio::select! {
///         r = hydrant.run()? => r,
///         r = hydrant.serve(3000) => r,
///     }
/// }
/// ```
#[derive(Clone)]
pub struct Hydrant {
    /// control over crawler sources and their producer tasks.
    #[cfg(feature = "indexer")]
    pub crawler: crawler::CrawlerHandle,
    /// control over firehose ingestion sources.
    pub firehose: FirehoseHandle,
    /// control over the backfill worker.
    #[cfg(feature = "indexer")]
    pub backfill: BackfillHandle,
    /// control over the live record filter.
    pub filter: FilterControl,
    /// control over PDS tier definitions and assignments.
    pub pds: PdsControl,
    /// access to indexed repos and records.
    pub repos: ReposControl,
    /// database maintenance operations (compaction, dictionary training).
    pub db: DbControl,
    /// control over the backlinks index.
    #[cfg(feature = "backlinks")]
    pub backlinks: crate::backlinks::BacklinksControl,
    // shared state backing every sub-handle above
    pub(crate) state: Arc<AppState>,
    config: Arc<Config>,
    // flipped by `run` so an instance can only be driven once
    started: Arc<AtomicBool>,
    // prevents construction outside this crate
    _priv: (),
}
123
124impl Hydrant {
125 /// Returns a reference to the internal DID resolver.
126 pub fn resolver(&self) -> &crate::resolver::Resolver {
127 &self.state.resolver
128 }
129
    /// open the database and configure hydrant from `config`.
    ///
    /// this sets up the database, applies any filter configuration from `config`, and
    /// initializes all sub-handles. no background tasks are started yet: call
    /// [`run`](Self::run) to start all components and drive the instance.
    ///
    /// # Errors
    ///
    /// fails if the database cannot be opened, if persisting or reloading the
    /// filter fails, or (relay builds) if `HYDRANT_ONLY_INDEX_LINKS` is set.
    pub async fn new(config: Config) -> Result<Self> {
        info!("{config}");

        // links-only indexing is rejected up front in relay mode instead of
        // being silently ignored.
        #[cfg(feature = "relay")]
        if config.only_index_links {
            miette::bail!("HYDRANT_ONLY_INDEX_LINKS is not supported in relay mode");
        }

        // 1. open database and construct AppState
        let state = AppState::new(&config)?;

        // 2. apply any filter config from env variables. only touch the db when at
        // least one filter-related option was provided, so a plain start keeps
        // whatever filter is already persisted.
        if config.full_network
            || config.filter_signals.is_some()
            || config.filter_collections.is_some()
            || config.filter_excludes.is_some()
        {
            let filter_ks = state.db.filter.clone();
            let inner = state.db.inner.clone();
            let mode = config.full_network.then_some(FilterMode::Full);
            let signals = config
                .filter_signals
                .clone()
                .map(crate::patch::SetUpdate::Set);
            let collections = config
                .filter_collections
                .clone()
                .map(crate::patch::SetUpdate::Set);
            let excludes = config
                .filter_excludes
                .clone()
                .map(crate::patch::SetUpdate::Set);

            // the patch is applied atomically in a single write batch, off the
            // async runtime (the db writes are blocking).
            tokio::task::spawn_blocking(move || {
                let mut batch = inner.batch();
                db_filter::apply_patch(
                    &mut batch,
                    &filter_ks,
                    mode,
                    signals,
                    collections,
                    excludes,
                )?;
                batch.commit().into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // 3. reload the live filter into the hot-path arc-swap
            let new_filter = tokio::task::spawn_blocking({
                let filter_ks = state.db.filter.clone();
                move || db_filter::load(&filter_ks)
            })
            .await
            .into_diagnostic()??;
            state.filter.store(Arc::new(new_filter));
        }

        #[cfg(feature = "indexer")]
        {
            // 4. set crawler enabled state from config, evaluated against the post-patch filter
            let post_patch_crawler = match config.enable_crawler {
                Some(b) => b,
                // no explicit setting: crawl by default when indexing the full
                // network or when crawler sources were configured.
                None => {
                    state.filter.load().mode == FilterMode::Full
                        || !config.crawler_sources.is_empty()
                }
            };
            state.crawler_enabled.send_replace(post_patch_crawler);
        }

        let state = Arc::new(state);

        Ok(Self {
            firehose: FirehoseHandle::new(state.clone()),
            filter: FilterControl(state.clone()),
            pds: pds::PdsControl(state.clone()),
            repos: ReposControl(state.clone()),
            db: DbControl(state.clone()),
            #[cfg(feature = "indexer")]
            crawler: crawler::CrawlerHandle {
                state: state.clone(),
                // shared crawler objects are only set later, inside `run`
                shared: Arc::new(std::sync::OnceLock::new()),
                tasks: Arc::new(scc::HashMap::new()),
                persisted: Arc::new(scc::HashSet::new()),
            },
            #[cfg(feature = "indexer")]
            backfill: BackfillHandle::new(state.clone()),
            #[cfg(feature = "backlinks")]
            backlinks: crate::backlinks::BacklinksControl(state.clone()),
            state,
            config: Arc::new(config),
            started: Arc::new(AtomicBool::new(false)),
            _priv: (),
        })
    }
231
232 /// reads config from environment variables and calls [`Hydrant::new`].
233 pub async fn from_env() -> Result<Self> {
234 Self::new(Config::from_env()?).await
235 }
236
    /// start all background components and return a future that resolves when any
    /// fatal component exits.
    ///
    /// starts the backfill worker, firehose ingestors, crawler, and worker thread.
    /// resolves with `Ok(())` if a fatal component exits cleanly, or `Err(e)` if it
    /// fails. intended for use in `tokio::select!` alongside [`serve`](Self::serve).
    ///
    /// returns an error if called more than once on the same `Hydrant` instance.
    pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
        let state = self.state.clone();
        let config = self.config.clone();
        #[cfg(feature = "indexer")]
        let crawler = self.crawler.clone();
        let firehose = self.firehose.clone();

        // single-shot guard: the worker threads and OnceLock-backed shared
        // objects below can only be set up once per instance.
        if self.started.swap(true, Ordering::SeqCst) {
            miette::bail!("Hydrant::run() called more than once");
        }

        let fut = async move {
            // raw firehose events from pds/relay to RelayWorker
            let (buffer_tx, buffer_rx) = mpsc::channel::<crate::ingest::IngestMessage>(500);

            // validated IndexerMessages from RelayWorker/backfill to FirehoseWorker
            #[cfg(feature = "indexer")]
            let (indexer_tx, indexer_rx) =
                mpsc::channel::<crate::ingest::indexer::IndexerMessage>(500);

            // 5. spawn the backfill worker (not used in relay mode)
            #[cfg(feature = "indexer")]
            tokio::spawn({
                let state = state.clone();
                BackfillWorker::new(
                    state.clone(),
                    indexer_tx.clone(),
                    config.repo_fetch_timeout,
                    config.backfill_concurrency_limit,
                    matches!(
                        config.verify_signatures,
                        SignatureVerification::Full | SignatureVerification::BackfillOnly
                    ),
                    state.backfill_enabled.subscribe(),
                )
                .run()
            });

            // 6. re-queue any repos that lost their backfill state, then start the retry worker
            #[cfg(feature = "indexer")]
            {
                if let Err(e) = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || crate::backfill::manager::queue_gone_backfills(&state)
                })
                .await
                .into_diagnostic()?
                {
                    // non-fatal: log and keep going, but surface db poisoning
                    error!(err = %e, "failed to queue gone backfills");
                    db::check_poisoned_report(&e);
                }

                std::thread::spawn({
                    let state = state.clone();
                    move || crate::backfill::manager::retry_worker(state)
                });
            }

            // 7. ephemeral GC thread (not used in relay mode)
            #[cfg(feature = "indexer_stream")]
            if config.ephemeral {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("ephemeral-gc".into())
                    .spawn(move || crate::db::ephemeral::ephemeral_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // relay events TTL: relay_events keyspace grows unbounded without pruning
            #[cfg(feature = "relay")]
            {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("relay-events-gc".into())
                    .spawn(move || crate::db::ephemeral::relay_events_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // 8. cursor / counts persist thread
            std::thread::spawn({
                let state = state.clone();
                let persist_interval = config.cursor_save_interval;
                move || loop {
                    std::thread::sleep(persist_interval);

                    // persist every firehose cursor that has advanced past 0
                    state.firehose_cursors.iter_sync(|relay, cursor| {
                        let seq = cursor.load(Ordering::SeqCst);
                        if seq > 0 {
                            if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                                error!(relay = %relay, err = %e, "failed to save cursor");
                                db::check_poisoned_report(&e);
                            }
                        }
                        true
                    });

                    let checkpoint_watermark = match state.db.checkpoint_count_deltas() {
                        Ok(watermark) => watermark,
                        Err(e) => {
                            error!(err = %e, "failed to checkpoint count deltas");
                            db::check_poisoned_report(&e);
                            None
                        }
                    };

                    if let Err(e) = state.db.persist() {
                        error!(err = %e, "db persist failed");
                        db::check_poisoned_report(&e);
                    } else {
                        // only mark the checkpoint durable after a successful persist;
                        // if checkpointing above yielded nothing, re-read the
                        // watermark from the db
                        let watermark = checkpoint_watermark
                            .map(Ok)
                            .unwrap_or_else(|| db::load_count_delta_watermark(&state.db));
                        match watermark {
                            Ok(watermark) => state.db.mark_count_checkpoint_persisted(watermark),
                            Err(e) => {
                                error!(err = %e, "failed to load durable count checkpoint watermark");
                                db::check_poisoned_report(&e);
                            }
                        }
                    }
                }
            });

            // 9. events/sec stats ticker
            #[cfg(any(feature = "indexer_stream", feature = "relay"))]
            tokio::spawn({
                let state = state.clone();
                // NOTE(review): when both features are enabled the relay seq binding
                // shadows the event id, so the rate is measured on relay seqs —
                // confirm that is intended.
                let get_id = |state: &AppState| {
                    #[cfg(feature = "indexer_stream")]
                    let id = state.db.next_event_id.load(Ordering::Relaxed);
                    #[cfg(feature = "relay")]
                    let id = state.db.next_relay_seq.load(Ordering::Relaxed);
                    id
                };
                let mut last_id = get_id(&state);
                let mut last_time = std::time::Instant::now();
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
                async move {
                    loop {
                        interval.tick().await;

                        let current_id = get_id(&state);
                        let current_time = std::time::Instant::now();
                        let delta = current_id.saturating_sub(last_id);

                        if delta == 0 {
                            debug!("no new events in 60s");
                            continue;
                        }

                        let elapsed = current_time.duration_since(last_time).as_secs_f64();
                        let rate = if elapsed > 0.0 {
                            delta as f64 / elapsed
                        } else {
                            0.0
                        };
                        info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                        last_id = current_id;
                        last_time = current_time;
                    }
                }
            });

            // fatal components report their exit result through this watch channel
            let (fatal_tx_inner, mut fatal_rx) = watch::channel(None);
            let fatal_tx = Arc::new(fatal_tx_inner);

            // 10. set shared and spawn firehose ingestors
            firehose
                .shared
                .set(FirehoseShared {
                    buffer_tx: buffer_tx.clone(),
                    verify_signatures: matches!(
                        config.verify_signatures,
                        SignatureVerification::Full
                    ),
                })
                .ok()
                .expect("firehose shared already set");
            let fire_shared = firehose.shared.get().unwrap();

            // add hosts from config
            let relay_hosts = config.relays.clone();
            if !relay_hosts.is_empty() {
                info!(
                    relay_count = relay_hosts.len(),
                    hosts = relay_hosts
                        .iter()
                        .map(|h| h.url.as_str())
                        .collect::<Vec<_>>()
                        .join(", "),
                    "starting firehose ingestor(s)"
                );
                for source in &relay_hosts {
                    let _ = firehose
                        .known_sources
                        .insert_async(source.url.clone(), source.is_pds)
                        .await;
                    firehose
                        .spawn_firehose_ingestor(source, fire_shared, true)
                        .await?;
                }
            }

            // add persisted hosts
            let persisted_sources = tokio::task::spawn_blocking({
                let state = state.clone();
                move || load_persisted_firehose_sources(&state.db)
            })
            .await
            .into_diagnostic()??;
            for source in &persisted_sources {
                let _ = firehose
                    .known_sources
                    .insert_async(source.url.clone(), source.is_pds)
                    .await;
                // already spawned from the config list above
                if firehose.tasks.contains_async(&source.url).await {
                    continue;
                }
                firehose
                    .spawn_firehose_ingestor(source, fire_shared, true)
                    .await?;
            }
            // we use spawn_firehose_ingestor directly here since we dont want
            // to go through the whole add_source machinery and checks
            // its ok since we block here before running stuff like seed_hosts
            // and whatnot

            // 10c. seed firehose PDS sources from listHosts on configured seed URLs
            if !config.seed_hosts.is_empty() {
                let seed_urls = config.seed_hosts.clone();
                let firehose = firehose.clone();
                let state = state.clone();
                tokio::spawn(async move {
                    seed::seed_from_list_hosts(&seed_urls, &firehose, &state).await;
                });
            }

            // 10d. periodic retry of offline firehose sources
            if let Some(retry_interval) = config.offline_host_retry_interval {
                tokio::spawn({
                    let firehose = firehose.clone();
                    async move {
                        loop {
                            tokio::time::sleep(retry_interval).await;

                            // collect candidates first so the map iteration (and
                            // the pds_meta guard) is not held across restart awaits
                            let mut to_restart: Vec<(Url, bool)> = Vec::new();
                            {
                                let meta = firehose.state.pds_meta.load();
                                firehose
                                    .known_sources
                                    .iter_async(|url, &is_pds| {
                                        // skip sources that are already running
                                        if firehose.tasks.contains_sync(url) {
                                            return true;
                                        }
                                        let host = url.host_str().unwrap_or(url.as_str());
                                        if meta.is_banned(host) {
                                            return true;
                                        }
                                        to_restart.push((url.clone(), is_pds));
                                        true
                                    })
                                    .await;
                            }

                            for (url, is_pds) in to_restart {
                                let _ = firehose.restart_source(url, is_pds).await;
                            }
                        }
                    }
                });
            }

            // 11. spawn crawler infrastructure
            #[cfg(feature = "indexer")]
            {
                use crate::crawler::{
                    CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker,
                };
                use crate::util::throttle::Throttler;

                let http = reqwest::Client::builder()
                    .user_agent(concat!(
                        env!("CARGO_PKG_NAME"),
                        "/",
                        env!("CARGO_PKG_VERSION")
                    ))
                    .gzip(true)
                    .build()
                    .expect("that reqwest will build");
                let pds_throttler = state.throttler.clone();
                let in_flight = InFlight::new();
                let stats = CrawlerStats::new(
                    state.clone(),
                    config
                        .crawler_sources
                        .iter()
                        .map(|s| s.url.clone())
                        .collect(),
                    pds_throttler.clone(),
                );
                let checker = SignalChecker {
                    http: http.clone(),
                    state: state.clone(),
                    throttler: pds_throttler,
                };

                info!(
                    max_pending = config.crawler_max_pending_repos,
                    resume_pending = config.crawler_resume_pending_repos,
                    enabled = *state.crawler_enabled.borrow(),
                    "starting crawler worker"
                );
                let (worker, tx) = CrawlerWorker::new(
                    state.clone(),
                    config.crawler_max_pending_repos,
                    config.crawler_resume_pending_repos,
                    stats.clone(),
                );
                // the crawler worker is load-bearing: losing it silently would
                // wedge the pipeline, so abort the process if it ever returns
                tokio::spawn(async move {
                    worker.run().await;
                    error!("crawler worker exited unexpectedly, aborting");
                    std::process::abort();
                });

                let ticker = tokio::spawn(stats.clone().task());
                tokio::spawn(async move {
                    match ticker.await {
                        Err(e) => error!(err = ?e, "stats ticker panicked, aborting"),
                        Ok(()) => error!("stats ticker exited unexpectedly, aborting"),
                    }
                    std::process::abort();
                });

                tokio::spawn(
                    RetryProducer {
                        checker: checker.clone(),
                        in_flight: in_flight.clone(),
                        tx: tx.clone(),
                    }
                    .run(),
                );

                // set shared objects so CrawlerHandle methods can use them
                crawler
                    .shared
                    .set(crawler::CrawlerShared {
                        http,
                        checker,
                        in_flight,
                        tx,
                        stats,
                    })
                    .ok()
                    .expect("crawler shared already set");
                let shared = crawler.shared.get().unwrap();

                // spawn initial sources from config
                for source in config.crawler_sources.iter() {
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = crawler::spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }

                let persisted_sources = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || load_persisted_crawler_sources(&state.db)
                })
                .await
                .into_diagnostic()??;

                for source in &persisted_sources {
                    let _ = crawler.persisted.insert_async(source.url.clone()).await;
                    // already spawned from the config list above
                    if crawler.tasks.contains_async(&source.url).await {
                        continue;
                    }
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = crawler::spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }
            }

            // 12. spawn the relay worker
            let relay_worker = std::thread::spawn({
                let state = state.clone();
                let handle = tokio::runtime::Handle::current();
                let config = config.clone();

                #[cfg(feature = "indexer")]
                let hook = indexer_tx.clone();

                move || {
                    crate::ingest::relay::RelayWorker::new(
                        state,
                        buffer_rx,
                        #[cfg(feature = "indexer")]
                        hook,
                        matches!(config.verify_signatures, SignatureVerification::Full),
                        config.firehose_workers,
                        crate::ingest::validation::ValidationOptions {
                            verify_mst: config.verify_mst,
                            rev_clock_skew_secs: config.rev_clock_skew_secs,
                        },
                    )
                    .run(handle)
                }
            });

            // watch the relay worker thread from the blocking pool and forward
            // its exit result into the fatal channel
            let tx = Arc::clone(&fatal_tx);
            tokio::spawn(
                tokio::task::spawn_blocking(move || {
                    relay_worker
                        .join()
                        .map_err(|e| miette::miette!("relay worker died: {e:?}"))
                })
                .map(move |r| {
                    // errors are stringified so the payload stays cloneable for
                    // watch-channel readers
                    let result = r.into_diagnostic().flatten().flatten();
                    let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                }),
            );

            // 13. spawn the firehose worker (if enabled)
            #[cfg(feature = "indexer")]
            let firehose_worker = std::thread::spawn({
                let state = state.clone();
                let handle = tokio::runtime::Handle::current();
                let config = config.clone();
                move || FirehoseWorker::new(state, indexer_rx, config.firehose_workers).run(handle)
            });

            #[cfg(feature = "indexer")]
            {
                let tx = Arc::clone(&fatal_tx);
                tokio::spawn(
                    tokio::task::spawn_blocking(move || {
                        firehose_worker
                            .join()
                            .map_err(|e| miette::miette!("firehose worker died: {e:?}"))
                    })
                    .map(move |r| {
                        let result = r.into_diagnostic().flatten().flatten();
                        let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                    }),
                );
            }

            // drop the local fatal_tx so the watch channel is only kept alive by the
            // spawned tasks. when all fatal tasks exit (and drop their tx clones),
            // fatal_rx.changed() returns Err and we return Ok(()).
            drop(fatal_tx);

            loop {
                match fatal_rx.changed().await {
                    Ok(()) => {
                        if let Some(result) = fatal_rx.borrow().clone() {
                            return result.map_err(|s| miette::miette!("{s}"));
                        }
                    }
                    // all fatal_tx clones dropped: all tasks finished cleanly
                    Err(_) => return Ok(()),
                }
            }
        };
        Ok(fut)
    }
728
    /// return database counts and on-disk sizes for all keyspaces.
    ///
    /// counts include: `repos`, `pending`, `resync`, `records`, `blocks`, `events`,
    /// `error_ratelimited`, `error_transport`, `error_generic` — some only when the
    /// corresponding cargo feature is enabled.
    ///
    /// sizes are in bytes, reported per keyspace.
    pub async fn stats(&self) -> Result<StatsResponse> {
        let state = self.state.clone();

        // counters maintained by the db count machinery; indexer-only keys are
        // appended below
        #[allow(unused_mut)]
        let mut count_keys = vec![
            "repos",
            "error_ratelimited",
            "error_transport",
            "error_generic",
        ];

        #[cfg(feature = "indexer")]
        {
            count_keys.push("pending");
            count_keys.push("records");
            count_keys.push("blocks");
            count_keys.push("resync");
        }

        // all counters are fetched concurrently
        #[cfg_attr(
            not(any(feature = "indexer_stream", feature = "relay")),
            allow(unused_mut)
        )]
        let mut counts: BTreeMap<&'static str, u64> =
            futures::future::join_all(count_keys.into_iter().map(|name| {
                let state = state.clone();
                async move { (name, state.db.get_count(name).await) }
            }))
            .await
            .into_iter()
            .collect();

        // event counts come from the keyspace length rather than a stored counter
        #[cfg(feature = "indexer_stream")]
        counts.insert("events", state.db.events.approximate_len() as u64);

        #[cfg(feature = "relay")]
        counts.insert(
            "relay_events",
            state.db.relay_events.approximate_len() as u64,
        );

        // size queries are blocking, so gather them on the blocking pool
        let sizes = tokio::task::spawn_blocking(move || {
            let mut s = BTreeMap::new();
            s.insert("repos", state.db.repos.disk_space());
            s.insert("cursors", state.db.cursors.disk_space());
            s.insert("counts", state.db.counts.disk_space());
            s.insert("filter", state.db.filter.disk_space());
            s.insert("crawler", state.db.crawler.disk_space());

            #[cfg(feature = "indexer")]
            {
                s.insert("records", state.db.records.disk_space());
                s.insert("blocks", state.db.blocks.disk_space());
                s.insert("pending", state.db.pending.disk_space());
                s.insert("resync", state.db.resync.disk_space());
                s.insert("resync_buffer", state.db.resync_buffer.disk_space());
            }
            #[cfg(feature = "indexer_stream")]
            s.insert("events", state.db.events.disk_space());

            #[cfg(feature = "relay")]
            s.insert("relay_events", state.db.relay_events.disk_space());

            #[cfg(feature = "backlinks")]
            s.insert("backlinks", state.db.backlinks.disk_space());

            s
        })
        .await
        .into_diagnostic()?;

        Ok(StatsResponse { counts, sizes })
    }
808
809 /// returns a future that runs the HTTP management API server on `0.0.0.0:{port}`.
810 ///
811 /// the server exposes all management endpoints (`/filter`, `/repos`, `/ingestion`,
812 /// `/stream`, `/stats`, `/db/*`, `/xrpc/*`). it runs indefinitely and resolves
813 /// only on error.
814 ///
815 /// intended for `tokio::spawn` or inclusion in a `select!` / task list. the clone
816 /// of `self` is deferred until the future is first polled.
817 ///
818 /// to disable the HTTP API entirely, simply don't call this method.
819 pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> {
820 let hydrant = self.clone();
821 async move { crate::api::serve(hydrant, port).await }
822 }
823
824 /// returns a future that runs the debug HTTP API server on `127.0.0.1:{port}`.
825 ///
826 /// exposes internal inspection endpoints (`/debug/get`, `/debug/iter`, etc.).
827 /// binds only to loopback.
828 pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> {
829 let state = self.state.clone();
830 async move { crate::api::serve_debug(state, port).await }
831 }
832
    /// get the status of a (firehose) host we are consuming from.
    ///
    /// returns the seq we are on for this host, along with its account count and
    /// status. returns `Ok(None)` when the host has no persisted cursor and is
    /// neither in the pds-meta hosts map nor an active firehose ingestor.
    pub async fn get_host_status(&self, hostname: &str) -> Result<Option<Host>> {
        let state = self.state.clone();
        let hostname = hostname.to_smolstr();

        // db reads are blocking, so the lookup runs on the blocking pool
        tokio::task::spawn_blocking(move || {
            let key = keys::firehose_cursor_key(&hostname);

            let mut seq = 0;
            if let Some(cursor_bytes) = state.db.cursors.get(&key).into_diagnostic()? {
                seq = i64::from_be_bytes(cursor_bytes.as_ref().try_into().into_diagnostic()?);
            } else {
                // if it has no cursor, check if it's explicitly tracked in hosts map
                // or firehose tasks (recently added via API but no messages yet)
                let meta = state.pds_meta.load();
                if !meta.hosts.contains_key(hostname.as_str()) {
                    // we should also allow it if it's an active firehose ingestor
                    let mut found_in_cursors = false;
                    state.firehose_cursors.iter_sync(|u, _| {
                        if u.host_str() == Some(hostname.as_str()) {
                            found_in_cursors = true;
                        }
                        !found_in_cursors // continue if not found
                    });

                    if !found_in_cursors {
                        return Ok(None);
                    }
                }
            }

            let account_count = state
                .db
                .get_count_sync(&keys::pds_account_count_key(&hostname));
            let status = state.pds_meta.load().status(&hostname);

            Ok(Some(Host {
                name: hostname.into(),
                seq,
                account_count,
                status,
            }))
        })
        .await
        .into_diagnostic()?
    }
881
    /// enumerates all hosts hydrant is consuming from.
    ///
    /// returns up to `limit` hosts starting after `cursor` (a hostname returned by
    /// a previous page), and the cursor to paginate from — `None` on the last page.
    pub async fn list_hosts(
        &self,
        cursor: Option<&str>,
        limit: usize,
    ) -> Result<(Vec<Host>, Option<SmolStr>)> {
        let state = self.state.clone();
        let cursor = cursor.map(str::to_string);

        tokio::task::spawn_blocking(move || {
            // exclusive upper bound of the scan: the cursor prefix with its last
            // byte bumped by one, i.e. everything starting with the prefix
            let prefix_end = {
                let mut end = keys::FIREHOSE_CURSOR_PREFIX.to_vec();
                *end.last_mut().unwrap() += 1;
                end
            };
            let start_bound = match cursor.as_deref() {
                // Excluded: resume strictly after the host named by the cursor
                Some(host) => std::ops::Bound::Excluded(keys::firehose_cursor_key(host)),
                None => std::ops::Bound::Included(keys::FIREHOSE_CURSOR_PREFIX.to_vec()),
            };

            // fetch one extra item to detect whether there is a next page
            let mut hosts: Vec<Host> = Vec::with_capacity(limit + 1);
            for item in state
                .db
                .cursors
                .range((start_bound, std::ops::Bound::Excluded(prefix_end)))
                .take(limit + 1)
            {
                let (k, v) = item.into_inner().into_diagnostic()?;
                // hostname is the key suffix after the shared prefix
                let hostname = std::str::from_utf8(&k[keys::FIREHOSE_CURSOR_PREFIX.len()..])
                    .into_diagnostic()
                    .wrap_err("firehose cursor key contains non-utf8 hostname")?;
                let seq = i64::from_be_bytes(
                    v.as_ref()
                        .try_into()
                        .into_diagnostic()
                        .wrap_err("cursor value is not 8 bytes")?,
                );
                let account_count = state
                    .db
                    .get_count_sync(&keys::pds_account_count_key(hostname));
                let status = state.pds_meta.load().status(hostname);
                hosts.push(Host {
                    name: hostname.into(),
                    seq,
                    account_count,
                    status,
                });
            }

            // drop the probe element; the last host on the page becomes the cursor
            let next_cursor = if hosts.len() > limit {
                hosts.pop();
                hosts.last().map(|h| h.name.clone())
            } else {
                None
            };

            Ok((hosts, next_cursor))
        })
        .await
        .into_diagnostic()?
    }
946}
947
948impl axum::extract::FromRef<Hydrant> for Arc<AppState> {
949 fn from_ref(h: &Hydrant) -> Self {
950 h.state.clone()
951 }
952}
953
/// database statistics returned by [`Hydrant::stats`].
///
/// the set of keys present depends on the enabled cargo features.
#[derive(serde::Serialize)]
pub struct StatsResponse {
    /// record counts per logical category (repos, records, events, error kinds, etc.)
    pub counts: BTreeMap<&'static str, u64>,
    /// on-disk size in bytes per keyspace
    pub sizes: BTreeMap<&'static str, u64>,
}
962
/// control over database maintenance operations.
///
/// all methods pause the crawler, firehose, and backfill worker for the duration
/// of the operation and restore their prior state on completion, whether or not
/// the operation succeeds.
///
/// obtained via [`Hydrant::new`] as the `db` sub-handle; cheap to clone.
#[derive(Clone)]
pub struct DbControl(Arc<AppState>);
970
971impl DbControl {
    /// trigger a major compaction of all keyspaces in parallel.
    ///
    /// compaction reclaims disk space from deleted/updated keys and improves
    /// read performance. can take several minutes on large datasets.
    pub async fn compact(&self) -> Result<()> {
        let state = self.0.clone();
        state
            // ingestion is paused so compaction doesn't race live writers; the
            // prior enabled/disabled state is restored afterwards
            .with_ingestion_paused(async || state.db.compact().await)
            .await
    }
982
    /// train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces.
    ///
    /// dictionaries are written to `dict_{name}.bin` files inside the database folder.
    /// a restart is required to apply them. training samples data blocks from the
    /// existing database, so the database must have a reasonable amount of data first.
    pub async fn train_dicts(&self) -> Result<()> {
        let state = self.0.clone();
        state
            .with_ingestion_paused(async || {
                // training is blocking, so each run goes to the blocking pool;
                // the outer spawn_blocking Result is flattened into the inner one
                let train = |name: &'static str| {
                    let state = state.clone();
                    tokio::task::spawn_blocking(move || state.db.train_dict(name))
                        .map(|res: Result<_, _>| res.into_diagnostic().flatten())
                };
                // all three dictionaries train concurrently; first failure aborts
                tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ())
            })
            .await
    }
1001}