very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer

use crate::pds_meta::{TierPolicy, TierRule};
use miette::Result;
use serde::{Deserialize, Serialize};
use smol_str::{SmolStr, ToSmolStr};
use std::collections::HashMap;
use std::fmt;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use url::Url;

/// rate limit parameters for a named tier of PDS connections.
///
/// the per-second limit is `max(per_second_base, accounts * per_second_account_mul)`,
/// giving a floor at `per_second_base` that scales up with the PDS's active account count.
#[derive(Debug, Clone, Copy)]
pub struct RateTier {
    /// floor for the per-second limit, regardless of account count.
    pub per_second_base: u64,
    /// per-second events allowed per active account on this PDS.
    pub per_second_account_mul: f64,
    /// per-hour limit.
    pub per_hour: u64,
    /// per-day limit.
    pub per_day: u64,
    /// maximum active account limit for this host before dropping tracking of new accounts
    pub account_limit: Option<u64>,
}

impl RateTier {
    /// built-in "trusted" tier: high limits for well-behaved PDS operators.
    pub fn trusted() -> Self {
        Self {
            per_second_base: 5000,
            per_second_account_mul: 10.0,
            per_hour: 5000 * 3600,
            per_day: 5000 * 86400,
            account_limit: Some(10_000_000),
        }
    }

    /// built-in "default" tier: conservative limits for unknown PDS operators.
    pub fn default_tier() -> Self {
        Self {
            per_second_base: 50,
            per_second_account_mul: 0.5,
            per_hour: 1000 * 3600,
            per_day: 1000 * 86400,
            account_limit: Some(100),
        }
    }

    /// parse `base/mul/hourly/daily[/account_limit]` format used by `HYDRANT_RATE_TIERS`.
    fn parse(s: &str) -> Option<Self> {
        let parts: Vec<&str> = s.split('/').collect();
        if parts.len() < 4 || parts.len() > 5 {
            return None;
        }
        Some(Self {
            per_second_base: parts[0].parse().ok()?,
            per_second_account_mul: parts[1].parse().ok()?,
            per_hour: parts[2].parse().ok()?,
            per_day: parts[3].parse().ok()?,
            account_limit: parts.get(4).and_then(|p| p.parse().ok()),
        })
    }
}
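// illustrative sketch (hypothetical spec string and account count): how a `HYDRANT_RATE_TIERS`
// entry in the `base/mul/hourly/daily[/account_limit]` format is expected to parse, and the
// per-second formula from the `RateTier` doc comment restated with concrete numbers.
#[cfg(test)]
mod rate_tier_spec_examples {
    use super::*;

    #[test]
    fn four_part_spec_parses_without_an_account_limit() {
        // `base/mul/hourly/daily`, no optional fifth `account_limit` segment.
        let tier = RateTier::parse("100/1.0/7200000/172800000").expect("spec should parse");
        assert_eq!(tier.per_second_base, 100);
        assert_eq!(tier.per_hour, 7_200_000);
        assert_eq!(tier.per_day, 172_800_000);
        assert!(tier.account_limit.is_none());
    }

    #[test]
    fn per_second_limit_is_a_floor_that_scales_with_accounts() {
        // `max(per_second_base, accounts * per_second_account_mul)` per the doc comment:
        // with the built-in default tier, 1000 accounts * 0.5 = 500 exceeds the base of 50.
        let tier = RateTier::default_tier();
        let accounts = 1_000u64;
        let effective =
            (tier.per_second_base as f64).max(accounts as f64 * tier.per_second_account_mul);
        assert_eq!(effective, 500.0);
    }
}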
/// this is for internal use only, please don't use this macro.
#[doc(hidden)]
#[macro_export]
macro_rules! __cfg {
    (@val $key:expr) => {
        std::env::var(concat!("HYDRANT_", $key))
    };
    ($key:expr, $default:expr, sec) => {
        cfg!(@val $key)
            .ok()
            .and_then(|s| humantime::parse_duration(&s).ok())
            .unwrap_or($default)
    };
    ($key:expr, $default:expr) => {
        cfg!(@val $key)
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or($default.to_owned())
            .into()
    };
}
use crate::__cfg as cfg;

/// loads `.env` from the current directory, setting any variables not already in the environment.
fn load_dotenv() {
    let Ok(contents) = std::fs::read_to_string(".env") else {
        return;
    };
    for line in contents.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let Some((key, val)) = line.split_once('=') else {
            continue;
        };
        let key = key.trim();
        let val = val.trim();
        let val = val
            .strip_prefix('"')
            .and_then(|v| v.strip_suffix('"'))
            .or_else(|| val.strip_prefix('\'').and_then(|v| v.strip_suffix('\'')))
            .unwrap_or(val);
        if std::env::var(key).is_err() {
            // SAFETY: single-threaded at startup; no other threads are reading env yet.
            unsafe { std::env::set_var(key, val) };
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrawlerMode {
    /// enumerate via `com.atproto.sync.listRepos`, check signals with `describeRepo`.
    ListRepos,
    /// enumerate via `com.atproto.sync.listReposByCollection` for each configured signal.
    /// note: if no signals are specified, this won't crawl for any repos.
    ByCollection,
}

impl CrawlerMode {
    fn default_for(full_network: bool) -> Self {
        full_network
            .then_some(Self::ListRepos)
            .unwrap_or(Self::ByCollection)
    }
}

impl Serialize for CrawlerMode {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.serialize_str(&self.to_smolstr())
    }
}

impl<'de> Deserialize<'de> for CrawlerMode {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        FromStr::from_str(&s).map_err(serde::de::Error::custom)
    }
}

impl FromStr for CrawlerMode {
    type Err = miette::Error;
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "list_repos" | "list-repos" => Ok(Self::ListRepos),
            "by_collection" | "by-collection" => Ok(Self::ByCollection),
            _ => Err(miette::miette!(
                "invalid crawler mode: expected 'list_repos' or 'by_collection'"
            )),
        }
    }
}

impl fmt::Display for CrawlerMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ListRepos => write!(f, "list_repos"),
            Self::ByCollection => write!(f, "by_collection"),
        }
    }
}

/// a single crawler source: a URL and the mode used to enumerate it.
#[derive(Debug, Clone)]
pub struct CrawlerSource {
    pub url: Url,
    pub mode: CrawlerMode,
}

/// a single firehose source: a URL and whether it is a direct PDS connection.
///
/// set via `HYDRANT_RELAY_HOSTS` as a comma-separated list of `[pds::]url` entries.
/// e.g. `wss://bsky.network,pds::wss://pds.example.com`.
/// a bare URL (no `pds::` prefix) is treated as an aggregating relay (`is_pds = false`).
#[derive(Debug, Clone)]
pub struct FirehoseSource {
    pub url: Url,
    /// true when this is a direct PDS connection; enables host authority enforcement.
    pub is_pds: bool,
}

impl FirehoseSource {
    /// parse `[pds::]url`. the `pds::` prefix marks the source as a direct PDS connection.
    pub fn parse(s: &str) -> Option<Self> {
        if let Some(url_str) = s.strip_prefix("pds::") {
            let url = Url::parse(url_str).ok()?;
            Some(Self { url, is_pds: true })
        } else {
            let url = Url::parse(s).ok()?;
            Some(Self { url, is_pds: false })
        }
    }
}
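// illustrative sketch (placeholder hostnames): how the `[pds::]url` entries documented on
// `FirehoseSource` are parsed from `HYDRANT_RELAY_HOSTS`.
#[cfg(test)]
mod firehose_source_examples {
    use super::*;

    #[test]
    fn bare_url_is_treated_as_an_aggregating_relay() {
        let source = FirehoseSource::parse("wss://relay.example.com").expect("url should parse");
        assert!(!source.is_pds);
    }

    #[test]
    fn pds_prefix_marks_a_direct_pds_connection() {
        let source =
            FirehoseSource::parse("pds::wss://pds.example.com").expect("url should parse");
        assert!(source.is_pds);
        assert_eq!(source.url.host_str(), Some("pds.example.com"));
    }
}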
impl CrawlerSource {
    /// parse `[mode::]url`. mode prefix is optional, falls back to `default_mode`.
    fn parse(s: &str, default_mode: CrawlerMode) -> Option<Self> {
        if let Some((prefix, rest)) = s.split_once("::") {
            let mode = prefix.parse().ok()?;
            let url = Url::parse(rest).ok()?;
            Some(Self { url, mode })
        } else {
            let url = Url::parse(s).ok()?;
            Some(Self {
                url,
                mode: default_mode,
            })
        }
    }
}
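// illustrative sketch (placeholder URLs): how `[mode::]url` entries from `HYDRANT_CRAWLER_URLS`
// pick an explicit crawler mode or fall back to the default mode.
#[cfg(test)]
mod crawler_source_examples {
    use super::*;

    #[test]
    fn explicit_mode_prefix_overrides_the_default() {
        let source = CrawlerSource::parse(
            "by_collection::https://lightrail.example.com",
            CrawlerMode::ListRepos,
        )
        .expect("entry should parse");
        assert_eq!(source.mode, CrawlerMode::ByCollection);
    }

    #[test]
    fn bare_url_falls_back_to_the_default_mode() {
        let source = CrawlerSource::parse("wss://relay.example.com", CrawlerMode::ListRepos)
            .expect("entry should parse");
        assert_eq!(source.mode, CrawlerMode::ListRepos);
    }
}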
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    Lz4,
    Zstd,
    None,
}

impl FromStr for Compression {
    type Err = miette::Error;
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "lz4" => Ok(Self::Lz4),
            "zstd" => Ok(Self::Zstd),
            "none" => Ok(Self::None),
            _ => Err(miette::miette!("invalid compression type")),
        }
    }
}

impl fmt::Display for Compression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lz4 => write!(f, "lz4"),
            Self::Zstd => write!(f, "zstd"),
            Self::None => write!(f, "none"),
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum SignatureVerification {
    /// verify all commits, from the firehose and when backfilling a repo from a PDS.
    Full,
    /// only verify commits when backfilling a repo from a PDS.
    BackfillOnly,
    /// don't verify anything.
    None,
}

impl FromStr for SignatureVerification {
    type Err = miette::Error;
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "full" => Ok(Self::Full),
            "backfill-only" => Ok(Self::BackfillOnly),
            "none" => Ok(Self::None),
            _ => Err(miette::miette!("invalid signature verification level")),
        }
    }
}

impl fmt::Display for SignatureVerification {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Full => write!(f, "full"),
            Self::BackfillOnly => write!(f, "backfill-only"),
            Self::None => write!(f, "none"),
        }
    }
}

#[derive(Debug, Clone)]
pub struct Config {
    /// path to the database folder. set via `HYDRANT_DATABASE_PATH`.
    pub database_path: PathBuf,
    /// if `true`, discovers and indexes all repositories in the network.
    /// set via `HYDRANT_FULL_NETWORK`.
    pub full_network: bool,
    /// if `true`, no records are stored; events are deleted after `ephemeral_ttl`.
    /// set via `HYDRANT_EPHEMERAL`.
    pub ephemeral: bool,
    /// how long events are retained in ephemeral mode before deletion.
    /// set via `HYDRANT_EPHEMERAL_TTL` (humantime duration, e.g. `60min`).
    pub ephemeral_ttl: Duration,

    /// firehose sources for ingestion. set via `HYDRANT_RELAY_HOST` (single)
    /// or `HYDRANT_RELAY_HOSTS` (comma-separated; takes precedence).
    /// prefix a URL with `pds::` to mark it as a direct PDS connection.
    pub relays: Vec<FirehoseSource>,
    /// base URL(s) of the PLC directory (comma-separated for multiple).
    /// defaults to `https://plc.wtf`, or `https://plc.directory` in full-network mode.
    /// set via `HYDRANT_PLC_URL`.
    pub plc_urls: Vec<Url>,
    /// whether to ingest events from relay firehose subscriptions.
    /// set via `HYDRANT_ENABLE_FIREHOSE`.
    pub enable_firehose: bool,
    /// number of concurrent workers processing firehose events.
    /// set via `HYDRANT_FIREHOSE_WORKERS`.
    pub firehose_workers: usize,
    /// how often the firehose cursor is persisted to disk.
    /// set via `HYDRANT_CURSOR_SAVE_INTERVAL` (humantime duration, e.g. `3sec`).
    pub cursor_save_interval: Duration,
    /// timeout for fetching a full repository CAR during backfill.
    /// set via `HYDRANT_REPO_FETCH_TIMEOUT` (humantime duration, e.g. `5min`).
    pub repo_fetch_timeout: Duration,
    /// maximum number of concurrent backfill tasks.
    /// set via `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
    pub backfill_concurrency_limit: usize,

    /// whether to run the network crawler. `None` defers to the default for the current mode.
    /// set via `HYDRANT_ENABLE_CRAWLER`.
    pub enable_crawler: Option<bool>,
    /// maximum number of repos allowed in the backfill pending queue before the crawler pauses.
    /// set via `HYDRANT_CRAWLER_MAX_PENDING_REPOS`.
    pub crawler_max_pending_repos: usize,
    /// pending queue size at which the crawler resumes after being paused.
    /// set via `HYDRANT_CRAWLER_RESUME_PENDING_REPOS`.
    pub crawler_resume_pending_repos: usize,
    /// crawler sources: each entry pairs a URL with a discovery mode.
    ///
    /// set via `HYDRANT_CRAWLER_URLS` as a comma-separated list of `[mode::]url` entries,
    /// e.g. `list_repos::wss://bsky.network,by_collection::https://lightrail.microcosm.blue`.
    /// a bare URL without a `mode::` prefix uses the default mode (`list_repos` for full-network,
    /// `by_collection` otherwise). defaults to the relay hosts with the default mode.
    /// set to an empty string to disable crawling entirely.
    pub crawler_sources: Vec<CrawlerSource>,

    /// signature verification level for incoming commits.
    /// set via `HYDRANT_VERIFY_SIGNATURES` (`full`, `backfill-only`, or `none`).
    pub verify_signatures: SignatureVerification,
    /// number of resolved identities to keep in the in-memory LRU cache.
    /// set via `HYDRANT_IDENTITY_CACHE_SIZE`.
    pub identity_cache_size: u64,
    /// enable MST inversion validation on incoming commits (expensive).
    /// set via `HYDRANT_VERIFY_MST`.
    pub verify_mst: bool,
    /// clock drift window for future-rev rejection, in seconds.
    /// commits with a rev timestamp more than this many seconds in the future are rejected.
    /// set via `HYDRANT_REV_CLOCK_SKEW`. default: 300 (5 minutes).
    pub rev_clock_skew_secs: i64,

    /// NSID patterns that trigger auto-discovery in filter mode (e.g. `app.bsky.feed.post`).
    /// set via `HYDRANT_FILTER_SIGNALS` as a comma-separated list.
    pub filter_signals: Option<Vec<String>>,
    /// NSID patterns used to filter which record collections are stored.
    /// if `None`, all collections are stored. set via `HYDRANT_FILTER_COLLECTIONS`.
    pub filter_collections: Option<Vec<String>>,
    /// DIDs that are always skipped, regardless of mode.
    /// set via `HYDRANT_FILTER_EXCLUDES` as a comma-separated list.
    pub filter_excludes: Option<Vec<String>>,

    /// enable backlinks indexing (only meaningful in non-ephemeral mode).
    /// set via `HYDRANT_ENABLE_BACKLINKS=true`.
    pub enable_backlinks: bool,

    /// if `true`, record blocks are not stored; only the index (records, counts, events) is kept.
    /// `getRecord`, `listRecords`, and `getRepo` will return errors when this is enabled.
    /// the event stream still functions, but create/update events will not include record values.
    /// only valid in indexer mode (not relay).
    /// set via `HYDRANT_ONLY_INDEX_LINKS=true`.
    pub only_index_links: bool,

    /// maximum number of new PDS sources that may be added (via seeding or API) in a single
    /// UTC calendar day. `None` means unlimited.
    /// set via `HYDRANT_NEW_HOST_LIMIT`.
    pub new_host_limit: Option<u64>,

    /// how often offline firehose sources are automatically retried.
    /// set via `HYDRANT_OFFLINE_HOST_RETRY_INTERVAL` (humantime duration, e.g. `30min`).
    /// set to `none` to disable automatic retries.
    pub offline_host_retry_interval: Option<Duration>,

    /// base URL(s) of relay or aggregator services to seed firehose PDS sources from at startup.
    ///
    /// hydrant calls `com.atproto.sync.listHosts` on each URL and adds the returned PDSes
    /// as firehose sources (with `is_pds = true`). account counts from the response are
    /// applied to newly-seen hosts to initialise rate-limiting immediately.
    ///
    /// set via `HYDRANT_SEED_HOSTS` as a comma-separated list of base URLs.
    pub seed_hosts: Vec<Url>,
    /// named rate tier definitions for PDS rate limiting.
    ///
    /// built-in tiers ("default" and "trusted") are always present and may be overridden.
    /// set via `HYDRANT_RATE_TIERS` as a comma-separated list of `name:base/mul/hourly/daily` entries,
    /// e.g. `trusted:5000/10.0/18000000/432000000,custom:100/1.0/7200000/172800000`.
    ///
    /// built from `HYDRANT_TIER_RULES` and `HYDRANT_RATE_TIERS` at startup.
    pub tier_policy: TierPolicy,

    /// glob rules mapping host patterns to named rate tiers.
    ///
    /// set via `HYDRANT_TIER_RULES` as a comma-separated list of `pattern:tiername` entries,
    /// e.g. `*.bsky.network:trusted,pds.example.com:custom`. rules are evaluated in order;
    /// api-assigned per-host overrides always take priority over these rules.
    pub tier_rules: Vec<(String, String)>,

    /// db internals, tune only if you know what you're doing.
    ///
    /// size of the fjall block cache in MB. set via `HYDRANT_CACHE_SIZE`.
    pub cache_size: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// compression algorithm for data keyspaces (blocks, records, repos, events).
    /// set via `HYDRANT_DATA_COMPRESSION` (`lz4`, `zstd`, or `none`).
    pub data_compression: Compression,
    /// db internals, tune only if you know what you're doing.
    ///
    /// compression algorithm for the fjall journal.
    /// set via `HYDRANT_JOURNAL_COMPRESSION` (`lz4`, `zstd`, or `none`).
    pub journal_compression: Compression,
    /// db internals, tune only if you know what you're doing.
    ///
    /// number of background threads used by the fjall storage engine.
    /// set via `HYDRANT_DB_WORKER_THREADS`.
    pub db_worker_threads: usize,
    /// db internals, tune only if you know what you're doing.
    ///
    /// maximum total size of the fjall journal in MB before a flush is forced.
    /// set via `HYDRANT_DB_MAX_JOURNALING_SIZE_MB`.
    pub db_max_journaling_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the blocks keyspace in MB.
    /// set via `HYDRANT_DB_BLOCKS_MEMTABLE_SIZE_MB`.
    pub db_blocks_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the repos keyspace in MB.
    /// set via `HYDRANT_DB_REPOS_MEMTABLE_SIZE_MB`.
    pub db_repos_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the events keyspace in MB.
    /// set via `HYDRANT_DB_EVENTS_MEMTABLE_SIZE_MB`.
    pub db_events_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the records keyspace in MB.
    /// set via `HYDRANT_DB_RECORDS_MEMTABLE_SIZE_MB`.
    pub db_records_memtable_size_mb: u64,
}

impl Default for Config {
    fn default() -> Self {
        const BASE_MEMTABLE_MB: u64 = 32;
        Self {
            database_path: PathBuf::from("./hydrant.db"),
            #[cfg(feature = "indexer")]
            ephemeral: false,
            #[cfg(feature = "relay")]
            ephemeral: true,
            #[cfg(feature = "indexer")]
            ephemeral_ttl: Duration::from_secs(3600), // 1 hour
            #[cfg(feature = "relay")]
            ephemeral_ttl: Duration::from_secs(3600 * 24 * 3), // 3 days
            #[cfg(not(feature = "relay"))]
            full_network: false,
            #[cfg(feature = "relay")]
            full_network: true,
            #[cfg(not(feature = "relay"))]
            relays: vec![FirehoseSource {
                url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                is_pds: false,
            }],
            #[cfg(feature = "relay")]
            relays: vec![],
            #[cfg(not(feature = "relay"))]
            seed_hosts: vec![],
            #[cfg(feature = "relay")]
            seed_hosts: vec![Url::parse("https://bsky.network").unwrap()],
            plc_urls: vec![Url::parse("https://plc.wtf").unwrap()],
            enable_firehose: true,
            firehose_workers: 8,
            cursor_save_interval: Duration::from_secs(3),
            repo_fetch_timeout: Duration::from_secs(300),
            backfill_concurrency_limit: 16,
            enable_crawler: None,
            crawler_max_pending_repos: 2000,
            crawler_resume_pending_repos: 1000,
            crawler_sources: vec![CrawlerSource {
                url: Url::parse("https://lightrail.microcosm.blue").unwrap(),
                mode: CrawlerMode::ByCollection,
            }],
            verify_signatures: SignatureVerification::Full,
            identity_cache_size: 1_000_000,
            verify_mst: false,
            rev_clock_skew_secs: 300,
            filter_signals: None,
            filter_collections: None,
            filter_excludes: None,
            enable_backlinks: false,
            only_index_links: false,
            new_host_limit: Some(50),
            offline_host_retry_interval: Some(Duration::from_secs(30 * 60)),
            tier_rules: vec![],
            tier_policy: {
                let mut tiers = HashMap::new();
                tiers.insert(SmolStr::new("default"), RateTier::default_tier());
                tiers.insert(SmolStr::new("trusted"), RateTier::trusted());
                TierPolicy {
                    tiers,
                    rules: vec![],
                }
            },
            cache_size: 256,
            data_compression: Compression::Zstd,
            journal_compression: Compression::Lz4,
            db_worker_threads: 4,
            db_max_journaling_size_mb: 400,
            db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
            db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2,
            db_events_memtable_size_mb: BASE_MEMTABLE_MB,
            db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
        }
    }
}
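// illustrative `.env` sketch (values are hypothetical, not recommendations): the kind of file
// `load_dotenv` reads and `Config::from_env` below turns into a `Config`, using the `HYDRANT_*`
// variables documented on the struct fields above.
//
//     HYDRANT_DATABASE_PATH=./hydrant.db
//     HYDRANT_RELAY_HOSTS=wss://relay.example.com,pds::wss://pds.example.com
//     HYDRANT_FILTER_COLLECTIONS=app.bsky.feed.post,app.bsky.feed.like
//     HYDRANT_EPHEMERAL_TTL=60min
//     HYDRANT_RATE_TIERS=custom:100/1.0/7200000/172800000
//     HYDRANT_TIER_RULES=*.bsky.network:trusted,pds.example.com:custom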
impl Config {
    /// returns the default config for full network usage.
    pub fn full_network() -> Self {
        const BASE_MEMTABLE_MB: u64 = 192;
        Self {
            full_network: true,
            plc_urls: vec![Url::parse("https://plc.directory").unwrap()],
            firehose_workers: 24,
            backfill_concurrency_limit: 64,
            crawler_sources: vec![CrawlerSource {
                url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                mode: CrawlerMode::ListRepos,
            }],
            db_worker_threads: 8,
            db_max_journaling_size_mb: 1024,
            db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
            db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 4,
            db_events_memtable_size_mb: BASE_MEMTABLE_MB,
            db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
            ..Self::default()
        }
    }

    /// reads and builds the config from environment variables, loading `.env` first if present.
    pub fn from_env() -> Result<Self> {
        load_dotenv();

        // full_network is read first since it determines which defaults to use.
        // relay mode defaults to true so that the network is indexed by default.
        #[cfg(feature = "relay")]
        let default_full_network = true;
        #[cfg(not(feature = "relay"))]
        let default_full_network = false;
        let full_network: bool = cfg!("FULL_NETWORK", default_full_network);
        let defaults = full_network
            .then(Self::full_network)
            .unwrap_or_else(Self::default);

        let relay_hosts = match std::env::var("HYDRANT_RELAY_HOSTS") {
            Ok(hosts) if !hosts.trim().is_empty() => hosts
                .split(',')
                .filter_map(|s| {
                    let s = s.trim();
                    (!s.is_empty())
                        .then(|| {
                            FirehoseSource::parse(s).or_else(|| {
                                tracing::warn!("invalid relay host URL: {s}");
                                None
                            })
                        })
                        .flatten()
                })
                .collect(),
            // HYDRANT_RELAY_HOSTS explicitly set to ""
            Ok(_) => vec![],
            // not set at all, fall back to RELAY_HOST (bare URL, no pds:: prefix support here)
            Err(_) => match std::env::var("HYDRANT_RELAY_HOST") {
                Ok(s) if !s.trim().is_empty() => {
                    FirehoseSource::parse(s.trim()).into_iter().collect()
                }
                _ => defaults.relays.clone(),
            },
        };

        let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
            .ok()
            .map(|s| {
                s.split(',')
                    .map(|s| Url::parse(s.trim()))
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| miette::miette!("invalid PLC URL: {e}"))
            })
            .unwrap_or_else(|| Ok(defaults.plc_urls.clone()))?;

        let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", defaults.cursor_save_interval, sec);
        let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", defaults.repo_fetch_timeout, sec);

        let ephemeral: bool = cfg!("EPHEMERAL", defaults.ephemeral);
        let ephemeral_ttl = cfg!("EPHEMERAL_TTL", defaults.ephemeral_ttl, sec);
        let database_path = cfg!("DATABASE_PATH", defaults.database_path);
        let cache_size = cfg!("CACHE_SIZE", defaults.cache_size);
        let data_compression = cfg!("DATA_COMPRESSION", defaults.data_compression);
        let journal_compression = cfg!("JOURNAL_COMPRESSION", defaults.journal_compression);

        let verify_signatures = cfg!("VERIFY_SIGNATURES", defaults.verify_signatures);
        let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", defaults.identity_cache_size);
        let verify_mst: bool = cfg!("VERIFY_MST", defaults.verify_mst);
        let rev_clock_skew_secs: i64 = cfg!("REV_CLOCK_SKEW", defaults.rev_clock_skew_secs);
        let enable_firehose = cfg!("ENABLE_FIREHOSE", defaults.enable_firehose);
        let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER")
            .ok()
            .and_then(|s| s.parse().ok());
        let backfill_concurrency_limit = cfg!(
            "BACKFILL_CONCURRENCY_LIMIT",
            defaults.backfill_concurrency_limit
        );
        let firehose_workers = cfg!("FIREHOSE_WORKERS", defaults.firehose_workers);

        let db_worker_threads = cfg!("DB_WORKER_THREADS", defaults.db_worker_threads);
        let db_max_journaling_size_mb = cfg!(
            "DB_MAX_JOURNALING_SIZE_MB",
            defaults.db_max_journaling_size_mb
        );
        let db_blocks_memtable_size_mb = cfg!(
            "DB_BLOCKS_MEMTABLE_SIZE_MB",
            defaults.db_blocks_memtable_size_mb
        );
        let db_events_memtable_size_mb = cfg!(
            "DB_EVENTS_MEMTABLE_SIZE_MB",
            defaults.db_events_memtable_size_mb
        );
        let db_records_memtable_size_mb = cfg!(
            "DB_RECORDS_MEMTABLE_SIZE_MB",
            defaults.db_records_memtable_size_mb
        );
        let db_repos_memtable_size_mb = cfg!(
            "DB_REPOS_MEMTABLE_SIZE_MB",
            defaults.db_repos_memtable_size_mb
        );

        let crawler_max_pending_repos = cfg!(
            "CRAWLER_MAX_PENDING_REPOS",
            defaults.crawler_max_pending_repos
        );
        let crawler_resume_pending_repos = cfg!(
            "CRAWLER_RESUME_PENDING_REPOS",
            defaults.crawler_resume_pending_repos
        );

        let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks);
        let only_index_links: bool = cfg!("ONLY_INDEX_LINKS", defaults.only_index_links);
        let max_pds_added_per_day: Option<u64> = std::env::var("HYDRANT_NEW_HOST_LIMIT")
            .ok()
            .and_then(|s| s.parse().ok());

        let offline_retry_interval: Option<Duration> =
            match std::env::var("HYDRANT_OFFLINE_HOST_RETRY_INTERVAL")
                .ok()
                .as_deref()
            {
                None => defaults.offline_host_retry_interval,
                Some("none") => None,
                Some(s) => humantime::parse_duration(s)
                    .ok()
                    .or(defaults.offline_host_retry_interval),
            };

        // start with built-in tier definitions, then layer in any env-defined overrides.
        // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,...
        let mut tiers = defaults.tier_policy.tiers.clone();
        if let Ok(s) = std::env::var("HYDRANT_RATE_TIERS") {
            for entry in s.split(',') {
                let entry = entry.trim();
                if let Some((name, spec)) = entry.split_once(':') {
                    match RateTier::parse(spec) {
                        Some(tier) => {
                            tiers.insert(SmolStr::new(name.trim()), tier);
                        }
                        None => tracing::warn!(
                            "ignoring invalid rate tier '{name}': expected base/mul/hourly/daily format"
                        ),
                    }
                }
            }
        }

        let seed_hosts: Vec<Url> = std::env::var("HYDRANT_SEED_HOSTS")
            .ok()
            .map(|s| {
                s.split(',')
                    .filter_map(|u| {
                        let u = u.trim();
                        if u.is_empty() {
                            return None;
                        }
                        Url::parse(u).ok().or_else(|| {
                            tracing::warn!("invalid seed host URL: {u}");
                            None
                        })
                    })
                    .collect()
            })
            .unwrap_or_else(|| defaults.seed_hosts.clone());

        // build ordered glob rules from HYDRANT_TIER_RULES
        let mut rules: Vec<TierRule> = vec![];
        let mut tier_rules: Vec<(String, String)> = vec![];
        if let Ok(s) = std::env::var("HYDRANT_TIER_RULES") {
            for entry in s.split(',') {
                let entry = entry.trim();
                if entry.is_empty() {
                    continue;
                }
                if let Some((pattern_str, tier_name)) = entry.split_once(':') {
                    let pattern_str = pattern_str.trim();
                    let tier_name = tier_name.trim();
                    match glob::Pattern::new(pattern_str) {
                        Ok(pattern) => {
                            rules.push(TierRule {
                                pattern,
                                tier_name: SmolStr::new(tier_name),
                            });
                            tier_rules.push((pattern_str.to_string(), tier_name.to_string()));
                        }
                        Err(e) => tracing::warn!(
                            "ignoring invalid tier rule pattern '{pattern_str}': {e}"
                        ),
                    }
                }
            }
        }

        let tier_policy = TierPolicy { tiers, rules };

        let default_mode = CrawlerMode::default_for(full_network);
        let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") {
            Ok(s) => s
                .split(',')
                .map(|s| s.trim())
                .filter(|s| !s.is_empty())
                .filter_map(|s| CrawlerSource::parse(s, default_mode))
                .collect(),
            Err(_) => match default_mode {
                CrawlerMode::ListRepos => relay_hosts
                    .iter()
                    .map(|source| CrawlerSource {
                        url: source.url.clone(),
                        mode: CrawlerMode::ListRepos,
                    })
                    .collect(),
                CrawlerMode::ByCollection => defaults.crawler_sources.clone(),
            },
        };

        Ok(Self {
            database_path,
            full_network,
            ephemeral,
            seed_hosts,
            ephemeral_ttl,
            relays: relay_hosts,
            plc_urls,
            enable_firehose,
            firehose_workers,
            cursor_save_interval,
            repo_fetch_timeout,
            backfill_concurrency_limit,
            enable_crawler,
            crawler_max_pending_repos,
            crawler_resume_pending_repos,
            crawler_sources,
            verify_signatures,
            identity_cache_size,
            verify_mst,
            rev_clock_skew_secs,
            filter_signals,
            filter_collections,
            filter_excludes,
            enable_backlinks,
            only_index_links,
            new_host_limit: max_pds_added_per_day,
            offline_host_retry_interval: offline_retry_interval,
            tier_policy,
            tier_rules,
            cache_size,
            data_compression,
            journal_compression,
            db_worker_threads,
            db_max_journaling_size_mb,
            db_blocks_memtable_size_mb,
            db_repos_memtable_size_mb,
            db_events_memtable_size_mb,
            db_records_memtable_size_mb,
        })
    }
}
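// illustrative sketch (placeholder hostnames): how the `pattern:tiername` rules from
// `HYDRANT_TIER_RULES` behave, using the same `glob::Pattern` type that `from_env` compiles above.
#[cfg(test)]
mod tier_rule_glob_examples {
    #[test]
    fn wildcard_pattern_matches_subdomain_hosts() {
        let pattern = glob::Pattern::new("*.bsky.network").expect("pattern should compile");
        assert!(pattern.matches("pds01.bsky.network"));
        assert!(!pattern.matches("pds.example.com"));
    }
}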
config_line { 833 ($f:expr, $label:expr, $value:expr) => { 834 writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH) 835 }; 836} 837 838impl fmt::Display for Config { 839 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 840 const LABEL_WIDTH: usize = 27; 841 842 writeln!(f, "hydrant configuration:")?; 843 config_line!( 844 f, 845 "relay hosts", 846 format_args!( 847 "{:?}", 848 self.relays 849 .iter() 850 .map(|s| if s.is_pds { 851 format!("pds::{}", s.url) 852 } else { 853 s.url.to_string() 854 }) 855 .collect::<Vec<_>>() 856 ) 857 )?; 858 config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?; 859 config_line!(f, "full network indexing", self.full_network)?; 860 config_line!(f, "verify signatures", self.verify_signatures)?; 861 config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?; 862 config_line!(f, "identity cache size", self.identity_cache_size)?; 863 config_line!( 864 f, 865 "cursor save interval", 866 format_args!("{}sec", self.cursor_save_interval.as_secs()) 867 )?; 868 config_line!( 869 f, 870 "repo fetch timeout", 871 format_args!("{}sec", self.repo_fetch_timeout.as_secs()) 872 )?; 873 config_line!(f, "ephemeral", self.ephemeral)?; 874 config_line!(f, "database path", self.database_path.to_string_lossy())?; 875 config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?; 876 config_line!(f, "data compression", self.data_compression)?; 877 config_line!(f, "journal compression", self.journal_compression)?; 878 config_line!(f, "firehose workers", self.firehose_workers)?; 879 config_line!(f, "db worker threads", self.db_worker_threads)?; 880 config_line!( 881 f, 882 "db journal size", 883 format_args!("{} mb", self.db_max_journaling_size_mb) 884 )?; 885 config_line!( 886 f, 887 "db blocks memtable", 888 format_args!("{} mb", self.db_blocks_memtable_size_mb) 889 )?; 890 config_line!( 891 f, 892 "db repos memtable", 893 format_args!("{} mb", self.db_repos_memtable_size_mb) 894 )?; 895 config_line!( 896 f, 897 "db events memtable", 898 format_args!("{} mb", self.db_events_memtable_size_mb) 899 )?; 900 config_line!( 901 f, 902 "db records memtable", 903 format_args!("{} mb", self.db_records_memtable_size_mb) 904 )?; 905 config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?; 906 config_line!( 907 f, 908 "crawler resume pending", 909 self.crawler_resume_pending_repos 910 )?; 911 if !self.crawler_sources.is_empty() { 912 let sources: Vec<_> = self 913 .crawler_sources 914 .iter() 915 .map(|s| format!("{}::{}", s.mode, s.url)) 916 .collect(); 917 config_line!(f, "crawler sources", sources.join(", "))?; 918 } 919 if let Some(signals) = &self.filter_signals { 920 config_line!(f, "filter signals", format_args!("{:?}", signals))?; 921 } 922 if let Some(collections) = &self.filter_collections { 923 config_line!(f, "filter collections", format_args!("{:?}", collections))?; 924 } 925 if let Some(excludes) = &self.filter_excludes { 926 config_line!(f, "filter excludes", format_args!("{:?}", excludes))?; 927 } 928 if self.enable_backlinks { 929 config_line!(f, "backlinks", "enabled")?; 930 } 931 if self.only_index_links { 932 config_line!(f, "only index links", "true")?; 933 } 934 if !self.seed_hosts.is_empty() { 935 config_line!( 936 f, 937 "seed hosts", 938 format_args!( 939 "{:?}", 940 self.seed_hosts 941 .iter() 942 .map(|u| u.as_str()) 943 .collect::<Vec<_>>() 944 ) 945 )?; 946 } 947 if let Some(limit) = self.new_host_limit { 948 config_line!(f, "max pds/day", limit)?; 949 } 950 match 
        match self.offline_host_retry_interval {
            Some(d) => config_line!(
                f,
                "offline retry interval",
                format_args!("{}sec", d.as_secs())
            )?,
            None => config_line!(f, "offline retry interval", "disabled")?,
        }
        Ok(())
    }
}
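// illustrative sketch: a minimal smoke-test for the `Display` impl above, exercising only the
// built-in defaults (no environment variables involved).
#[cfg(test)]
mod display_examples {
    use super::*;

    #[test]
    fn default_config_renders_a_labelled_summary() {
        let rendered = Config::default().to_string();
        assert!(rendered.starts_with("hydrant configuration:"));
        assert!(rendered.contains("full network indexing"));
    }
}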