very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
use crate::pds_meta::{TierPolicy, TierRule};
use miette::Result;
use serde::{Deserialize, Serialize};
use smol_str::{SmolStr, ToSmolStr};
use std::collections::HashMap;
use std::fmt;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use url::Url;

/// rate limit parameters for a named tier of PDS connections.
///
/// the per-second limit is `max(per_second_base, accounts * per_second_account_mul)`,
/// giving a floor at `per_second_base` that scales up with the PDS's active account count.
#[derive(Debug, Clone, Copy)]
pub struct RateTier {
    /// floor for the per-second limit, regardless of account count.
    pub per_second_base: u64,
    /// per-second events allowed per active account on this PDS.
    pub per_second_account_mul: f64,
    /// per-hour limit.
    pub per_hour: u64,
    /// per-day limit.
    pub per_day: u64,
    /// maximum active account limit for this host before dropping tracking of new accounts.
    pub account_limit: Option<u64>,
}

impl RateTier {
    /// built-in "trusted" tier: high limits for well-behaved PDS operators.
    pub fn trusted() -> Self {
        Self {
            per_second_base: 5000,
            per_second_account_mul: 10.0,
            per_hour: 5000 * 3600,
            per_day: 5000 * 86400,
            account_limit: Some(10_000_000),
        }
    }

    /// built-in "default" tier: conservative limits for unknown PDS operators.
    pub fn default_tier() -> Self {
        Self {
            per_second_base: 50,
            per_second_account_mul: 0.5,
            per_hour: 1000 * 3600,
            per_day: 1000 * 86400,
            account_limit: Some(100),
        }
    }

    /// parse `base/mul/hourly/daily[/account_limit]` format used by `HYDRANT_RATE_TIERS`.
    fn parse(s: &str) -> Option<Self> {
        let parts: Vec<&str> = s.split('/').collect();
        if parts.len() < 4 || parts.len() > 5 {
            return None;
        }
        Some(Self {
            per_second_base: parts[0].parse().ok()?,
            per_second_account_mul: parts[1].parse().ok()?,
            per_hour: parts[2].parse().ok()?,
            per_day: parts[3].parse().ok()?,
            account_limit: parts.get(4).and_then(|p| p.parse().ok()),
        })
    }
}
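
// a minimal runnable sketch of the `base/mul/hourly/daily[/account_limit]` value format
// accepted by `RateTier::parse` above; the numbers mirror the doc examples on `Config`
// and are illustrative, not recommended limits.
#[cfg(test)]
mod rate_tier_parse_tests {
    use super::RateTier;

    #[test]
    fn parses_four_and_five_part_specs() {
        // base/mul/hourly/daily
        let tier = RateTier::parse("100/1.0/7200000/172800000").unwrap();
        assert_eq!(tier.per_second_base, 100);
        assert_eq!(tier.per_hour, 7_200_000);
        assert!(tier.account_limit.is_none());

        // base/mul/hourly/daily/account_limit
        let tier = RateTier::parse("5000/10.0/18000000/432000000/10000000").unwrap();
        assert_eq!(tier.account_limit, Some(10_000_000));
    }

    #[test]
    fn rejects_malformed_specs() {
        assert!(RateTier::parse("100/1.0").is_none());
        assert!(RateTier::parse("not/a/rate/tier").is_none());
    }
}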

/// this is for internal use only, please don't use this macro.
#[doc(hidden)]
#[macro_export]
macro_rules! __cfg {
    (@val $key:expr) => {
        std::env::var(concat!("HYDRANT_", $key))
    };
    ($key:expr, $default:expr, sec) => {
        cfg!(@val $key)
            .ok()
            .and_then(|s| humantime::parse_duration(&s).ok())
            .unwrap_or($default)
    };
    ($key:expr, $default:expr) => {
        cfg!(@val $key)
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or($default.to_owned())
            .into()
    };
}
use crate::__cfg as cfg;
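
// illustrative note: a call like `cfg!("CACHE_SIZE", defaults.cache_size)` reads
// `HYDRANT_CACHE_SIZE`, parses it into the target type, and falls back to the default
// when the variable is unset or unparsable; the `sec` form goes through
// `humantime::parse_duration` instead, so values like `3sec` or `30min` are accepted.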

/// loads `.env` from the current directory, setting any variables not already in the environment.
fn load_dotenv() {
    let Ok(contents) = std::fs::read_to_string(".env") else {
        return;
    };
    for line in contents.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let Some((key, val)) = line.split_once('=') else {
            continue;
        };
        let key = key.trim();
        let val = val.trim();
        let val = val
            .strip_prefix('"')
            .and_then(|v| v.strip_suffix('"'))
            .or_else(|| val.strip_prefix('\'').and_then(|v| v.strip_suffix('\'')))
            .unwrap_or(val);
        if std::env::var(key).is_err() {
            // SAFETY: single-threaded at startup; no other threads are reading env yet.
            unsafe { std::env::set_var(key, val) };
        }
    }
}
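
// a hypothetical `.env` file the loader above would accept; the variable names are the
// ones documented on `Config` below, and the values are purely illustrative:
//
//   HYDRANT_DATABASE_PATH=./hydrant.db
//   HYDRANT_RELAY_HOSTS="wss://relay.fire.hose.cam/,pds::wss://pds.example.com"
//   HYDRANT_EPHEMERAL_TTL=60min
//   # comments and blank lines are skipped; surrounding quotes are stripped from values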

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrawlerMode {
    /// enumerate via `com.atproto.sync.listRepos`, check signals with `describeRepo`.
    ListRepos,
    /// enumerate via `com.atproto.sync.listReposByCollection` for each configured signal.
    /// note: if no signals are specified, this won't crawl for any repos.
    ByCollection,
}

impl CrawlerMode {
    fn default_for(full_network: bool) -> Self {
        full_network
            .then_some(Self::ListRepos)
            .unwrap_or(Self::ByCollection)
    }
}

impl Serialize for CrawlerMode {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.serialize_str(&self.to_smolstr())
    }
}

impl<'de> Deserialize<'de> for CrawlerMode {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        FromStr::from_str(&s).map_err(serde::de::Error::custom)
    }
}

impl FromStr for CrawlerMode {
    type Err = miette::Error;
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "list_repos" | "list-repos" => Ok(Self::ListRepos),
            "by_collection" | "by-collection" => Ok(Self::ByCollection),
            _ => Err(miette::miette!(
                "invalid crawler mode: expected 'list_repos' or 'by_collection'"
            )),
        }
    }
}

impl fmt::Display for CrawlerMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ListRepos => write!(f, "list_repos"),
            Self::ByCollection => write!(f, "by_collection"),
        }
    }
}
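
// a small sketch checking that `Display` and `FromStr` for `CrawlerMode` round-trip,
// including the hyphenated aliases the parser accepts.
#[cfg(test)]
mod crawler_mode_tests {
    use super::CrawlerMode;

    #[test]
    fn round_trips_through_strings() {
        for mode in [CrawlerMode::ListRepos, CrawlerMode::ByCollection] {
            let parsed: CrawlerMode = mode.to_string().parse().unwrap();
            assert_eq!(parsed, mode);
        }
        assert_eq!(
            "by-collection".parse::<CrawlerMode>().unwrap(),
            CrawlerMode::ByCollection
        );
        assert!("relay".parse::<CrawlerMode>().is_err());
    }
}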

/// a single crawler source: a URL and the mode used to enumerate it.
#[derive(Debug, Clone)]
pub struct CrawlerSource {
    pub url: Url,
    pub mode: CrawlerMode,
}

/// a single firehose source: a URL and whether it is a direct PDS connection.
///
/// set via `HYDRANT_RELAY_HOSTS` as a comma-separated list of `[pds::]url` entries.
/// e.g. `wss://bsky.network,pds::wss://pds.example.com`.
/// a bare URL (no `pds::` prefix) is treated as an aggregating relay (`is_pds = false`).
#[derive(Debug, Clone)]
pub struct FirehoseSource {
    pub url: Url,
    /// true when this is a direct PDS connection; enables host authority enforcement.
    pub is_pds: bool,
}

impl FirehoseSource {
    /// parse `[pds::]url`. the `pds::` prefix marks the source as a direct PDS connection.
    pub fn parse(s: &str) -> Option<Self> {
        if let Some(url_str) = s.strip_prefix("pds::") {
            let url = Url::parse(url_str).ok()?;
            Some(Self { url, is_pds: true })
        } else {
            let url = Url::parse(s).ok()?;
            Some(Self { url, is_pds: false })
        }
    }
}

impl CrawlerSource {
    /// parse `[mode::]url`. mode prefix is optional, falls back to `default_mode`.
    fn parse(s: &str, default_mode: CrawlerMode) -> Option<Self> {
        if let Some((prefix, rest)) = s.split_once("::") {
            let mode = prefix.parse().ok()?;
            let url = Url::parse(rest).ok()?;
            Some(Self { url, mode })
        } else {
            let url = Url::parse(s).ok()?;
            Some(Self {
                url,
                mode: default_mode,
            })
        }
    }
}
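
// a minimal sketch of the source-string formats parsed above: `[pds::]url` for firehose
// sources and `[mode::]url` for crawler sources; URLs are the ones used elsewhere in this file.
#[cfg(test)]
mod source_parse_tests {
    use super::{CrawlerMode, CrawlerSource, FirehoseSource};

    #[test]
    fn parses_firehose_sources() {
        let relay = FirehoseSource::parse("wss://relay.fire.hose.cam/").unwrap();
        assert!(!relay.is_pds);
        let pds = FirehoseSource::parse("pds::wss://pds.example.com").unwrap();
        assert!(pds.is_pds);
        assert!(FirehoseSource::parse("not a url").is_none());
    }

    #[test]
    fn parses_crawler_sources() {
        let src = CrawlerSource::parse(
            "list_repos::wss://relay.fire.hose.cam/",
            CrawlerMode::ByCollection,
        )
        .unwrap();
        assert_eq!(src.mode, CrawlerMode::ListRepos);
        // a bare URL falls back to the provided default mode.
        let bare =
            CrawlerSource::parse("https://lightrail.microcosm.blue", CrawlerMode::ByCollection)
                .unwrap();
        assert_eq!(bare.mode, CrawlerMode::ByCollection);
    }
}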

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    Lz4,
    Zstd,
    None,
}

impl FromStr for Compression {
    type Err = miette::Error;
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "lz4" => Ok(Self::Lz4),
            "zstd" => Ok(Self::Zstd),
            "none" => Ok(Self::None),
            _ => Err(miette::miette!(
                "invalid compression type: expected 'lz4', 'zstd', or 'none'"
            )),
        }
    }
}

impl fmt::Display for Compression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Lz4 => write!(f, "lz4"),
            Self::Zstd => write!(f, "zstd"),
            Self::None => write!(f, "none"),
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum SignatureVerification {
    /// verify all commits, from the firehose and when backfilling a repo from a PDS.
    Full,
    /// only verify commits when backfilling a repo from a PDS.
    BackfillOnly,
    /// don't verify anything.
    None,
}

impl FromStr for SignatureVerification {
    type Err = miette::Error;
    fn from_str(s: &str) -> Result<Self> {
        match s {
            "full" => Ok(Self::Full),
            "backfill-only" => Ok(Self::BackfillOnly),
            "none" => Ok(Self::None),
            _ => Err(miette::miette!(
                "invalid signature verification level: expected 'full', 'backfill-only', or 'none'"
            )),
        }
    }
}

impl fmt::Display for SignatureVerification {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Full => write!(f, "full"),
            Self::BackfillOnly => write!(f, "backfill-only"),
            Self::None => write!(f, "none"),
        }
    }
}

#[derive(Debug, Clone)]
pub struct Config {
    /// path to the database folder. set via `HYDRANT_DATABASE_PATH`.
    pub database_path: PathBuf,
    /// if `true`, discovers and indexes all repositories in the network.
    /// set via `HYDRANT_FULL_NETWORK`.
    pub full_network: bool,
    /// if `true`, no records are stored; events are deleted after `ephemeral_ttl`.
    /// set via `HYDRANT_EPHEMERAL`.
    pub ephemeral: bool,
    /// how long events are retained in ephemeral mode before deletion.
    /// set via `HYDRANT_EPHEMERAL_TTL` (humantime duration, e.g. `60min`).
    pub ephemeral_ttl: Duration,

    /// firehose sources for ingestion. set via `HYDRANT_RELAY_HOST` (single)
    /// or `HYDRANT_RELAY_HOSTS` (comma-separated; takes precedence).
    /// prefix a URL with `pds::` to mark it as a direct PDS connection.
    pub relays: Vec<FirehoseSource>,
    /// base URL(s) of the PLC directory (comma-separated for multiple).
    /// defaults to `https://plc.wtf`, or `https://plc.directory` in full-network mode.
    /// set via `HYDRANT_PLC_URL`.
    pub plc_urls: Vec<Url>,
    /// whether to ingest events from relay firehose subscriptions.
    /// set via `HYDRANT_ENABLE_FIREHOSE`.
    pub enable_firehose: bool,
    /// number of concurrent workers processing firehose events.
    /// set via `HYDRANT_FIREHOSE_WORKERS`.
    pub firehose_workers: usize,
    /// how often the firehose cursor is persisted to disk.
    /// set via `HYDRANT_CURSOR_SAVE_INTERVAL` (humantime duration, e.g. `3sec`).
    pub cursor_save_interval: Duration,
    /// timeout for fetching a full repository CAR during backfill.
    /// set via `HYDRANT_REPO_FETCH_TIMEOUT` (humantime duration, e.g. `5min`).
    pub repo_fetch_timeout: Duration,
    /// maximum number of concurrent backfill tasks.
    /// set via `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
    pub backfill_concurrency_limit: usize,

    /// whether to run the network crawler. `None` defers to the default for the current mode.
    /// set via `HYDRANT_ENABLE_CRAWLER`.
    pub enable_crawler: Option<bool>,
    /// maximum number of repos allowed in the backfill pending queue before the crawler pauses.
    /// set via `HYDRANT_CRAWLER_MAX_PENDING_REPOS`.
    pub crawler_max_pending_repos: usize,
    /// pending queue size at which the crawler resumes after being paused.
    /// set via `HYDRANT_CRAWLER_RESUME_PENDING_REPOS`.
    pub crawler_resume_pending_repos: usize,
    /// crawler sources: each entry pairs a URL with a discovery mode.
    ///
    /// set via `HYDRANT_CRAWLER_URLS` as a comma-separated list of `[mode::]url` entries,
    /// e.g. `list_repos::wss://bsky.network,by_collection::https://lightrail.microcosm.blue`.
    /// a bare URL without a `mode::` prefix uses the default mode (`list_repos` for
    /// full-network, `by_collection` otherwise). when unset, full-network mode falls back to
    /// the relay hosts in `list_repos` mode and filtered mode falls back to the built-in
    /// default source. set to an empty string to disable crawling entirely.
    pub crawler_sources: Vec<CrawlerSource>,

    /// signature verification level for incoming commits.
    /// set via `HYDRANT_VERIFY_SIGNATURES` (`full`, `backfill-only`, or `none`).
    pub verify_signatures: SignatureVerification,
    /// number of resolved identities to keep in the in-memory LRU cache.
    /// set via `HYDRANT_IDENTITY_CACHE_SIZE`.
    pub identity_cache_size: u64,
    /// enable MST inversion validation on incoming commits (expensive).
    /// set via `HYDRANT_VERIFY_MST`.
    pub verify_mst: bool,
    /// clock drift window for future-rev rejection, in seconds.
    /// commits with a rev timestamp more than this many seconds in the future are rejected.
    /// set via `HYDRANT_REV_CLOCK_SKEW`. default: 300 (5 minutes).
    pub rev_clock_skew_secs: i64,

    /// NSID patterns that trigger auto-discovery in filter mode (e.g. `app.bsky.feed.post`).
    /// set via `HYDRANT_FILTER_SIGNALS` as a comma-separated list.
    pub filter_signals: Option<Vec<String>>,
    /// NSID patterns used to filter which record collections are stored.
    /// if `None`, all collections are stored. set via `HYDRANT_FILTER_COLLECTIONS`.
    pub filter_collections: Option<Vec<String>>,
    /// DIDs that are always skipped, regardless of mode.
    /// set via `HYDRANT_FILTER_EXCLUDES` as a comma-separated list.
    pub filter_excludes: Option<Vec<String>>,

    /// enable backlinks indexing (only meaningful in non-ephemeral mode).
    /// set via `HYDRANT_ENABLE_BACKLINKS=true`.
    pub enable_backlinks: bool,

    /// if `true`, record blocks are not stored; only the index (records, counts, events) is kept.
    /// `getRecord`, `listRecords`, and `getRepo` will return errors when this is enabled.
    /// event stream still functions but create/update events will not include record values.
    /// only valid in indexer mode (not relay).
    /// set via `HYDRANT_ONLY_INDEX_LINKS=true`.
    pub only_index_links: bool,

    /// maximum number of new PDS sources that may be added (via seeding or API) in a single
    /// UTC calendar day. `None` means unlimited.
    /// set via `HYDRANT_NEW_HOST_LIMIT`.
    pub new_host_limit: Option<u64>,

    /// how often offline firehose sources are automatically retried.
    /// set via `HYDRANT_OFFLINE_HOST_RETRY_INTERVAL` (humantime duration, e.g. `30min`).
    /// set to `none` to disable automatic retries.
    pub offline_host_retry_interval: Option<Duration>,

    /// base URL(s) of relay or aggregator services to seed firehose PDS sources from at startup.
    ///
    /// hydrant calls `com.atproto.sync.listHosts` on each URL and adds the returned PDSes
    /// as firehose sources (with `is_pds = true`). account counts from the response are
    /// applied to newly-seen hosts to initialise rate-limiting immediately.
    ///
    /// set via `HYDRANT_SEED_HOSTS` as a comma-separated list of base URLs.
    pub seed_hosts: Vec<Url>,
    /// named rate tier definitions for PDS rate limiting.
    ///
    /// built-in tiers ("default" and "trusted") are always present and may be overridden.
    /// set via `HYDRANT_RATE_TIERS` as a comma-separated list of
    /// `name:base/mul/hourly/daily[/account_limit]` entries,
    /// e.g. `trusted:5000/10.0/18000000/432000000,custom:100/1.0/7200000/172800000`.
    ///
    /// the policy itself is built from `HYDRANT_TIER_RULES` and `HYDRANT_RATE_TIERS` at startup.
    pub tier_policy: TierPolicy,

    /// glob rules mapping host patterns to named rate tiers.
    ///
    /// set via `HYDRANT_TIER_RULES` as a comma-separated list of `pattern:tiername` entries,
    /// e.g. `*.bsky.network:trusted,pds.example.com:custom`. rules are evaluated in order;
    /// api-assigned per-host overrides always take priority over these rules.
    pub tier_rules: Vec<(String, String)>,

    /// db internals, tune only if you know what you're doing.
    ///
    /// size of the fjall block cache in MB. set via `HYDRANT_CACHE_SIZE`.
    pub cache_size: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// compression algorithm for data keyspaces (blocks, records, repos, events).
    /// set via `HYDRANT_DATA_COMPRESSION` (`lz4`, `zstd`, or `none`).
    pub data_compression: Compression,
    /// db internals, tune only if you know what you're doing.
    ///
    /// compression algorithm for the fjall journal.
    /// set via `HYDRANT_JOURNAL_COMPRESSION` (`lz4`, `zstd`, or `none`).
    pub journal_compression: Compression,
    /// db internals, tune only if you know what you're doing.
    ///
    /// number of background threads used by the fjall storage engine.
    /// set via `HYDRANT_DB_WORKER_THREADS`.
    pub db_worker_threads: usize,
    /// db internals, tune only if you know what you're doing.
    ///
    /// maximum total size of the fjall journal in MB before a flush is forced.
    /// set via `HYDRANT_DB_MAX_JOURNALING_SIZE_MB`.
    pub db_max_journaling_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the blocks keyspace in MB.
    /// set via `HYDRANT_DB_BLOCKS_MEMTABLE_SIZE_MB`.
    pub db_blocks_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the repos keyspace in MB.
    /// set via `HYDRANT_DB_REPOS_MEMTABLE_SIZE_MB`.
    pub db_repos_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the events keyspace in MB.
    /// set via `HYDRANT_DB_EVENTS_MEMTABLE_SIZE_MB`.
    pub db_events_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the records keyspace in MB.
    /// set via `HYDRANT_DB_RECORDS_MEMTABLE_SIZE_MB`.
    pub db_records_memtable_size_mb: u64,
}

impl Default for Config {
    fn default() -> Self {
        const BASE_MEMTABLE_MB: u64 = 32;
        Self {
            database_path: PathBuf::from("./hydrant.db"),
            #[cfg(feature = "indexer")]
            ephemeral: false,
            #[cfg(feature = "relay")]
            ephemeral: true,
            #[cfg(feature = "indexer")]
            ephemeral_ttl: Duration::from_secs(3600), // 1 hour
            #[cfg(feature = "relay")]
            ephemeral_ttl: Duration::from_secs(3600 * 24 * 3), // 3 days
            #[cfg(not(feature = "relay"))]
            full_network: false,
            #[cfg(feature = "relay")]
            full_network: true,
            #[cfg(not(feature = "relay"))]
            relays: vec![FirehoseSource {
                url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                is_pds: false,
            }],
            #[cfg(feature = "relay")]
            relays: vec![],
            #[cfg(not(feature = "relay"))]
            seed_hosts: vec![],
            #[cfg(feature = "relay")]
            seed_hosts: vec![Url::parse("https://bsky.network").unwrap()],
            plc_urls: vec![Url::parse("https://plc.wtf").unwrap()],
            enable_firehose: true,
            firehose_workers: 8,
            cursor_save_interval: Duration::from_secs(3),
            repo_fetch_timeout: Duration::from_secs(300),
            backfill_concurrency_limit: 16,
            enable_crawler: None,
            crawler_max_pending_repos: 2000,
            crawler_resume_pending_repos: 1000,
            crawler_sources: vec![CrawlerSource {
                url: Url::parse("https://lightrail.microcosm.blue").unwrap(),
                mode: CrawlerMode::ByCollection,
            }],
            verify_signatures: SignatureVerification::Full,
            identity_cache_size: 1_000_000,
            verify_mst: false,
            rev_clock_skew_secs: 300,
            filter_signals: None,
            filter_collections: None,
            filter_excludes: None,
            enable_backlinks: false,
            only_index_links: false,
            new_host_limit: Some(50),
            offline_host_retry_interval: Some(Duration::from_secs(30 * 60)),
            tier_rules: vec![],
            tier_policy: {
                let mut tiers = HashMap::new();
                tiers.insert(SmolStr::new("default"), RateTier::default_tier());
                tiers.insert(SmolStr::new("trusted"), RateTier::trusted());
                TierPolicy {
                    tiers,
                    rules: vec![],
                }
            },
            cache_size: 256,
            data_compression: Compression::Zstd,
            journal_compression: Compression::Lz4,
            db_worker_threads: 4,
            db_max_journaling_size_mb: 400,
            db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
            db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2,
            db_events_memtable_size_mb: BASE_MEMTABLE_MB,
            db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
        }
    }
}
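
// a small sanity sketch over the defaults above; it assumes `TierPolicy::tiers` is the map
// built in `Default`, and only encodes relationships implied by the field docs.
#[cfg(test)]
mod config_default_tests {
    use super::Config;

    #[test]
    fn default_crawler_thresholds_are_ordered() {
        let config = Config::default();
        // the crawler resumes below the level at which it pauses.
        assert!(config.crawler_resume_pending_repos < config.crawler_max_pending_repos);
        // built-in rate tiers are always present.
        assert!(config.tier_policy.tiers.contains_key("default"));
        assert!(config.tier_policy.tiers.contains_key("trusted"));
    }
}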

impl Config {
    /// returns the default config for full network usage.
    pub fn full_network() -> Self {
        const BASE_MEMTABLE_MB: u64 = 192;
        Self {
            full_network: true,
            plc_urls: vec![Url::parse("https://plc.directory").unwrap()],
            firehose_workers: 24,
            backfill_concurrency_limit: 64,
            crawler_sources: vec![CrawlerSource {
                url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                mode: CrawlerMode::ListRepos,
            }],
            db_worker_threads: 8,
            db_max_journaling_size_mb: 1024,
            db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
            db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 4,
            db_events_memtable_size_mb: BASE_MEMTABLE_MB,
            db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
            ..Self::default()
        }
    }

    /// reads and builds the config from environment variables, loading `.env` first if present.
    pub fn from_env() -> Result<Self> {
        load_dotenv();

        // full_network is read first since it determines which defaults to use.
        // relay mode defaults to true so that the network is indexed by default.
        #[cfg(feature = "relay")]
        let default_full_network = true;
        #[cfg(not(feature = "relay"))]
        let default_full_network = false;
        let full_network: bool = cfg!("FULL_NETWORK", default_full_network);
        let defaults = full_network
            .then(Self::full_network)
            .unwrap_or_else(Self::default);

        let relay_hosts = match std::env::var("HYDRANT_RELAY_HOSTS") {
            Ok(hosts) if !hosts.trim().is_empty() => hosts
                .split(',')
                .filter_map(|s| {
                    let s = s.trim();
                    (!s.is_empty())
                        .then(|| {
                            FirehoseSource::parse(s).or_else(|| {
                                tracing::warn!("invalid relay host URL: {s}");
                                None
                            })
                        })
                        .flatten()
                })
                .collect(),
            // HYDRANT_RELAY_HOSTS explicitly set to ""
            Ok(_) => vec![],
            // not set at all, fall back to HYDRANT_RELAY_HOST (single source, same `[pds::]url` format)
            Err(_) => match std::env::var("HYDRANT_RELAY_HOST") {
                Ok(s) if !s.trim().is_empty() => {
                    FirehoseSource::parse(s.trim()).into_iter().collect()
                }
                _ => defaults.relays.clone(),
            },
        };

        let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
            .ok()
            .map(|s| {
                s.split(',')
                    .map(|s| Url::parse(s.trim()))
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| miette::miette!("invalid PLC URL: {e}"))
            })
            .unwrap_or_else(|| Ok(defaults.plc_urls.clone()))?;

        let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", defaults.cursor_save_interval, sec);
        let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", defaults.repo_fetch_timeout, sec);

        let ephemeral: bool = cfg!("EPHEMERAL", defaults.ephemeral);
        let ephemeral_ttl = cfg!("EPHEMERAL_TTL", defaults.ephemeral_ttl, sec);
        let database_path = cfg!("DATABASE_PATH", defaults.database_path);
        let cache_size = cfg!("CACHE_SIZE", defaults.cache_size);
        let data_compression = cfg!("DATA_COMPRESSION", defaults.data_compression);
        let journal_compression = cfg!("JOURNAL_COMPRESSION", defaults.journal_compression);

        let verify_signatures = cfg!("VERIFY_SIGNATURES", defaults.verify_signatures);
        let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", defaults.identity_cache_size);
        let verify_mst: bool = cfg!("VERIFY_MST", defaults.verify_mst);
        let rev_clock_skew_secs: i64 = cfg!("REV_CLOCK_SKEW", defaults.rev_clock_skew_secs);
        let enable_firehose = cfg!("ENABLE_FIREHOSE", defaults.enable_firehose);
        let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER")
            .ok()
            .and_then(|s| s.parse().ok());

        let backfill_concurrency_limit = cfg!(
            "BACKFILL_CONCURRENCY_LIMIT",
            defaults.backfill_concurrency_limit
        );
        let firehose_workers = cfg!("FIREHOSE_WORKERS", defaults.firehose_workers);

        let db_worker_threads = cfg!("DB_WORKER_THREADS", defaults.db_worker_threads);
        let db_max_journaling_size_mb = cfg!(
            "DB_MAX_JOURNALING_SIZE_MB",
            defaults.db_max_journaling_size_mb
        );
        let db_blocks_memtable_size_mb = cfg!(
            "DB_BLOCKS_MEMTABLE_SIZE_MB",
            defaults.db_blocks_memtable_size_mb
        );
        let db_events_memtable_size_mb = cfg!(
            "DB_EVENTS_MEMTABLE_SIZE_MB",
            defaults.db_events_memtable_size_mb
        );
        let db_records_memtable_size_mb = cfg!(
            "DB_RECORDS_MEMTABLE_SIZE_MB",
            defaults.db_records_memtable_size_mb
        );
        let db_repos_memtable_size_mb = cfg!(
            "DB_REPOS_MEMTABLE_SIZE_MB",
            defaults.db_repos_memtable_size_mb
        );

        let crawler_max_pending_repos = cfg!(
            "CRAWLER_MAX_PENDING_REPOS",
            defaults.crawler_max_pending_repos
        );
        let crawler_resume_pending_repos = cfg!(
            "CRAWLER_RESUME_PENDING_REPOS",
            defaults.crawler_resume_pending_repos
        );

        let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks);
        let only_index_links: bool = cfg!("ONLY_INDEX_LINKS", defaults.only_index_links);
        let max_pds_added_per_day: Option<u64> = std::env::var("HYDRANT_NEW_HOST_LIMIT")
            .ok()
            .and_then(|s| s.parse().ok());

        let offline_retry_interval: Option<Duration> =
            match std::env::var("HYDRANT_OFFLINE_HOST_RETRY_INTERVAL")
                .ok()
                .as_deref()
            {
                None => defaults.offline_host_retry_interval,
                Some("none") => None,
                Some(s) => humantime::parse_duration(s)
                    .ok()
                    .or(defaults.offline_host_retry_interval),
            };

        // start with built-in tier definitions, then layer in any env-defined overrides.
        // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,...
        let mut tiers = defaults.tier_policy.tiers.clone();
        if let Ok(s) = std::env::var("HYDRANT_RATE_TIERS") {
            for entry in s.split(',') {
                let entry = entry.trim();
                if let Some((name, spec)) = entry.split_once(':') {
                    match RateTier::parse(spec) {
                        Some(tier) => {
                            tiers.insert(SmolStr::new(name.trim()), tier);
                        }
                        None => tracing::warn!(
                            "ignoring invalid rate tier '{name}': expected base/mul/hourly/daily format"
                        ),
                    }
                }
            }
        }

        let seed_hosts: Vec<Url> = std::env::var("HYDRANT_SEED_HOSTS")
            .ok()
            .map(|s| {
                s.split(',')
                    .filter_map(|u| {
                        let u = u.trim();
                        if u.is_empty() {
                            return None;
                        }
                        Url::parse(u).ok().or_else(|| {
                            tracing::warn!("invalid seed host URL: {u}");
                            None
                        })
                    })
                    .collect()
            })
            .unwrap_or_else(|| defaults.seed_hosts.clone());

        // build ordered glob rules from HYDRANT_TIER_RULES
        let mut rules: Vec<TierRule> = vec![];
        let mut tier_rules: Vec<(String, String)> = vec![];
        if let Ok(s) = std::env::var("HYDRANT_TIER_RULES") {
            for entry in s.split(',') {
                let entry = entry.trim();
                if entry.is_empty() {
                    continue;
                }
                if let Some((pattern_str, tier_name)) = entry.split_once(':') {
                    let pattern_str = pattern_str.trim();
                    let tier_name = tier_name.trim();
                    match glob::Pattern::new(pattern_str) {
                        Ok(pattern) => {
                            rules.push(TierRule {
                                pattern,
                                tier_name: SmolStr::new(tier_name),
                            });
                            tier_rules.push((pattern_str.to_string(), tier_name.to_string()));
                        }
                        Err(e) => tracing::warn!(
                            "ignoring invalid tier rule pattern '{pattern_str}': {e}"
                        ),
                    }
                }
            }
        }

        let tier_policy = TierPolicy { tiers, rules };

        let default_mode = CrawlerMode::default_for(full_network);
        let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") {
            Ok(s) => s
                .split(',')
                .map(|s| s.trim())
                .filter(|s| !s.is_empty())
                .filter_map(|s| CrawlerSource::parse(s, default_mode))
                .collect(),
            Err(_) => match default_mode {
                CrawlerMode::ListRepos => relay_hosts
                    .iter()
                    .map(|source| CrawlerSource {
                        url: source.url.clone(),
                        mode: CrawlerMode::ListRepos,
                    })
                    .collect(),
                CrawlerMode::ByCollection => defaults.crawler_sources.clone(),
            },
        };

        Ok(Self {
            database_path,
            full_network,
            ephemeral,
            seed_hosts,
            ephemeral_ttl,
            relays: relay_hosts,
            plc_urls,
            enable_firehose,
            firehose_workers,
            cursor_save_interval,
            repo_fetch_timeout,
            backfill_concurrency_limit,
            enable_crawler,
            crawler_max_pending_repos,
            crawler_resume_pending_repos,
            crawler_sources,
            verify_signatures,
            identity_cache_size,
            verify_mst,
            rev_clock_skew_secs,
            filter_signals,
            filter_collections,
            filter_excludes,
            enable_backlinks,
            only_index_links,
            new_host_limit: max_pds_added_per_day,
            offline_host_retry_interval: offline_retry_interval,
            tier_policy,
            tier_rules,
            cache_size,
            data_compression,
            journal_compression,
            db_worker_threads,
            db_max_journaling_size_mb,
            db_blocks_memtable_size_mb,
            db_repos_memtable_size_mb,
            db_events_memtable_size_mb,
            db_records_memtable_size_mb,
        })
    }
}
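
// hedged usage sketch (hypothetical caller, not part of this module): a binary would
// typically load the config once at startup and log the resulting settings, e.g.
//
//     let config = Config::from_env()?;
//     println!("{config}");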

macro_rules! config_line {
    ($f:expr, $label:expr, $value:expr) => {
        writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH)
    };
}

impl fmt::Display for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const LABEL_WIDTH: usize = 27;

        writeln!(f, "hydrant configuration:")?;
        config_line!(
            f,
            "relay hosts",
            format_args!(
                "{:?}",
                self.relays
                    .iter()
                    .map(|s| if s.is_pds {
                        format!("pds::{}", s.url)
                    } else {
                        s.url.to_string()
                    })
                    .collect::<Vec<_>>()
            )
        )?;
        config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?;
        config_line!(f, "full network indexing", self.full_network)?;
        config_line!(f, "verify signatures", self.verify_signatures)?;
        config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?;
        config_line!(f, "identity cache size", self.identity_cache_size)?;
        config_line!(
            f,
            "cursor save interval",
            format_args!("{}sec", self.cursor_save_interval.as_secs())
        )?;
        config_line!(
            f,
            "repo fetch timeout",
            format_args!("{}sec", self.repo_fetch_timeout.as_secs())
        )?;
        config_line!(f, "ephemeral", self.ephemeral)?;
        config_line!(f, "database path", self.database_path.to_string_lossy())?;
        config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?;
        config_line!(f, "data compression", self.data_compression)?;
        config_line!(f, "journal compression", self.journal_compression)?;
        config_line!(f, "firehose workers", self.firehose_workers)?;
        config_line!(f, "db worker threads", self.db_worker_threads)?;
        config_line!(
            f,
            "db journal size",
            format_args!("{} mb", self.db_max_journaling_size_mb)
        )?;
        config_line!(
            f,
            "db blocks memtable",
            format_args!("{} mb", self.db_blocks_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db repos memtable",
            format_args!("{} mb", self.db_repos_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db events memtable",
            format_args!("{} mb", self.db_events_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db records memtable",
            format_args!("{} mb", self.db_records_memtable_size_mb)
        )?;
        config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?;
        config_line!(
            f,
            "crawler resume pending",
            self.crawler_resume_pending_repos
        )?;
        if !self.crawler_sources.is_empty() {
            let sources: Vec<_> = self
                .crawler_sources
                .iter()
                .map(|s| format!("{}::{}", s.mode, s.url))
                .collect();
            config_line!(f, "crawler sources", sources.join(", "))?;
        }
        if let Some(signals) = &self.filter_signals {
            config_line!(f, "filter signals", format_args!("{:?}", signals))?;
        }
        if let Some(collections) = &self.filter_collections {
            config_line!(f, "filter collections", format_args!("{:?}", collections))?;
        }
        if let Some(excludes) = &self.filter_excludes {
            config_line!(f, "filter excludes", format_args!("{:?}", excludes))?;
        }
        if self.enable_backlinks {
            config_line!(f, "backlinks", "enabled")?;
        }
        if self.only_index_links {
            config_line!(f, "only index links", "true")?;
        }
        if !self.seed_hosts.is_empty() {
            config_line!(
                f,
                "seed hosts",
                format_args!(
                    "{:?}",
                    self.seed_hosts
                        .iter()
                        .map(|u| u.as_str())
                        .collect::<Vec<_>>()
                )
            )?;
        }
        if let Some(limit) = self.new_host_limit {
            config_line!(f, "max pds/day", limit)?;
        }
        match self.offline_host_retry_interval {
            Some(d) => config_line!(
                f,
                "offline retry interval",
                format_args!("{}sec", d.as_secs())
            )?,
            None => config_line!(f, "offline retry interval", "disabled")?,
        }
        Ok(())
    }
}