very fast AT Protocol indexer with flexible filtering, XRPC queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1001 lines 39 kB view raw
1#![allow(unused_imports)] 2 3#[cfg(feature = "indexer")] 4pub(crate) mod crawler; 5pub(crate) mod filter; 6pub(crate) mod firehose; 7pub(crate) mod pds; 8pub(crate) mod repos; 9mod seed; 10pub(crate) mod stream; 11 12#[cfg(feature = "indexer")] 13mod indexer; 14#[cfg(feature = "indexer")] 15pub use indexer::*; 16 17#[cfg(feature = "relay")] 18mod relay; 19#[cfg(feature = "relay")] 20pub use relay::*; 21 22pub use filter::{FilterControl, FilterPatch, FilterSnapshot}; 23pub use firehose::{FirehoseHandle, FirehoseSourceInfo}; 24pub use pds::{PdsControl, PdsTierAssignment, PdsTierDefinition}; 25pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl}; 26use smol_str::{SmolStr, ToSmolStr}; 27 28use std::collections::BTreeMap; 29use std::future::Future; 30use std::pin::Pin; 31use std::sync::Arc; 32use std::sync::atomic::{AtomicBool, Ordering}; 33use std::task::{Context, Poll}; 34 35use futures::{FutureExt, Stream}; 36use miette::{IntoDiagnostic, Result, WrapErr}; 37use tokio::sync::{mpsc, watch}; 38use tracing::{debug, error, info}; 39 40#[cfg(feature = "indexer")] 41use crate::backfill::BackfillWorker; 42use crate::config::{Config, SignatureVerification}; 43#[cfg(feature = "indexer")] 44use crate::db::load_persisted_crawler_sources; 45use crate::db::{self, filter as db_filter, keys, load_persisted_firehose_sources}; 46use crate::filter::FilterMode; 47#[cfg(feature = "indexer")] 48use crate::ingest::indexer::FirehoseWorker; 49use crate::pds_meta::{PdsMeta, PdsMetaHandle}; 50use crate::state::AppState; 51#[cfg(feature = "indexer_stream")] 52use crate::types::MarshallableEvt; 53use firehose::FirehoseShared; 54#[cfg(feature = "indexer_stream")] 55use stream::event_stream_thread; 56#[cfg(feature = "relay")] 57use stream::relay_stream_thread; 58use url::Url; 59 60#[derive(Debug, Clone)] 61/// infromation about a host hydrant is consuming from. 62pub struct Host { 63 /// hostname of the host. 
64 pub name: SmolStr, 65 /// latest seq hydrant has processed from this host. 66 pub seq: i64, 67 /// the amount of accounts hydrant has seen from this host. 68 pub account_count: u64, 69 /// the status of this host in hydrant. 70 pub status: crate::pds_meta::HostStatus, 71} 72 73/// an event emitted by the hydrant event stream. 74/// 75/// three variants are possible depending on the `type` field: 76/// - `"record"`: a repo record was created, updated, or deleted. carries a [`RecordEvt`]. 77/// - `"identity"`: a DID's handle or PDS changed. carries an [`IdentityEvt`]. ephemeral, not replayable. 78/// - `"account"`: a repo's active/inactive status changed. carries an [`AccountEvt`]. ephemeral, not replayable. 79/// 80/// the `id` field is a monotonically increasing sequence number usable as a cursor for [`Hydrant::subscribe`]. 81#[cfg(feature = "indexer_stream")] 82pub type Event = MarshallableEvt<'static>; 83 84/// the top-level handle to a hydrant instance. 85/// 86/// `Hydrant` is cheaply cloneable. all sub-handles share the same underlying state. 87/// construct it via [`Hydrant::new`] or [`Hydrant::from_env`], configure the filter 88/// and repos as needed, then call [`Hydrant::run`] to start all background components. 89/// 90/// # example 91/// 92/// ```rust,no_run 93/// use hydrant::control::Hydrant; 94/// 95/// #[tokio::main] 96/// async fn main() -> miette::Result<()> { 97/// let hydrant = Hydrant::from_env().await?; 98/// 99/// tokio::select! { 100/// r = hydrant.run()? 
=> r, 101/// r = hydrant.serve(3000) => r, 102/// } 103/// } 104/// ``` 105#[derive(Clone)] 106pub struct Hydrant { 107 #[cfg(feature = "indexer")] 108 pub crawler: crawler::CrawlerHandle, 109 pub firehose: FirehoseHandle, 110 #[cfg(feature = "indexer")] 111 pub backfill: BackfillHandle, 112 pub filter: FilterControl, 113 pub pds: PdsControl, 114 pub repos: ReposControl, 115 pub db: DbControl, 116 #[cfg(feature = "backlinks")] 117 pub backlinks: crate::backlinks::BacklinksControl, 118 pub(crate) state: Arc<AppState>, 119 config: Arc<Config>, 120 started: Arc<AtomicBool>, 121 _priv: (), 122} 123 124impl Hydrant { 125 /// Returns a reference to the internal DID resolver. 126 pub fn resolver(&self) -> &crate::resolver::Resolver { 127 &self.state.resolver 128 } 129 130 /// open the database and configure hydrant from `config`. 131 /// 132 /// this sets up the database, applies any filter configuration from `config`, and 133 /// initializes all sub-handles. no background tasks are started yet: call 134 /// [`run`](Self::run) to start all components and drive the instance. 135 pub async fn new(config: Config) -> Result<Self> { 136 info!("{config}"); 137 138 #[cfg(feature = "relay")] 139 if config.only_index_links { 140 miette::bail!("HYDRANT_ONLY_INDEX_LINKS is not supported in relay mode"); 141 } 142 143 // 1. open database and construct AppState 144 let state = AppState::new(&config)?; 145 146 // 2. 
apply any filter config from env variables 147 if config.full_network 148 || config.filter_signals.is_some() 149 || config.filter_collections.is_some() 150 || config.filter_excludes.is_some() 151 { 152 let filter_ks = state.db.filter.clone(); 153 let inner = state.db.inner.clone(); 154 let mode = config.full_network.then_some(FilterMode::Full); 155 let signals = config 156 .filter_signals 157 .clone() 158 .map(crate::patch::SetUpdate::Set); 159 let collections = config 160 .filter_collections 161 .clone() 162 .map(crate::patch::SetUpdate::Set); 163 let excludes = config 164 .filter_excludes 165 .clone() 166 .map(crate::patch::SetUpdate::Set); 167 168 tokio::task::spawn_blocking(move || { 169 let mut batch = inner.batch(); 170 db_filter::apply_patch( 171 &mut batch, 172 &filter_ks, 173 mode, 174 signals, 175 collections, 176 excludes, 177 )?; 178 batch.commit().into_diagnostic() 179 }) 180 .await 181 .into_diagnostic()??; 182 183 // 3. reload the live filter into the hot-path arc-swap 184 let new_filter = tokio::task::spawn_blocking({ 185 let filter_ks = state.db.filter.clone(); 186 move || db_filter::load(&filter_ks) 187 }) 188 .await 189 .into_diagnostic()??; 190 state.filter.store(Arc::new(new_filter)); 191 } 192 193 #[cfg(feature = "indexer")] 194 { 195 // 4. 
set crawler enabled state from config, evaluated against the post-patch filter 196 let post_patch_crawler = match config.enable_crawler { 197 Some(b) => b, 198 None => { 199 state.filter.load().mode == FilterMode::Full 200 || !config.crawler_sources.is_empty() 201 } 202 }; 203 state.crawler_enabled.send_replace(post_patch_crawler); 204 } 205 206 let state = Arc::new(state); 207 208 Ok(Self { 209 firehose: FirehoseHandle::new(state.clone()), 210 filter: FilterControl(state.clone()), 211 pds: pds::PdsControl(state.clone()), 212 repos: ReposControl(state.clone()), 213 db: DbControl(state.clone()), 214 #[cfg(feature = "indexer")] 215 crawler: crawler::CrawlerHandle { 216 state: state.clone(), 217 shared: Arc::new(std::sync::OnceLock::new()), 218 tasks: Arc::new(scc::HashMap::new()), 219 persisted: Arc::new(scc::HashSet::new()), 220 }, 221 #[cfg(feature = "indexer")] 222 backfill: BackfillHandle::new(state.clone()), 223 #[cfg(feature = "backlinks")] 224 backlinks: crate::backlinks::BacklinksControl(state.clone()), 225 state, 226 config: Arc::new(config), 227 started: Arc::new(AtomicBool::new(false)), 228 _priv: (), 229 }) 230 } 231 232 /// reads config from environment variables and calls [`Hydrant::new`]. 233 pub async fn from_env() -> Result<Self> { 234 Self::new(Config::from_env()?).await 235 } 236 237 /// start all background components and return a future that resolves when any 238 /// fatal component exits. 239 /// 240 /// starts the backfill worker, firehose ingestors, crawler, and worker thread. 241 /// resolves with `Ok(())` if a fatal component exits cleanly, or `Err(e)` if it 242 /// fails. intended for use in `tokio::select!` alongside [`serve`](Self::serve). 243 /// 244 /// returns an error if called more than once on the same `Hydrant` instance. 
245 pub fn run(&self) -> Result<impl Future<Output = Result<()>>> { 246 let state = self.state.clone(); 247 let config = self.config.clone(); 248 #[cfg(feature = "indexer")] 249 let crawler = self.crawler.clone(); 250 let firehose = self.firehose.clone(); 251 252 if self.started.swap(true, Ordering::SeqCst) { 253 miette::bail!("Hydrant::run() called more than once"); 254 } 255 256 let fut = async move { 257 // raw firehose events from pds/relay to RelayWorker 258 let (buffer_tx, buffer_rx) = mpsc::channel::<crate::ingest::IngestMessage>(500); 259 260 // validated IndexerMessages from RelayWorker/backfill to FirehoseWorker 261 #[cfg(feature = "indexer")] 262 let (indexer_tx, indexer_rx) = 263 mpsc::channel::<crate::ingest::indexer::IndexerMessage>(500); 264 265 // 5. spawn the backfill worker (not used in relay mode) 266 #[cfg(feature = "indexer")] 267 tokio::spawn({ 268 let state = state.clone(); 269 BackfillWorker::new( 270 state.clone(), 271 indexer_tx.clone(), 272 config.repo_fetch_timeout, 273 config.backfill_concurrency_limit, 274 matches!( 275 config.verify_signatures, 276 SignatureVerification::Full | SignatureVerification::BackfillOnly 277 ), 278 state.backfill_enabled.subscribe(), 279 ) 280 .run() 281 }); 282 283 // 6. re-queue any repos that lost their backfill state, then start the retry worker 284 #[cfg(feature = "indexer")] 285 { 286 if let Err(e) = tokio::task::spawn_blocking({ 287 let state = state.clone(); 288 move || crate::backfill::manager::queue_gone_backfills(&state) 289 }) 290 .await 291 .into_diagnostic()? 292 { 293 error!(err = %e, "failed to queue gone backfills"); 294 db::check_poisoned_report(&e); 295 } 296 297 std::thread::spawn({ 298 let state = state.clone(); 299 move || crate::backfill::manager::retry_worker(state) 300 }); 301 } 302 303 // 7. 
ephemeral GC thread (not used in relay mode) 304 #[cfg(feature = "indexer_stream")] 305 if config.ephemeral { 306 let state = state.clone(); 307 std::thread::Builder::new() 308 .name("ephemeral-gc".into()) 309 .spawn(move || crate::db::ephemeral::ephemeral_ttl_worker(state)) 310 .into_diagnostic()?; 311 } 312 313 // relay events TTL: relay_events keyspace grows unbounded without pruning 314 #[cfg(feature = "relay")] 315 { 316 let state = state.clone(); 317 std::thread::Builder::new() 318 .name("relay-events-gc".into()) 319 .spawn(move || crate::db::ephemeral::relay_events_ttl_worker(state)) 320 .into_diagnostic()?; 321 } 322 323 // 8. cursor / counts persist thread 324 std::thread::spawn({ 325 let state = state.clone(); 326 let persist_interval = config.cursor_save_interval; 327 move || loop { 328 std::thread::sleep(persist_interval); 329 330 state.firehose_cursors.iter_sync(|relay, cursor| { 331 let seq = cursor.load(Ordering::SeqCst); 332 if seq > 0 { 333 if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) { 334 error!(relay = %relay, err = %e, "failed to save cursor"); 335 db::check_poisoned_report(&e); 336 } 337 } 338 true 339 }); 340 341 let checkpoint_watermark = match state.db.checkpoint_count_deltas() { 342 Ok(watermark) => watermark, 343 Err(e) => { 344 error!(err = %e, "failed to checkpoint count deltas"); 345 db::check_poisoned_report(&e); 346 None 347 } 348 }; 349 350 if let Err(e) = state.db.persist() { 351 error!(err = %e, "db persist failed"); 352 db::check_poisoned_report(&e); 353 } else { 354 let watermark = checkpoint_watermark 355 .map(Ok) 356 .unwrap_or_else(|| db::load_count_delta_watermark(&state.db)); 357 match watermark { 358 Ok(watermark) => state.db.mark_count_checkpoint_persisted(watermark), 359 Err(e) => { 360 error!(err = %e, "failed to load durable count checkpoint watermark"); 361 db::check_poisoned_report(&e); 362 } 363 } 364 } 365 } 366 }); 367 368 // 9. 
events/sec stats ticker 369 #[cfg(any(feature = "indexer_stream", feature = "relay"))] 370 tokio::spawn({ 371 let state = state.clone(); 372 let get_id = |state: &AppState| { 373 #[cfg(feature = "indexer_stream")] 374 let id = state.db.next_event_id.load(Ordering::Relaxed); 375 #[cfg(feature = "relay")] 376 let id = state.db.next_relay_seq.load(Ordering::Relaxed); 377 id 378 }; 379 let mut last_id = get_id(&state); 380 let mut last_time = std::time::Instant::now(); 381 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); 382 async move { 383 loop { 384 interval.tick().await; 385 386 let current_id = get_id(&state); 387 let current_time = std::time::Instant::now(); 388 let delta = current_id.saturating_sub(last_id); 389 390 if delta == 0 { 391 debug!("no new events in 60s"); 392 continue; 393 } 394 395 let elapsed = current_time.duration_since(last_time).as_secs_f64(); 396 let rate = if elapsed > 0.0 { 397 delta as f64 / elapsed 398 } else { 399 0.0 400 }; 401 info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)"); 402 403 last_id = current_id; 404 last_time = current_time; 405 } 406 } 407 }); 408 409 let (fatal_tx_inner, mut fatal_rx) = watch::channel(None); 410 let fatal_tx = Arc::new(fatal_tx_inner); 411 412 // 10. 
set shared and spawn firehose ingestors 413 firehose 414 .shared 415 .set(FirehoseShared { 416 buffer_tx: buffer_tx.clone(), 417 verify_signatures: matches!( 418 config.verify_signatures, 419 SignatureVerification::Full 420 ), 421 }) 422 .ok() 423 .expect("firehose shared already set"); 424 let fire_shared = firehose.shared.get().unwrap(); 425 426 // add hosts from config 427 let relay_hosts = config.relays.clone(); 428 if !relay_hosts.is_empty() { 429 info!( 430 relay_count = relay_hosts.len(), 431 hosts = relay_hosts 432 .iter() 433 .map(|h| h.url.as_str()) 434 .collect::<Vec<_>>() 435 .join(", "), 436 "starting firehose ingestor(s)" 437 ); 438 for source in &relay_hosts { 439 let _ = firehose 440 .known_sources 441 .insert_async(source.url.clone(), source.is_pds) 442 .await; 443 firehose 444 .spawn_firehose_ingestor(source, fire_shared, true) 445 .await?; 446 } 447 } 448 449 // add persisted hosts 450 let persisted_sources = tokio::task::spawn_blocking({ 451 let state = state.clone(); 452 move || load_persisted_firehose_sources(&state.db) 453 }) 454 .await 455 .into_diagnostic()??; 456 for source in &persisted_sources { 457 let _ = firehose 458 .known_sources 459 .insert_async(source.url.clone(), source.is_pds) 460 .await; 461 if firehose.tasks.contains_async(&source.url).await { 462 continue; 463 } 464 firehose 465 .spawn_firehose_ingestor(source, fire_shared, true) 466 .await?; 467 } 468 // we use spawn_firehose_ingestor directly here since we dont want 469 // to go through the whole add_source machinery and checks 470 // its ok since we block here before running stuff like seed_hosts 471 // and whatnot 472 473 // 10c. seed firehose PDS sources from listHosts on configured seed URLs 474 if !config.seed_hosts.is_empty() { 475 let seed_urls = config.seed_hosts.clone(); 476 let firehose = firehose.clone(); 477 let state = state.clone(); 478 tokio::spawn(async move { 479 seed::seed_from_list_hosts(&seed_urls, &firehose, &state).await; 480 }); 481 } 482 483 // 10d. 
periodic retry of offline firehose sources 484 if let Some(retry_interval) = config.offline_host_retry_interval { 485 tokio::spawn({ 486 let firehose = firehose.clone(); 487 async move { 488 loop { 489 tokio::time::sleep(retry_interval).await; 490 491 let mut to_restart: Vec<(Url, bool)> = Vec::new(); 492 { 493 let meta = firehose.state.pds_meta.load(); 494 firehose 495 .known_sources 496 .iter_async(|url, &is_pds| { 497 if firehose.tasks.contains_sync(url) { 498 return true; 499 } 500 let host = url.host_str().unwrap_or(url.as_str()); 501 if meta.is_banned(host) { 502 return true; 503 } 504 to_restart.push((url.clone(), is_pds)); 505 true 506 }) 507 .await; 508 } 509 510 for (url, is_pds) in to_restart { 511 let _ = firehose.restart_source(url, is_pds).await; 512 } 513 } 514 } 515 }); 516 } 517 518 // 11. spawn crawler infrastructure 519 #[cfg(feature = "indexer")] 520 { 521 use crate::crawler::{ 522 CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker, 523 }; 524 use crate::util::throttle::Throttler; 525 526 let http = reqwest::Client::builder() 527 .user_agent(concat!( 528 env!("CARGO_PKG_NAME"), 529 "/", 530 env!("CARGO_PKG_VERSION") 531 )) 532 .gzip(true) 533 .build() 534 .expect("that reqwest will build"); 535 let pds_throttler = state.throttler.clone(); 536 let in_flight = InFlight::new(); 537 let stats = CrawlerStats::new( 538 state.clone(), 539 config 540 .crawler_sources 541 .iter() 542 .map(|s| s.url.clone()) 543 .collect(), 544 pds_throttler.clone(), 545 ); 546 let checker = SignalChecker { 547 http: http.clone(), 548 state: state.clone(), 549 throttler: pds_throttler, 550 }; 551 552 info!( 553 max_pending = config.crawler_max_pending_repos, 554 resume_pending = config.crawler_resume_pending_repos, 555 enabled = *state.crawler_enabled.borrow(), 556 "starting crawler worker" 557 ); 558 let (worker, tx) = CrawlerWorker::new( 559 state.clone(), 560 config.crawler_max_pending_repos, 561 config.crawler_resume_pending_repos, 562 stats.clone(), 
563 ); 564 tokio::spawn(async move { 565 worker.run().await; 566 error!("crawler worker exited unexpectedly, aborting"); 567 std::process::abort(); 568 }); 569 570 let ticker = tokio::spawn(stats.clone().task()); 571 tokio::spawn(async move { 572 match ticker.await { 573 Err(e) => error!(err = ?e, "stats ticker panicked, aborting"), 574 Ok(()) => error!("stats ticker exited unexpectedly, aborting"), 575 } 576 std::process::abort(); 577 }); 578 579 tokio::spawn( 580 RetryProducer { 581 checker: checker.clone(), 582 in_flight: in_flight.clone(), 583 tx: tx.clone(), 584 } 585 .run(), 586 ); 587 588 // set shared objects so CrawlerHandle methods can use them 589 crawler 590 .shared 591 .set(crawler::CrawlerShared { 592 http, 593 checker, 594 in_flight, 595 tx, 596 stats, 597 }) 598 .ok() 599 .expect("crawler shared already set"); 600 let shared = crawler.shared.get().unwrap(); 601 602 // spawn initial sources from config 603 for source in config.crawler_sources.iter() { 604 let enabled_rx = state.crawler_enabled.subscribe(); 605 let handle = crawler::spawn_crawler_producer( 606 source, 607 &shared.http, 608 &state, 609 &shared.checker, 610 &shared.in_flight, 611 &shared.tx, 612 &shared.stats, 613 enabled_rx, 614 ); 615 let _ = crawler.tasks.insert_async(source.url.clone(), handle).await; 616 } 617 618 let persisted_sources = tokio::task::spawn_blocking({ 619 let state = state.clone(); 620 move || load_persisted_crawler_sources(&state.db) 621 }) 622 .await 623 .into_diagnostic()??; 624 625 for source in &persisted_sources { 626 let _ = crawler.persisted.insert_async(source.url.clone()).await; 627 if crawler.tasks.contains_async(&source.url).await { 628 continue; 629 } 630 let enabled_rx = state.crawler_enabled.subscribe(); 631 let handle = crawler::spawn_crawler_producer( 632 source, 633 &shared.http, 634 &state, 635 &shared.checker, 636 &shared.in_flight, 637 &shared.tx, 638 &shared.stats, 639 enabled_rx, 640 ); 641 let _ = 
crawler.tasks.insert_async(source.url.clone(), handle).await; 642 } 643 } 644 645 // 12. spawn the relay worker 646 let relay_worker = std::thread::spawn({ 647 let state = state.clone(); 648 let handle = tokio::runtime::Handle::current(); 649 let config = config.clone(); 650 651 #[cfg(feature = "indexer")] 652 let hook = indexer_tx.clone(); 653 654 move || { 655 crate::ingest::relay::RelayWorker::new( 656 state, 657 buffer_rx, 658 #[cfg(feature = "indexer")] 659 hook, 660 matches!(config.verify_signatures, SignatureVerification::Full), 661 config.firehose_workers, 662 crate::ingest::validation::ValidationOptions { 663 verify_mst: config.verify_mst, 664 rev_clock_skew_secs: config.rev_clock_skew_secs, 665 }, 666 ) 667 .run(handle) 668 } 669 }); 670 671 let tx = Arc::clone(&fatal_tx); 672 tokio::spawn( 673 tokio::task::spawn_blocking(move || { 674 relay_worker 675 .join() 676 .map_err(|e| miette::miette!("relay worker died: {e:?}")) 677 }) 678 .map(move |r| { 679 let result = r.into_diagnostic().flatten().flatten(); 680 let _ = tx.send(Some(result.map_err(|e| e.to_string()))); 681 }), 682 ); 683 684 // 13. spawn the firehose worker (if enabled) 685 #[cfg(feature = "indexer")] 686 let firehose_worker = std::thread::spawn({ 687 let state = state.clone(); 688 let handle = tokio::runtime::Handle::current(); 689 let config = config.clone(); 690 move || FirehoseWorker::new(state, indexer_rx, config.firehose_workers).run(handle) 691 }); 692 693 #[cfg(feature = "indexer")] 694 { 695 let tx = Arc::clone(&fatal_tx); 696 tokio::spawn( 697 tokio::task::spawn_blocking(move || { 698 firehose_worker 699 .join() 700 .map_err(|e| miette::miette!("firehose worker died: {e:?}")) 701 }) 702 .map(move |r| { 703 let result = r.into_diagnostic().flatten().flatten(); 704 let _ = tx.send(Some(result.map_err(|e| e.to_string()))); 705 }), 706 ); 707 } 708 709 // drop the local fatal_tx so the watch channel is only kept alive by the 710 // spawned tasks. 
when all fatal tasks exit (and drop their tx clones), 711 // fatal_rx.changed() returns Err and we return Ok(()). 712 drop(fatal_tx); 713 714 loop { 715 match fatal_rx.changed().await { 716 Ok(()) => { 717 if let Some(result) = fatal_rx.borrow().clone() { 718 return result.map_err(|s| miette::miette!("{s}")); 719 } 720 } 721 // all fatal_tx clones dropped: all tasks finished cleanly 722 Err(_) => return Ok(()), 723 } 724 } 725 }; 726 Ok(fut) 727 } 728 729 /// return database counts and on-disk sizes for all keyspaces. 730 /// 731 /// counts include: `repos`, `pending`, `resync`, `records`, `blocks`, `events`, 732 /// `error_ratelimited`, `error_transport`, `error_generic`. 733 /// 734 /// sizes are in bytes, reported per keyspace. 735 pub async fn stats(&self) -> Result<StatsResponse> { 736 let state = self.state.clone(); 737 738 #[allow(unused_mut)] 739 let mut count_keys = vec![ 740 "repos", 741 "error_ratelimited", 742 "error_transport", 743 "error_generic", 744 ]; 745 746 #[cfg(feature = "indexer")] 747 { 748 count_keys.push("pending"); 749 count_keys.push("records"); 750 count_keys.push("blocks"); 751 count_keys.push("resync"); 752 } 753 754 #[cfg_attr( 755 not(any(feature = "indexer_stream", feature = "relay")), 756 allow(unused_mut) 757 )] 758 let mut counts: BTreeMap<&'static str, u64> = 759 futures::future::join_all(count_keys.into_iter().map(|name| { 760 let state = state.clone(); 761 async move { (name, state.db.get_count(name).await) } 762 })) 763 .await 764 .into_iter() 765 .collect(); 766 767 #[cfg(feature = "indexer_stream")] 768 counts.insert("events", state.db.events.approximate_len() as u64); 769 770 #[cfg(feature = "relay")] 771 counts.insert( 772 "relay_events", 773 state.db.relay_events.approximate_len() as u64, 774 ); 775 776 let sizes = tokio::task::spawn_blocking(move || { 777 let mut s = BTreeMap::new(); 778 s.insert("repos", state.db.repos.disk_space()); 779 s.insert("cursors", state.db.cursors.disk_space()); 780 s.insert("counts", 
state.db.counts.disk_space()); 781 s.insert("filter", state.db.filter.disk_space()); 782 s.insert("crawler", state.db.crawler.disk_space()); 783 784 #[cfg(feature = "indexer")] 785 { 786 s.insert("records", state.db.records.disk_space()); 787 s.insert("blocks", state.db.blocks.disk_space()); 788 s.insert("pending", state.db.pending.disk_space()); 789 s.insert("resync", state.db.resync.disk_space()); 790 s.insert("resync_buffer", state.db.resync_buffer.disk_space()); 791 } 792 #[cfg(feature = "indexer_stream")] 793 s.insert("events", state.db.events.disk_space()); 794 795 #[cfg(feature = "relay")] 796 s.insert("relay_events", state.db.relay_events.disk_space()); 797 798 #[cfg(feature = "backlinks")] 799 s.insert("backlinks", state.db.backlinks.disk_space()); 800 801 s 802 }) 803 .await 804 .into_diagnostic()?; 805 806 Ok(StatsResponse { counts, sizes }) 807 } 808 809 /// returns a future that runs the HTTP management API server on `0.0.0.0:{port}`. 810 /// 811 /// the server exposes all management endpoints (`/filter`, `/repos`, `/ingestion`, 812 /// `/stream`, `/stats`, `/db/*`, `/xrpc/*`). it runs indefinitely and resolves 813 /// only on error. 814 /// 815 /// intended for `tokio::spawn` or inclusion in a `select!` / task list. the clone 816 /// of `self` is deferred until the future is first polled. 817 /// 818 /// to disable the HTTP API entirely, simply don't call this method. 819 pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> { 820 let hydrant = self.clone(); 821 async move { crate::api::serve(hydrant, port).await } 822 } 823 824 /// returns a future that runs the debug HTTP API server on `127.0.0.1:{port}`. 825 /// 826 /// exposes internal inspection endpoints (`/debug/get`, `/debug/iter`, etc.). 827 /// binds only to loopback. 
828 pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> { 829 let state = self.state.clone(); 830 async move { crate::api::serve_debug(state, port).await } 831 } 832 833 /// get the status of a (firehose) host we are consuming from. 834 /// 835 /// returns the seq we are on for this host. 836 pub async fn get_host_status(&self, hostname: &str) -> Result<Option<Host>> { 837 let state = self.state.clone(); 838 let hostname = hostname.to_smolstr(); 839 840 tokio::task::spawn_blocking(move || { 841 let key = keys::firehose_cursor_key(&hostname); 842 843 let mut seq = 0; 844 if let Some(cursor_bytes) = state.db.cursors.get(&key).into_diagnostic()? { 845 seq = i64::from_be_bytes(cursor_bytes.as_ref().try_into().into_diagnostic()?); 846 } else { 847 // if it has no cursor, check if it's explicitly tracked in hosts map 848 // or firehose tasks (recently added via API but no messages yet) 849 let meta = state.pds_meta.load(); 850 if !meta.hosts.contains_key(hostname.as_str()) { 851 // we should also allow it if it's an active firehose ingestor 852 let mut found_in_cursors = false; 853 state.firehose_cursors.iter_sync(|u, _| { 854 if u.host_str() == Some(hostname.as_str()) { 855 found_in_cursors = true; 856 } 857 !found_in_cursors // continue if not found 858 }); 859 860 if !found_in_cursors { 861 return Ok(None); 862 } 863 } 864 } 865 866 let account_count = state 867 .db 868 .get_count_sync(&keys::pds_account_count_key(&hostname)); 869 let status = state.pds_meta.load().status(&hostname); 870 871 Ok(Some(Host { 872 name: hostname.into(), 873 seq, 874 account_count, 875 status, 876 })) 877 }) 878 .await 879 .into_diagnostic()? 880 } 881 882 /// enumerates all hosts hydrant is consuming from. 883 /// 884 /// returns hosts enumerated in this pagination and the cursor to paginate from. 
885 pub async fn list_hosts( 886 &self, 887 cursor: Option<&str>, 888 limit: usize, 889 ) -> Result<(Vec<Host>, Option<SmolStr>)> { 890 let state = self.state.clone(); 891 let cursor = cursor.map(str::to_string); 892 893 tokio::task::spawn_blocking(move || { 894 let prefix_end = { 895 let mut end = keys::FIREHOSE_CURSOR_PREFIX.to_vec(); 896 *end.last_mut().unwrap() += 1; 897 end 898 }; 899 let start_bound = match cursor.as_deref() { 900 Some(host) => std::ops::Bound::Excluded(keys::firehose_cursor_key(host)), 901 None => std::ops::Bound::Included(keys::FIREHOSE_CURSOR_PREFIX.to_vec()), 902 }; 903 904 // fetch one extra item to detect whether there is a next page 905 let mut hosts: Vec<Host> = Vec::with_capacity(limit + 1); 906 for item in state 907 .db 908 .cursors 909 .range((start_bound, std::ops::Bound::Excluded(prefix_end))) 910 .take(limit + 1) 911 { 912 let (k, v) = item.into_inner().into_diagnostic()?; 913 let hostname = std::str::from_utf8(&k[keys::FIREHOSE_CURSOR_PREFIX.len()..]) 914 .into_diagnostic() 915 .wrap_err("firehose cursor key contains non-utf8 hostname")?; 916 let seq = i64::from_be_bytes( 917 v.as_ref() 918 .try_into() 919 .into_diagnostic() 920 .wrap_err("cursor value is not 8 bytes")?, 921 ); 922 let account_count = state 923 .db 924 .get_count_sync(&keys::pds_account_count_key(hostname)); 925 let status = state.pds_meta.load().status(hostname); 926 hosts.push(Host { 927 name: hostname.into(), 928 seq, 929 account_count, 930 status, 931 }); 932 } 933 934 let next_cursor = if hosts.len() > limit { 935 hosts.pop(); 936 hosts.last().map(|h| h.name.clone()) 937 } else { 938 None 939 }; 940 941 Ok((hosts, next_cursor)) 942 }) 943 .await 944 .into_diagnostic()? 945 } 946} 947 948impl axum::extract::FromRef<Hydrant> for Arc<AppState> { 949 fn from_ref(h: &Hydrant) -> Self { 950 h.state.clone() 951 } 952} 953 954/// database statistics returned by [`Hydrant::stats`]. 
955#[derive(serde::Serialize)] 956pub struct StatsResponse { 957 /// record counts per logical category (repos, records, events, error kinds, etc.) 958 pub counts: BTreeMap<&'static str, u64>, 959 /// on-disk size in bytes per keyspace 960 pub sizes: BTreeMap<&'static str, u64>, 961} 962 963/// control over database maintenance operations. 964/// 965/// all methods pause the crawler, firehose, and backfill worker for the duration 966/// of the operation and restore their prior state on completion, whether or not 967/// the operation succeeds. 968#[derive(Clone)] 969pub struct DbControl(Arc<AppState>); 970 971impl DbControl { 972 /// trigger a major compaction of all keyspaces in parallel. 973 /// 974 /// compaction reclaims disk space from deleted/updated keys and improves 975 /// read performance. can take several minutes on large datasets. 976 pub async fn compact(&self) -> Result<()> { 977 let state = self.0.clone(); 978 state 979 .with_ingestion_paused(async || state.db.compact().await) 980 .await 981 } 982 983 /// train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces. 984 /// 985 /// dictionaries are written to `dict_{name}.bin` files inside the database folder. 986 /// a restart is required to apply them. training samples data blocks from the 987 /// existing database, so the database must have a reasonable amount of data first. 988 pub async fn train_dicts(&self) -> Result<()> { 989 let state = self.0.clone(); 990 state 991 .with_ingestion_paused(async || { 992 let train = |name: &'static str| { 993 let state = state.clone(); 994 tokio::task::spawn_blocking(move || state.db.train_dict(name)) 995 .map(|res: Result<_, _>| res.into_diagnostic().flatten()) 996 }; 997 tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ()) 998 }) 999 .await 1000 } 1001}