···
 #### table-of-contents

 -> [hydrant](#hydrant)</br>
--> [vs tap](#vs-tap) | [stream](#stream-behavior) | [multi-relay](#multiple-relay-support) | [crawler sources](#crawler-sources)</br>
+-> [vs tap](#vs-tap) | [stream](#stream-behavior) | [multi-relay](#multiple-relay-support) | [seeding](#firehose-seeding) | [crawler sources](#crawler-sources)</br>
 -> [configuration](#configuration) | [build features](#build-features)</br>
 -> [rest api](#rest-api) | [filter](#filter-management) | [ingestion](#ingestion-control) | [crawler](#crawler-management) | [firehose](#firehose-management) | [pds](#pds-management) | [repos](#repository-management)</br>
 -> [xrpc api](#data-access-xrpc) | [backlinks](#bluemicrocosmlinks) | [identity](#bluemicrocosmidentity) | [atproto](#comatproto) | [custom](#systemsgazehydrant)
···
 since they forward commits from many PDSes by design. this means you will trust
 the relay on this though.

+#### firehose seeding
+
+<small>[<- back to toc](#table-of-contents)</small>
+
+in relay mode, `RELAY_HOSTS` defaults to empty. set `SEED_HOSTS` to one or more
+relay base URLs and hydrant will call `com.atproto.sync.listHosts` on each at
+startup, adding every returned PDS as a firehose source:
+
+```
+HYDRANT_SEED_HOSTS=https://bsky.network
+```
+
+seeding runs as a background task so the main firehose loop is not blocked. seed
+URLs are fetched concurrently (up to four at a time) and the full `listHosts`
+pagination is consumed for each. if a request fails partway through, the hosts
+collected so far are still added and the failure is logged.
+
+each discovered host is added as a persistent PDS firehose source (`is_pds: true`),
+equivalent to calling `POST /firehose/sources`.
+
+banned hosts (`status: "banned"`) are skipped. all other statuses are included
+since the firehose ingestor retries on disconnect and transiently unavailable
+hosts will reconnect on their own.
+
+seeding resumes from the last saved cursor on restart, so PDSes added to the upstream relay
+since the last start are picked up automatically (if they have not already arrived through the firehose).
+sources that are already running are detected and skipped, so re-seeding is idempotent.
+
 ### crawler sources

 <small>[<- back to toc](#table-of-contents)</small>
···
 | :--- | :--- | :--- |
 | `DATABASE_PATH` | `./hydrant.db` | path to the database folder. |
 | `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). [`tracing` env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html). |
-| `RELAY_HOST` | `wss://relay.fire.hose.cam/` | URL of the relay (firehose only). |
-| `RELAY_HOSTS` | | comma-separated list of firehose sources (firehose only). if unset, falls back to `RELAY_HOST`. prefix a URL with `pds::` to mark it as a direct PDS connection (e.g. `pds::wss://pds.example.com`). bare URLs are treated as relays. |
+| `RELAY_HOST` | `wss://relay.fire.hose.cam/` (indexer), empty (relay) | URL of a firehose source. |
+| `RELAY_HOSTS` | | comma-separated list of firehose sources. if unset, falls back to `RELAY_HOST`. prefix a URL with `pds::` to mark it as a direct PDS connection (e.g. `pds::wss://pds.example.com`). bare URLs are treated as relays. defaults to empty in relay mode, where PDSes are expected to be seeded via `SEED_HOSTS` or the firehose management API. |
+| `SEED_HOSTS` | `https://bsky.network` (relay) | comma-separated list of base URLs to call `com.atproto.sync.listHosts` on at startup. hydrant adds every non-banned host as a PDS firehose source. see [firehose seeding](#firehose-seeding). |
 | `CRAWLER_URLS` | relay hosts in full-network mode, `https://lightrail.microcosm.blue` in filter mode | comma-separated list of `[mode::]url` crawler sources. mode is `relay` or `by_collection`; bare URLs use the default mode. set to empty string to disable crawling. |
 | `PLC_URL` | `https://plc.wtf`, `https://plc.directory` if full network | base URL(s) of the PLC directory (comma-separated for multiple). |
 | `EPHEMERAL` | `false` | if enabled, no records are stored. events are deleted after a certain duration (`EPHEMERAL_TTL`). |
 | `EPHEMERAL_TTL` | `60min`, `3d` in relay mode | decides after how long events should be deleted. |
-| `FULL_NETWORK` | `false` | if `true`, discovers and indexes all repositories in the network. |
+| `FULL_NETWORK` | `false` (indexer), `true` (relay) | if `true`, discovers and indexes all repositories in the network. |
 | `FILTER_SIGNALS` | | comma-separated list of NSID patterns to use for the filter (e.g. `app.bsky.feed.post,app.bsky.graph.*`). |
 | `FILTER_COLLECTIONS` | | comma-separated list of NSID patterns to use for the collections filter. |
 | `FILTER_EXCLUDES` | | comma-separated list of DIDs to exclude from indexing. |
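
reviewer sketch: the `RELAY_HOSTS` row describes a small grammar (comma-separated entries, optional `pds::` prefix, bare URLs treated as relays). the snippet below is one way to read that rule, not hydrant's actual parser. `parse_relay_hosts` is a hypothetical helper, and the `FirehoseSource` here is a simplified stand-in that keeps the URL as a plain string instead of a parsed `Url`.

```rust
/// simplified stand-in for hydrant's `FirehoseSource` (assumption: the real
/// struct stores a parsed `Url`, this one keeps a plain string).
#[derive(Debug, PartialEq)]
struct FirehoseSource {
    url: String,
    is_pds: bool,
}

/// hypothetical helper: splits a `RELAY_HOSTS`-style value on commas,
/// drops empty entries, and treats a `pds::` prefix as "direct PDS".
fn parse_relay_hosts(raw: &str) -> Vec<FirehoseSource> {
    raw.split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(|entry| match entry.strip_prefix("pds::") {
            // prefixed entries are direct PDS connections
            Some(url) => FirehoseSource {
                url: url.to_owned(),
                is_pds: true,
            },
            // bare URLs are treated as relays
            None => FirehoseSource {
                url: entry.to_owned(),
                is_pds: false,
            },
        })
        .collect()
}
```

with this reading, `wss://relay.example/,pds::wss://pds.example.com` yields one relay source and one PDS source, and an empty value yields no sources at all.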
+57-2
src/config.rs
···
     /// set via `HYDRANT_ENABLE_BACKLINKS=true`.
     pub enable_backlinks: bool,

+    /// base URL(s) of relay or aggregator services to seed firehose PDS sources from at startup.
+    ///
+    /// hydrant calls `com.atproto.sync.listHosts` on each URL and adds the returned PDSes
+    /// as firehose sources (with `is_pds = true`). account counts from the response are
+    /// applied to newly-seen hosts to initialise rate-limiting immediately.
+    ///
+    /// set via `HYDRANT_SEED_HOSTS` as a comma-separated list of base URLs.
+    pub seed_hosts: Vec<Url>,
     /// list of trusted PDS/relay hosts to pre-assign to the "trusted" rate tier at startup.
     /// set via `HYDRANT_TRUSTED_HOSTS` as a comma-separated list of hostnames.
     /// hosts not present in this list use the "default" tier unless assigned via the API.
···
         const BASE_MEMTABLE_MB: u64 = 32;
         Self {
             database_path: PathBuf::from("./hydrant.db"),
-            full_network: false,
             ephemeral: false,
             #[cfg(feature = "indexer")]
             ephemeral_ttl: Duration::from_secs(3600), // 1 hour
             #[cfg(feature = "relay")]
             ephemeral_ttl: Duration::from_secs(3600 * 24 * 3), // 3 days
+            #[cfg(not(feature = "relay"))]
+            full_network: false,
+            #[cfg(feature = "relay")]
+            full_network: true,
+            #[cfg(not(feature = "relay"))]
             relays: vec![FirehoseSource {
                 url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                 is_pds: false,
             }],
+            #[cfg(feature = "relay")]
+            relays: vec![],
+            #[cfg(not(feature = "relay"))]
+            seed_hosts: vec![],
+            #[cfg(feature = "relay")]
+            seed_hosts: vec![Url::parse("https://bsky.network").unwrap()],
             plc_urls: vec![Url::parse("https://plc.wtf").unwrap()],
             enable_firehose: true,
             firehose_workers: 8,
···
         load_dotenv();

         // full_network is read first since it determines which defaults to use.
-        let full_network: bool = cfg!("FULL_NETWORK", false);
+        // relay mode defaults to true so that the network is indexed by default.
+        #[cfg(feature = "relay")]
+        let default_full_network = true;
+        #[cfg(not(feature = "relay"))]
+        let default_full_network = false;
+        let full_network: bool = cfg!("FULL_NETWORK", default_full_network);
         let defaults = full_network
             .then(Self::full_network)
             .unwrap_or_else(Self::default);
···
             }
         }

+        let seed_hosts: Vec<Url> = std::env::var("HYDRANT_SEED_HOSTS")
+            .ok()
+            .map(|s| {
+                s.split(',')
+                    .filter_map(|u| {
+                        let u = u.trim();
+                        if u.is_empty() {
+                            return None;
+                        }
+                        Url::parse(u).ok().or_else(|| {
+                            tracing::warn!("invalid seed host URL: {u}");
+                            None
+                        })
+                    })
+                    .collect()
+            })
+            .unwrap_or_else(|| defaults.seed_hosts.clone());
+
         let trusted_hosts = std::env::var("HYDRANT_TRUSTED_HOSTS")
             .ok()
             .map(|s| {
···
             database_path,
             full_network,
             ephemeral,
+            seed_hosts,
             ephemeral_ttl,
             relays: relay_hosts,
             plc_urls,
···
         }
         if self.enable_backlinks {
             config_line!(f, "backlinks", "enabled")?;
+        }
+        if !self.seed_hosts.is_empty() {
+            config_line!(
+                f,
+                "seed hosts",
+                format_args!(
+                    "{:?}",
+                    self.seed_hosts
+                        .iter()
+                        .map(|u| u.as_str())
+                        .collect::<Vec<_>>()
+                )
+            )?;
         }
         Ok(())
     }
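
reviewer sketch: as the diff reads, `HYDRANT_SEED_HOSTS` falls back to the compiled-in defaults only when the variable is unset. a value that is set but empty (or contains only invalid entries) produces an empty list, which would switch seeding off even in relay mode. the std-only model below illustrates that behaviour under stated assumptions: `resolve_seed_hosts` and `looks_like_base_url` are hypothetical names, and the prefix check is a crude stand-in for `Url::parse`.

```rust
/// hypothetical stand-in for `Url::parse` (assumption: only http(s) base
/// URLs count as valid here; the real code accepts anything `url` parses).
fn looks_like_base_url(s: &str) -> bool {
    s.starts_with("https://") || s.starts_with("http://")
}

/// models the fallback logic: `None` means the env var is unset,
/// `Some(raw)` means it is set (possibly to an empty string).
fn resolve_seed_hosts(env_value: Option<&str>, defaults: &[&str]) -> Vec<String> {
    match env_value {
        // unset: use the defaults for the active build mode
        None => defaults.iter().map(|d| d.to_string()).collect(),
        // set: keep only non-empty, valid entries; defaults are ignored
        Some(raw) => raw
            .split(',')
            .map(str::trim)
            .filter(|u| !u.is_empty())
            .filter(|u| looks_like_base_url(u))
            .map(str::to_owned)
            .collect(),
    }
}
```

under this model, `resolve_seed_hosts(Some(""), &["https://bsky.network"])` returns an empty list, so `HYDRANT_SEED_HOSTS=` would act as an opt-out for relay builds.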
+11
src/control/mod.rs
···
 pub(crate) mod firehose;
 pub(crate) mod pds;
 pub(crate) mod repos;
+mod seed;
 pub(crate) mod stream;

 pub use crawler::{CrawlerHandle, CrawlerSourceInfo};
···
                 .tasks
                 .insert_async(source.url.clone(), handle)
                 .await;
+        }
+
+        // 10c. seed firehose PDS sources from listHosts on configured seed URLs
+        if !config.seed_hosts.is_empty() {
+            let seed_urls = config.seed_hosts.clone();
+            let firehose = firehose.clone();
+            let state = state.clone();
+            tokio::spawn(async move {
+                seed::seed_from_list_hosts(&seed_urls, &firehose, &state).await;
+            });
         }

         // 11. spawn crawler infrastructure
+200
src/control/seed.rs
···
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::StreamExt;
+use jacquard_api::com_atproto::sync::HostStatus;
+use jacquard_api::com_atproto::sync::list_hosts::ListHostsOutput;
+use miette::IntoDiagnostic;
+use tracing::{info, warn};
+use url::Url;
+
+use super::firehose::FirehoseHandle;
+use crate::db::{self, keys};
+use crate::state::AppState;
+
+const MAX_CONCURRENT_SEEDS: usize = 4;
+
+/// seed firehose pds sources by calling `com.atproto.sync.listHosts` on each seed URL.
+/// banned pds' are not added, everything else is (including offline)
+pub(crate) async fn seed_from_list_hosts(
+    seed_urls: &[Url],
+    firehose: &FirehoseHandle,
+    state: &Arc<AppState>,
+) {
+    info!("will seed urls...");
+
+    let http = reqwest::Client::builder()
+        .user_agent(concat!(
+            env!("CARGO_PKG_NAME"),
+            "/",
+            env!("CARGO_PKG_VERSION")
+        ))
+        .timeout(Duration::from_secs(10))
+        .build()
+        .expect("that reqwest will build");
+
+    let mut futs = futures::stream::iter(seed_urls.iter().cloned())
+        .map(|seed_url| {
+            let firehose = firehose.clone();
+            let state = state.clone();
+            let http = http.clone();
+            async move { seed_one(&seed_url, &firehose, &state, &http).await }
+        })
+        .buffer_unordered(MAX_CONCURRENT_SEEDS);
+
+    while let Some(_) = futs.next().await {}
+}
+
+#[tracing::instrument(skip_all, fields(seed_url = %seed_url))]
+async fn seed_one(
+    seed_url: &Url,
+    firehose: &FirehoseHandle,
+    state: &Arc<AppState>,
+    http: &reqwest::Client,
+) {
+    let cursor_key = keys::seed_cursor_key(seed_url.as_str());
+
+    // resume from the last saved cursor so we don't re-page through already-seen hosts
+    let mut cursor: Option<String> = {
+        let ks = state.db.cursors.clone();
+        let key = cursor_key.clone();
+        match db::Db::get(ks, key).await {
+            Ok(Some(b)) => rmp_serde::from_slice::<String>(&b).ok(),
+            Ok(None) => None,
+            Err(e) => {
+                warn!(err = %e, "failed to load seed cursor, starting from scratch");
+                None
+            }
+        }
+    };
+
+    if cursor.is_some() {
+        info!(cursor = ?cursor, "resuming seed from saved cursor");
+    } else {
+        info!("seeding firehose sources from listHosts");
+    }
+
+    let mut total = 0usize;
+    let mut added = 0usize;
+
+    loop {
+        let url = list_hosts_url(seed_url, cursor.as_deref());
+        let resp = match http.get(url).send().await {
+            Ok(r) => r,
+            Err(e) => {
+                warn!(err = %e, "failed to fetch listHosts, stopping");
+                break;
+            }
+        };
+
+        if !resp.status().is_success() {
+            warn!(status = %resp.status(), "listHosts returned error status, stopping");
+            break;
+        }
+
+        let bytes = match resp.bytes().await {
+            Ok(b) => b,
+            Err(e) => {
+                warn!(err = %e, "failed to read listHosts response, stopping");
+                break;
+            }
+        };
+
+        let body: ListHostsOutput<'_> = match serde_json::from_slice(&bytes) {
+            Ok(b) => b,
+            Err(e) => {
+                warn!(err = %e, "failed to parse listHosts response, stopping");
+                break;
+            }
+        };
+
+        let next_cursor = body.cursor.as_deref().map(str::to_owned);
+        total += body.hosts.len();
+
+        for host in &body.hosts {
+            // skip banned hosts; everything else (active, idle, offline, throttled) is included
+            // since the firehose ingestor handles reconnection for transiently-unavailable hosts
+            if matches!(host.status, Some(HostStatus::Banned)) {
+                continue;
+            }
+
+            let wss_url_str = format!("wss://{}/", host.hostname);
+            let wss_url = match Url::parse(&wss_url_str) {
+                Ok(u) => u,
+                Err(e) => {
+                    warn!(hostname = %host.hostname, err = %e, "invalid hostname in listHosts response, skipping");
+                    continue;
+                }
+            };
+
+            // skip sources that are already running
+            if firehose.tasks.contains_async(&wss_url).await {
+                continue;
+            }
+
+            // initialise account count for hosts we haven't seen before
+            if let Some(count) = host.account_count.filter(|&c| c > 0) {
+                let count_key = keys::pds_account_count_key(host.hostname.as_ref());
+                let current = state.db.get_count(&count_key).await;
+                if current == 0 {
+                    state.db.update_count_async(&count_key, count).await;
+                }
+            }
+
+            match firehose.add_source(wss_url, true).await {
+                Ok(()) => added += 1,
+                Err(e) => {
+                    warn!(hostname = %host.hostname, err = %e, "failed to add firehose source");
+                }
+            }
+        }
+
+        cursor = next_cursor;
+
+        // persist cursor after each page so a restart can resume where we left off
+        if let Some(ref c) = cursor {
+            let value = match rmp_serde::to_vec(c) {
+                Ok(v) => v,
+                Err(e) => {
+                    warn!(err = %e, "failed to serialize seed cursor");
+                    continue;
+                }
+            };
+            let state = state.clone();
+            let key: Vec<u8> = cursor_key.clone();
+            let result = tokio::task::spawn_blocking(move || -> miette::Result<()> {
+                let mut batch = state.db.inner.batch();
+                batch.insert(&state.db.cursors, key, &value);
+                batch.commit().into_diagnostic()
+            })
+            .await
+            .into_diagnostic()
+            .flatten();
+            if let Err(e) = result {
+                warn!(err = %e, "failed to persist seed cursor");
+            }
+        }
+
+        if cursor.is_none() {
+            break;
+        }
+    }
+
+    info!(
+        total,
+        added, "finished seeding firehose sources from listHosts"
+    );
+}
+
+fn list_hosts_url(base: &Url, cursor: Option<&str>) -> Url {
+    let mut url = base.clone();
+    url.set_path("/xrpc/com.atproto.sync.listHosts");
+    {
+        let mut pairs = url.query_pairs_mut();
+        pairs.append_pair("limit", "1000");
+        if let Some(c) = cursor {
+            pairs.append_pair("cursor", c);
+        }
+    }
+    url
+}
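
reviewer sketch: the control flow of `seed_one` (page through `listHosts`, skip banned hosts, keep whatever was collected when a request fails, remember the last cursor) can be modelled without any network or database access. the snippet below is a simplified, synchronous model and not the PR's code: `Page`, `Host`, and `collect_sources` are hypothetical names, the `fetch` closure stands in for the HTTP call, and the returned cursor stands in for the value the real code persists after each page.

```rust
/// simplified, hypothetical shape of one host entry in a listHosts page.
struct Host {
    hostname: String,
    banned: bool,
}

/// simplified, hypothetical shape of one listHosts page.
struct Page {
    hosts: Vec<Host>,
    cursor: Option<String>,
}

/// pages through `fetch` starting at `start`, returning the firehose URLs
/// collected so far and the last cursor seen (the resume point).
fn collect_sources<F>(mut fetch: F, start: Option<String>) -> (Vec<String>, Option<String>)
where
    F: FnMut(Option<&str>) -> Result<Page, String>,
{
    let mut cursor = start;
    let mut sources = Vec::new();
    loop {
        let page = match fetch(cursor.as_deref()) {
            Ok(page) => page,
            // a failed request stops paging but keeps earlier results
            Err(_) => break,
        };
        // banned hosts are skipped; every other host becomes a wss source
        for host in page.hosts.iter().filter(|h| !h.banned) {
            sources.push(format!("wss://{}/", host.hostname));
        }
        match page.cursor {
            // more pages: remember the cursor and keep going
            Some(next) => cursor = Some(next),
            // last page: the previously saved cursor stays the resume point
            None => break,
        }
    }
    (sources, cursor)
}
```

the model leaves out the "already running" check and the account-count initialisation, which depend on `FirehoseHandle` and the database.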