···69697070each source maintains its own cursor so restarts resume mid-pass.
71717272+sources can also be added and removed at runtime via the `/crawler/sources` API
7373+(see below). dynamically added sources are persisted to the database and survive
7474+restarts. `CRAWLER_URLS` sources are startup-only: they are not written to the
7575+database and will always reappear after a restart regardless of runtime changes.
7676+7277## configuration
73787479`hydrant` is configured via environment variables. all variables are prefixed
···117122 - returns `{ "crawler": bool, "firehose": bool, "backfill": bool }`.
118123- `PATCH /ingestion`: enable or disable ingestion components at runtime without
119124 restarting.
120120- - body: `{ "crawler"?: bool, "firehose"?: bool, "backfill"?: bool }` — only
121121- provided fields are updated.
125125+ - body: `{ "crawler"?: bool, "firehose"?: bool, "backfill"?: bool }`. only provided fields are updated.
122126 - when disabled, each component finishes its current task before pausing (e.g.
123127 the backfill worker completes any in-flight repo syncs, the firehose
124128 finishes processing the current message). they resume immediately when
125129 re-enabled.
126130131131+#### crawler source management
132132+133133+- `GET /crawler/sources`: list all currently active crawler sources.
134134+ - returns a JSON array of `{ "url": string, "mode": "relay" | "by_collection", "persisted": bool }`.
135135+ - `persisted: true` means the source was added via the API and is stored in the
136136+ database, it will survive a restart. `persisted: false` means the source
137137+ came from `CRAWLER_URLS` and is not written to the database.
138138+- `POST /crawler/sources`: add a crawler source at runtime.
139139+ - body: `{ "url": string, "mode": "relay" | "by_collection" }`.
140140+ - the source is written to the database before the producer task is started, so
141141+ it is safe to add sources and then immediately restart without losing them.
142142+ - if a source with the same URL already exists (whether from `CRAWLER_URLS` or
143143+ a previous `POST`), it is replaced: the running task is stopped and a new one
144144+ is started with the new mode. any cursor state for that URL is preserved.
145145+ - returns `201 Created` on success.
146146+- `DELETE /crawler/sources`: remove a crawler source at runtime.
147147+ - body: `{ "url": string }`.
148148+ - the producer task is stopped immediately.
149149+ - if the source was added via the API (`persisted: true`), it is removed from
150150+ the database and will not reappear on restart. if it came from `CRAWLER_URLS`
151151+ (`persisted: false`), only the running task is stopped, the source will
152152+ reappear on the next restart since `CRAWLER_URLS` is re-applied at startup.
153153+ (unless you remove it manually from your configuration of course).
154154+ - cursor state is not cleared. use `DELETE /cursors` separately if you want
155155+ the source to restart from the beginning when re-added.
156156+ - returns `200 OK` if the source was found and removed, `404 Not Found` otherwise.
157157+127158#### database operations
128159129160- `POST /db/train`: train zstd compression dictionaries for the `repos`,
···159190160191each set field accepts one of two forms:
161192162162-- **replace**: an array replaces the entire set — `["did:plc:abc", "did:web:example.org"]`
163163-- **patch**: an object maps items to `true` (add) or `false` (remove) — `{"did:plc:abc": true, "did:web:example.org": false}`
193193+- **replace**: an array replaces the entire set, eg. `["did:plc:abc", "did:web:example.org"]`
194194+- **patch**: an object maps items to `true` (add) or `false` (remove), eg. `{"did:plc:abc": true, "did:web:example.org": false}`
164195165196#### NSID patterns
166197167198`signals` and `collections` support an optional `.*` suffix to match an entire namespace:
168199169169-- `app.bsky.feed.post` — exact match only
170170-- `app.bsky.feed.*` — matches any collection under `app.bsky.feed`
200200+- `app.bsky.feed.post`: exact match only
201201+- `app.bsky.feed.*`: matches any collection under `app.bsky.feed`
171202172203### repository management
173204
···144144 let state = Arc::new(state);
145145146146 Ok(Self {
147147- crawler: CrawlerHandle(state.clone()),
147147+ crawler: CrawlerHandle {
148148+ state: state.clone(),
149149+ shared: Arc::new(std::sync::OnceLock::new()),
150150+ tasks: Arc::new(scc::HashMap::new()),
151151+ persisted: Arc::new(scc::HashSet::new()),
152152+ },
148153 firehose: FirehoseHandle(state.clone()),
149154 backfill: BackfillHandle(state.clone()),
150155 filter: FilterControl(state.clone()),
···175180 pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
176181 let state = self.state.clone();
177182 let config = self.config.clone();
183183+ let crawler = self.crawler.clone();
178184179185 if self.started.swap(true, Ordering::SeqCst) {
180186 miette::bail!("Hydrant::run() called more than once");
···329335 }
330336 }
331337332332- // 11. spawn crawler components
333333- if !config.crawler_sources.is_empty() {
334334- use crate::config::CrawlerMode;
338338+ // 11. spawn crawler infrastructure (always, to support dynamic source management)
339339+ {
335340 use crate::crawler::throttle::Throttler;
336341 use crate::crawler::{
337337- ByCollectionProducer, CrawlerStats, CrawlerWorker, InFlight, RelayProducer,
338338- RetryProducer, SignalChecker,
342342+ CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker,
339343 };
340340- use std::time::Duration;
341341- use tracing::Instrument;
342344343345 let http = reqwest::Client::builder()
344346 .user_agent(concat!(
···402404 .run(),
403405 );
404406405405- let crawler_rx = state.crawler_enabled.subscribe();
406406- for source in config.crawler_sources.iter().cloned() {
407407- let http = http.clone();
408408- let state = state.clone();
409409- let in_flight = in_flight.clone();
410410- let tx = tx.clone();
411411- let stats = stats.clone();
412412- let enabled = crawler_rx.clone();
413413- match source.mode {
414414- CrawlerMode::Relay => {
415415- info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler");
416416- let span = tracing::info_span!("crawl", url = %source.url);
417417- tokio::spawn(
418418- RelayProducer {
419419- relay_url: source.url,
420420- checker: checker.clone(),
421421- in_flight,
422422- tx,
423423- enabled,
424424- stats,
425425- }
426426- .run()
427427- .instrument(span),
428428- );
429429- }
430430- CrawlerMode::ByCollection => {
431431- info!(
432432- host = source.url.host_str(),
433433- enabled = *state.crawler_enabled.borrow(),
434434- "starting by-collection crawler"
435435- );
436436- let span =
437437- tracing::info_span!("by_collection", host = source.url.host_str());
438438- tokio::spawn(
439439- async move {
440440- loop {
441441- let producer = ByCollectionProducer {
442442- index_url: source.url.clone(),
443443- http: http.clone(),
444444- state: state.clone(),
445445- in_flight: in_flight.clone(),
446446- tx: tx.clone(),
447447- enabled: enabled.clone(),
448448- stats: stats.clone(),
449449- };
450450- if let Err(e) = producer.run().await {
451451- error!(err = ?e, "by-collection crawler fatal error, restarting in 30s");
452452- tokio::time::sleep(Duration::from_secs(30)).await;
453453- }
454454- }
455455- }
456456- .instrument(span),
457457- );
458458- }
407407+ // set shared objects so CrawlerHandle methods can use them
408408+ crawler
409409+ .shared
410410+ .set(CrawlerShared {
411411+ http,
412412+ checker,
413413+ in_flight,
414414+ tx,
415415+ stats,
416416+ })
417417+ .ok()
418418+ .expect("crawler shared already set");
419419+ let shared = crawler.shared.get().unwrap();
420420+421421+ // spawn initial sources from config
422422+ for source in config.crawler_sources.iter() {
423423+ let enabled_rx = state.crawler_enabled.subscribe();
424424+ let handle = spawn_crawler_producer(
425425+ source,
426426+ &shared.http,
427427+ &state,
428428+ &shared.checker,
429429+ &shared.in_flight,
430430+ &shared.tx,
431431+ &shared.stats,
432432+ enabled_rx,
433433+ );
434434+ let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
435435+ }
436436+437437+ // load and spawn any sources persisted in the database
438438+ let db = state.db.clone();
439439+ let persisted_sources =
440440+ tokio::task::spawn_blocking(move || load_persisted_crawler_sources(&db))
441441+ .await
442442+ .into_diagnostic()??;
443443+444444+ for source in &persisted_sources {
445445+ let _ = crawler.persisted.insert_async(source.url.clone()).await;
446446+ if crawler.tasks.contains_async(&source.url).await {
447447+ continue;
459448 }
449449+ let enabled_rx = state.crawler_enabled.subscribe();
450450+ let handle = spawn_crawler_producer(
451451+ source,
452452+ &shared.http,
453453+ &state,
454454+ &shared.checker,
455455+ &shared.in_flight,
456456+ &shared.tx,
457457+ &shared.stats,
458458+ enabled_rx,
459459+ );
460460+ let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
460461 }
461462 }
462463···625626 }
626627}
627628628628-// --- event stream ---
629629-630629/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`].
631630///
632631/// implements [`futures::Stream`] and can be used with `StreamExt::next`,
···642641 }
643642}
644643645645-// --- stats ---
646646-647644/// database statistics returned by [`Hydrant::stats`].
648645#[derive(serde::Serialize)]
649646pub struct StatsResponse {
···653650 pub sizes: BTreeMap<&'static str, u64>,
654651}
655652656656-// --- ingestion handles ---
653653+struct ProducerHandle {
654654+ mode: crate::config::CrawlerMode,
655655+ abort: tokio::task::AbortHandle,
656656+}
657657+658658+impl Drop for ProducerHandle {
659659+ fn drop(&mut self) {
660660+ self.abort.abort();
661661+ }
662662+}
663663+664664+struct CrawlerShared {
665665+ http: reqwest::Client,
666666+ checker: crate::crawler::SignalChecker,
667667+ in_flight: crate::crawler::InFlight,
668668+ tx: mpsc::Sender<crate::crawler::CrawlerBatch>,
669669+ stats: crate::crawler::CrawlerStats,
670670+}
671671+672672+/// a snapshot of a single crawler source's runtime state.
673673+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
674674+pub struct CrawlerSourceInfo {
675675+ pub url: Url,
676676+ pub mode: crate::config::CrawlerMode,
677677+ /// whether this source is persisted in the database (i.e. it was dynamically added
678678+ /// and will survive restarts). config-sourced entries have `persisted: false`.
679679+ pub persisted: bool,
680680+}
681681+682682+fn spawn_crawler_producer(
683683+ source: &crate::config::CrawlerSource,
684684+ http: &reqwest::Client,
685685+ state: &Arc<AppState>,
686686+ checker: &crate::crawler::SignalChecker,
687687+ in_flight: &crate::crawler::InFlight,
688688+ tx: &mpsc::Sender<crate::crawler::CrawlerBatch>,
689689+ stats: &crate::crawler::CrawlerStats,
690690+ enabled: watch::Receiver<bool>,
691691+) -> ProducerHandle {
692692+ use crate::config::CrawlerMode;
693693+ use crate::crawler::{ByCollectionProducer, RelayProducer};
694694+ use std::time::Duration;
695695+ use tracing::Instrument;
696696+697697+ let abort = match source.mode {
698698+ CrawlerMode::Relay => {
699699+ info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler");
700700+ let span = tracing::info_span!("crawl", url = %source.url);
701701+ tokio::spawn(
702702+ RelayProducer {
703703+ relay_url: source.url.clone(),
704704+ checker: checker.clone(),
705705+ in_flight: in_flight.clone(),
706706+ tx: tx.clone(),
707707+ enabled,
708708+ stats: stats.clone(),
709709+ }
710710+ .run()
711711+ .instrument(span),
712712+ )
713713+ .abort_handle()
714714+ }
715715+ CrawlerMode::ByCollection => {
716716+ info!(
717717+ host = source.url.host_str(),
718718+ enabled = *state.crawler_enabled.borrow(),
719719+ "starting by-collection crawler"
720720+ );
721721+ let span = tracing::info_span!("by_collection", host = source.url.host_str());
722722+ let http = http.clone();
723723+ let state = state.clone();
724724+ let in_flight = in_flight.clone();
725725+ let tx = tx.clone();
726726+ let stats = stats.clone();
727727+ let url = source.url.clone();
728728+ tokio::spawn(
729729+ async move {
730730+ loop {
731731+ let producer = ByCollectionProducer {
732732+ index_url: url.clone(),
733733+ http: http.clone(),
734734+ state: state.clone(),
735735+ in_flight: in_flight.clone(),
736736+ tx: tx.clone(),
737737+ enabled: enabled.clone(),
738738+ stats: stats.clone(),
739739+ };
740740+ if let Err(e) = producer.run().await {
741741+ error!(err = ?e, "by-collection crawler fatal error, restarting in 30s");
742742+ tokio::time::sleep(Duration::from_secs(30)).await;
743743+ }
744744+ }
745745+ }
746746+ .instrument(span),
747747+ )
748748+ .abort_handle()
749749+ }
750750+ };
751751+ ProducerHandle {
752752+ mode: source.mode,
753753+ abort,
754754+ }
755755+}
756756+757757+/// load all crawler sources persisted in the database.
758758+fn load_persisted_crawler_sources(db: &crate::db::Db) -> Result<Vec<crate::config::CrawlerSource>> {
759759+ use crate::db::keys::CRAWLER_SOURCE_PREFIX;
760760+761761+ let mut sources = Vec::new();
762762+ for entry in db.crawler.prefix(CRAWLER_SOURCE_PREFIX) {
763763+ let (key, val) = entry.into_inner().into_diagnostic()?;
764764+ let url_bytes = &key[CRAWLER_SOURCE_PREFIX.len()..];
765765+ let url_str = std::str::from_utf8(url_bytes).into_diagnostic()?;
766766+ let url = Url::parse(url_str).into_diagnostic()?;
767767+ let mode: crate::config::CrawlerMode = rmp_serde::from_slice(&val).into_diagnostic()?;
768768+ sources.push(crate::config::CrawlerSource { url, mode });
769769+ }
770770+ Ok(sources)
771771+}
657772658773/// runtime control over the crawler component.
659774///
···665780/// disabling the crawler does not affect in-progress repo checks. each one completes
666781/// its current PDS request before pausing.
667782#[derive(Clone)]
668668-pub struct CrawlerHandle(Arc<AppState>);
783783+pub struct CrawlerHandle {
784784+ state: Arc<AppState>,
785785+ /// set once by [`Hydrant::run`]; `None` means run() has not been called yet.
786786+ shared: Arc<std::sync::OnceLock<CrawlerShared>>,
787787+ /// per-source running tasks, keyed by url.
788788+ tasks: Arc<scc::HashMap<Url, ProducerHandle>>,
789789+ /// set of urls persisted in the database (dynamically added sources).
790790+ persisted: Arc<scc::HashSet<Url>>,
791791+}
669792670793impl CrawlerHandle {
671794 /// enable the crawler (enables all configured producers). no-op if already enabled.
672795 pub fn enable(&self) {
673673- self.0.crawler_enabled.send_replace(true);
796796+ self.state.crawler_enabled.send_replace(true);
674797 }
675798 /// disable the crawler (disables all configured producers).
676799 /// in-progress repo checks finish before the crawler pauses.
677800 pub fn disable(&self) {
678678- self.0.crawler_enabled.send_replace(false);
801801+ self.state.crawler_enabled.send_replace(false);
679802 }
680803 /// returns the current enabled state of the crawler.
681804 pub fn is_enabled(&self) -> bool {
682682- *self.0.crawler_enabled.borrow()
805805+ *self.state.crawler_enabled.borrow()
683806 }
684807685808 /// delete all cursor entries associated with the given URL.
686809 pub async fn reset_cursor(&self, url: &str) -> Result<()> {
687687- let db = self.0.db.clone();
810810+ let db = self.state.db.clone();
688811 let point_keys = [keys::crawler_cursor_key(url)];
689812 let by_collection_prefix = keys::by_collection_cursor_prefix(url);
690813 tokio::task::spawn_blocking(move || {
···693816 batch.remove(&db.cursors, k);
694817 }
695818 for entry in db.cursors.prefix(&by_collection_prefix) {
696696- let (k, _) = entry.into_inner().into_diagnostic()?;
819819+ let k = entry.key().into_diagnostic()?;
697820 batch.remove(&db.cursors, k);
698821 }
699822 batch.commit().into_diagnostic()
···702825 .into_diagnostic()??;
703826 Ok(())
704827 }
828828+829829+ /// return info on all currently active crawler sources.
830830+ ///
831831+ /// returns an empty list if called before [`Hydrant::run`].
832832+ pub async fn list_sources(&self) -> Vec<CrawlerSourceInfo> {
833833+ let mut sources = Vec::new();
834834+ self.tasks
835835+ .iter_async(|url, h| {
836836+ sources.push(CrawlerSourceInfo {
837837+ url: url.clone(),
838838+ mode: h.mode,
839839+ persisted: self.persisted.contains_sync(url),
840840+ });
841841+ true
842842+ })
843843+ .await;
844844+ sources
845845+ }
846846+847847+ /// add a new crawler source at runtime.
848848+ ///
849849+ /// the source is persisted to the database and will be re-spawned on restart.
850850+ /// if a source with the same URL already exists, it is replaced (the old task is
851851+ /// aborted and a new one is started with the new mode).
852852+ ///
853853+ /// returns an error if called before [`Hydrant::run`].
854854+ pub async fn add_source(&self, source: crate::config::CrawlerSource) -> Result<()> {
855855+ let Some(shared) = self.shared.get() else {
856856+ miette::bail!("crawler not yet started: call Hydrant::run() first");
857857+ };
858858+859859+ let db = self.state.db.clone();
860860+ let key = keys::crawler_source_key(source.url.as_str());
861861+ let val = rmp_serde::to_vec(&source.mode).into_diagnostic()?;
862862+ tokio::task::spawn_blocking(move || db.crawler.insert(key, val).into_diagnostic())
863863+ .await
864864+ .into_diagnostic()??;
865865+866866+ let enabled_rx = self.state.crawler_enabled.subscribe();
867867+ let handle = spawn_crawler_producer(
868868+ &source,
869869+ &shared.http,
870870+ &self.state,
871871+ &shared.checker,
872872+ &shared.in_flight,
873873+ &shared.tx,
874874+ &shared.stats,
875875+ enabled_rx,
876876+ );
877877+878878+ let _ = self.persisted.insert_async(source.url.clone()).await;
879879+ match self.tasks.entry_async(source.url).await {
880880+ scc::hash_map::Entry::Vacant(e) => {
881881+ e.insert_entry(handle);
882882+ }
883883+ scc::hash_map::Entry::Occupied(mut e) => {
884884+ *e.get_mut() = handle;
885885+ }
886886+ }
887887+ Ok(())
888888+ }
889889+890890+ /// remove a crawler source at runtime by URL.
891891+ ///
892892+ /// aborts the running producer task and removes the source from the database if it
893893+ /// was dynamically added. config-sourced entries are aborted but not persisted, so
894894+ /// they will reappear on restart.
895895+ ///
896896+ /// returns `true` if a source with the given URL was found and removed.
897897+ /// returns an error if called before [`Hydrant::run`].
898898+ pub async fn remove_source(&self, url: &Url) -> Result<bool> {
899899+ if self.shared.get().is_none() {
900900+ miette::bail!("crawler not yet started: call Hydrant::run() first");
901901+ }
902902+903903+ // dropping the ProducerHandle aborts the task via Drop
904904+ if self.tasks.remove_async(url).await.is_none() {
905905+ return Ok(false);
906906+ }
907907+908908+ // remove from DB if it was a persisted source
909909+ if self.persisted.remove_async(url).await.is_some() {
910910+ let db = self.state.db.clone();
911911+ let key = keys::crawler_source_key(url.as_str());
912912+ tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic())
913913+ .await
914914+ .into_diagnostic()??;
915915+ }
916916+917917+ Ok(true)
918918+ }
705919}
706920707921/// runtime control over the firehose ingestor component.
···709923pub struct FirehoseHandle(Arc<AppState>);
710924711925impl FirehoseHandle {
712712- /// enable the firehose. no-op if already enabled.
926926+ /// enable the firehose, no-op if already enabled.
713927 pub fn enable(&self) {
714928 self.0.firehose_enabled.send_replace(true);
715929 }
716716- /// disable the firehose. the current message finishes processing before the connection closes.
930930+ /// disable the firehose, the current message finishes processing before the connection closes.
717931 pub fn disable(&self) {
718932 self.0.firehose_enabled.send_replace(false);
719933 }
···732946pub struct BackfillHandle(Arc<AppState>);
733947734948impl BackfillHandle {
735735- /// enable the backfill worker. no-op if already enabled.
949949+ /// enable the backfill worker, no-op if already enabled.
736950 pub fn enable(&self) {
737951 self.0.backfill_enabled.send_replace(true);
738952 }
739739- /// disable the backfill worker. in-flight repos complete before pausing.
953953+ /// disable the backfill worker, in-flight repos complete before pausing.
740954 pub fn disable(&self) {
741955 self.0.backfill_enabled.send_replace(false);
742956 }
+1-1
src/crawler/mod.rs
···19192020pub(crate) use by_collection::ByCollectionProducer;
2121pub(crate) use relay::{RelayProducer, RetryProducer, SignalChecker};
2222-pub(crate) use worker::CrawlerWorker;
2222+pub(crate) use worker::{CrawlerBatch, CrawlerWorker};
23232424// -- InFlight ------------------------------------------------------------
2525