Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more — built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use std::sync::Arc;
2
3use miette::{IntoDiagnostic, Result};
4use tokio::sync::{mpsc, watch};
5use tracing::{error, info};
6use url::Url;
7
8use crate::db::keys;
9use crate::state::AppState;
10
/// handle to one spawned crawler producer task.
///
/// dropping (or replacing) a `ProducerHandle` aborts the underlying task via
/// the `Drop` impl below, so task lifetime is tied to handle lifetime.
pub(super) struct ProducerHandle {
    // crawl strategy this producer runs; surfaced by `CrawlerHandle::list_sources`.
    mode: crate::config::CrawlerMode,
    // abort handle for the spawned tokio task.
    abort: tokio::task::AbortHandle,
}
15
impl Drop for ProducerHandle {
    fn drop(&mut self) {
        // cancel the producer task when its handle goes away — this is what
        // makes "remove from the tasks map" equivalent to "stop the producer".
        self.abort.abort();
    }
}
21
/// dependencies shared by all crawler producers.
///
/// stored in a `OnceLock` on [`CrawlerHandle`] and set once by `Hydrant::run`;
/// cloned pieces of this are handed to each producer spawned afterwards.
pub(super) struct CrawlerShared {
    /// HTTP client reused across producers.
    pub(super) http: reqwest::Client,
    /// signal-collection checker handed to list-repos producers.
    pub(super) checker: crate::crawler::SignalChecker,
    /// tracker for repo checks currently in flight.
    pub(super) in_flight: crate::crawler::InFlight,
    /// channel on which producers enqueue discovered batches.
    pub(super) tx: mpsc::Sender<crate::crawler::CrawlerBatch>,
    /// shared crawl counters/metrics.
    pub(super) stats: crate::crawler::CrawlerStats,
}
29
/// a snapshot of a single crawler source's runtime state.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CrawlerSourceInfo {
    /// the source's relay / index URL (also the key it is tracked under).
    pub url: Url,
    /// the crawl strategy configured for this source.
    pub mode: crate::config::CrawlerMode,
}
36
37pub(super) fn spawn_crawler_producer(
38 source: &crate::config::CrawlerSource,
39 http: &reqwest::Client,
40 state: &Arc<AppState>,
41 checker: &crate::crawler::SignalChecker,
42 in_flight: &crate::crawler::InFlight,
43 tx: &mpsc::Sender<crate::crawler::CrawlerBatch>,
44 stats: &crate::crawler::CrawlerStats,
45 enabled: watch::Receiver<bool>,
46) -> ProducerHandle {
47 use crate::config::CrawlerMode;
48 use crate::crawler::{ByCollectionProducer, ListReposProducer};
49 use std::time::Duration;
50 use tracing::Instrument;
51
52 let abort = match source.mode {
53 CrawlerMode::ListRepos => {
54 info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler");
55 let span = tracing::info_span!("crawl", url = %source.url);
56 tokio::spawn(
57 ListReposProducer {
58 url: source.url.clone(),
59 checker: checker.clone(),
60 in_flight: in_flight.clone(),
61 tx: tx.clone(),
62 enabled,
63 stats: stats.clone(),
64 }
65 .run()
66 .instrument(span),
67 )
68 .abort_handle()
69 }
70 CrawlerMode::ByCollection => {
71 info!(
72 host = source.url.host_str(),
73 enabled = *state.crawler_enabled.borrow(),
74 "starting by-collection crawler"
75 );
76 let span = tracing::info_span!("by_collection", host = source.url.host_str());
77 let http = http.clone();
78 let state = state.clone();
79 let in_flight = in_flight.clone();
80 let tx = tx.clone();
81 let stats = stats.clone();
82 let url = source.url.clone();
83 tokio::spawn(
84 async move {
85 loop {
86 let producer = ByCollectionProducer {
87 index_url: url.clone(),
88 http: http.clone(),
89 state: state.clone(),
90 in_flight: in_flight.clone(),
91 tx: tx.clone(),
92 enabled: enabled.clone(),
93 stats: stats.clone(),
94 };
95 if let Err(e) = producer.run().await {
96 error!(err = ?e, "by-collection crawler fatal error, restarting in 30s");
97 tokio::time::sleep(Duration::from_secs(30)).await;
98 }
99 }
100 }
101 .instrument(span),
102 )
103 .abort_handle()
104 }
105 };
106 ProducerHandle {
107 mode: source.mode,
108 abort,
109 }
110}
111
/// runtime control over the crawler component.
///
/// the crawler walks `com.atproto.sync.listRepos` on each configured relay to discover
/// repositories that have never emitted a firehose event. in `filter` mode it also
/// checks each discovered repo against the configured signal collections before
/// enqueuing it for backfill.
///
/// disabling the crawler does not affect in-progress repo checks. each one completes
/// its current PDS request before pausing.
///
/// cheap to clone: every field is an `Arc`, so clones share the same state.
#[derive(Clone)]
pub struct CrawlerHandle {
    /// app-wide state; holds the `crawler_enabled` watch channel and the database.
    pub(super) state: Arc<AppState>,
    /// set once by [`Hydrant::run`]; `None` means run() has not been called yet.
    pub(super) shared: Arc<std::sync::OnceLock<CrawlerShared>>,
    /// per-source running tasks, keyed by url. dropping an entry aborts its task.
    pub(super) tasks: Arc<scc::HashMap<Url, ProducerHandle>>,
    /// set of urls persisted in the database (dynamically added sources).
    pub(super) persisted: Arc<scc::HashSet<Url>>,
}
131
impl CrawlerHandle {
    /// enable the crawler (enables all configured producers). no-op if already enabled.
    pub fn enable(&self) {
        // producers observe this via their `watch::Receiver<bool>`.
        self.state.crawler_enabled.send_replace(true);
    }
    /// disable the crawler (disables all configured producers).
    /// in-progress repo checks finish before the crawler pauses.
    pub fn disable(&self) {
        self.state.crawler_enabled.send_replace(false);
    }
    /// returns the current enabled state of the crawler.
    pub fn is_enabled(&self) -> bool {
        *self.state.crawler_enabled.borrow()
    }

    /// delete all cursor entries associated with the given URL.
    ///
    /// removes both the point cursor (list-repos style) and every entry under the
    /// by-collection cursor prefix, in one write batch.
    pub async fn reset_cursor(&self, url: &str) -> Result<()> {
        let state = self.state.clone();
        let point_keys = [keys::crawler_cursor_key(url)];
        let by_collection_prefix = keys::by_collection_cursor_prefix(url);
        // database work is blocking, so keep it off the async runtime threads.
        tokio::task::spawn_blocking(move || {
            let mut batch = state.db.inner.batch();
            for k in point_keys {
                batch.remove(&state.db.cursors, k);
            }
            // sweep every per-collection cursor stored under this url's prefix.
            for entry in state.db.cursors.prefix(&by_collection_prefix) {
                let k = entry.key().into_diagnostic()?;
                batch.remove(&state.db.cursors, k);
            }
            batch.commit().into_diagnostic()?;
            state.db.persist()
        })
        .await
        .into_diagnostic()??; // outer `?`: join error; inner `?`: db error
        Ok(())
    }

    /// return info on all currently active crawler sources.
    ///
    /// returns an empty list if called before [`Hydrant::run`].
    pub async fn list_sources(&self) -> Vec<CrawlerSourceInfo> {
        let mut sources = Vec::new();
        self.tasks
            .iter_async(|url, h| {
                sources.push(CrawlerSourceInfo {
                    url: url.clone(),
                    mode: h.mode,
                });
                true // keep iterating over every entry
            })
            .await;
        sources
    }

    /// add a new crawler source at runtime.
    ///
    /// the source is persisted to the database and will be re-spawned on restart.
    /// if a source with the same URL already exists, it is replaced (the old task is
    /// aborted and a new one is started with the new mode).
    ///
    /// returns an error if called before [`Hydrant::run`].
    pub async fn add_source(&self, source: crate::config::CrawlerSource) -> Result<()> {
        let Some(shared) = self.shared.get() else {
            miette::bail!("crawler not yet started: call Hydrant::run() first");
        };

        // persist first, so the source survives a restart even if anything
        // after this point fails.
        let state = self.state.clone();
        let key = keys::crawler_source_key(source.url.as_str());
        let val = rmp_serde::to_vec(&source.mode).into_diagnostic()?;
        tokio::task::spawn_blocking(move || {
            state.db.crawler.insert(key, val).into_diagnostic()?;
            state.db.persist()
        })
        .await
        .into_diagnostic()??;

        let enabled_rx = self.state.crawler_enabled.subscribe();
        let handle = spawn_crawler_producer(
            &source,
            &shared.http,
            &self.state,
            &shared.checker,
            &shared.in_flight,
            &shared.tx,
            &shared.stats,
            enabled_rx,
        );

        // mark as dynamically added so remove_source also deletes the db entry.
        let _ = self.persisted.insert_async(source.url.clone()).await;
        match self.tasks.entry_async(source.url).await {
            scc::hash_map::Entry::Vacant(e) => {
                e.insert_entry(handle);
            }
            scc::hash_map::Entry::Occupied(mut e) => {
                // overwriting drops the old ProducerHandle, which aborts its task.
                *e.get_mut() = handle;
            }
        }
        Ok(())
    }

    /// remove a crawler source at runtime by URL.
    ///
    /// aborts the running producer task and removes the source from the database if it
    /// was dynamically added. config-sourced entries are aborted but not persisted, so
    /// they will reappear on restart.
    ///
    /// returns `true` if a source with the given URL was found and removed.
    /// returns an error if called before [`Hydrant::run`].
    pub async fn remove_source(&self, url: &Url) -> Result<bool> {
        if self.shared.get().is_none() {
            miette::bail!("crawler not yet started: call Hydrant::run() first");
        }

        // dropping the ProducerHandle aborts the task via Drop
        if self.tasks.remove_async(url).await.is_none() {
            return Ok(false);
        }

        // remove from DB if it was a persisted source
        if self.persisted.remove_async(url).await.is_some() {
            let state = self.state.clone();
            let key = keys::crawler_source_key(url.as_str());
            tokio::task::spawn_blocking(move || {
                state.db.crawler.remove(key).into_diagnostic()?;
                state.db.persist()
            })
            .await
            .into_diagnostic()??;
        }

        Ok(true)
    }
}