very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Duration;
4
5use miette::{IntoDiagnostic, Result};
6use rand::RngExt;
7use tokio_util::sync::CancellationToken;
8use tracing::{error, info};
9use url::Url;
10
11use crate::config::FirehoseSource;
12use crate::db::{self, keys};
13use crate::ingest::{BufferTx, firehose::FirehoseIngestor};
14use crate::state::AppState;
15
/// owner-side handle for one spawned firehose ingestor task.
///
/// dropping the handle cancels the associated task (see the [`Drop`] impl),
/// so replacing an entry in the task map stops the old ingestor.
pub(super) struct FirehoseIngestorHandle {
    /// id assigned at spawn time; a task only removes its own map entry when
    /// this id still matches (an upsert may have replaced it with a newer task).
    id: usize,
    /// cancellation token shared with the spawned task.
    cancel: CancellationToken,
    /// true when this source is a direct PDS connection.
    pub(super) is_pds: bool,
}
21
impl Drop for FirehoseIngestorHandle {
    fn drop(&mut self) {
        // cancel the associated ingestor task whenever the handle goes away,
        // e.g. when it is removed from or replaced in the task map.
        self.cancel.cancel();
    }
}
27
/// dependencies shared by every spawned firehose ingestor; stored once in
/// the `OnceLock` on [`FirehoseHandle`] and handed to each spawn call.
pub(super) struct FirehoseShared {
    /// sender into the shared ingest buffer; cloned per ingestor.
    pub(super) buffer_tx: BufferTx,
    /// signature-verification flag forwarded to each [`FirehoseIngestor`].
    pub(super) verify_signatures: bool,
}
32
/// a snapshot of a single firehose relay's runtime state.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FirehoseSourceInfo {
    /// url of the firehose source (relay or PDS).
    pub url: Url,
    /// true when this is a direct PDS connection; enables host authority enforcement.
    pub is_pds: bool,
}
40
/// runtime control over the firehose ingestor component.
///
/// cheap to clone: every field is behind an [`Arc`], so all clones observe
/// and control the same set of running ingestor tasks.
#[derive(Clone)]
pub struct FirehoseHandle {
    pub(super) state: Arc<AppState>,
    /// set once by [`Hydrant::run`]; `None` means run() has not been called yet.
    pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>,
    /// per-relay running tasks, keyed by url.
    pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>,
    /// known source urls → is_pds flag; includes API-added (db-persisted) and static config sources.
    pub(super) known_sources: Arc<scc::HashMap<Url, bool>>,
    /// monotonically increasing ids assigned to spawned tasks; lets a task
    /// recognize its own entry in `tasks` after an upsert may have replaced it.
    next_task_id: Arc<AtomicUsize>,
}
54
55impl FirehoseHandle {
56 pub(super) fn new(state: Arc<AppState>) -> Self {
57 Self {
58 state,
59 shared: Arc::new(std::sync::OnceLock::new()),
60 tasks: Arc::new(scc::HashMap::new()),
61 known_sources: Arc::new(scc::HashMap::new()),
62 next_task_id: Arc::new(AtomicUsize::new(0)),
63 }
64 }
65
66 pub(super) async fn spawn_firehose_ingestor(
67 &self,
68 source: &FirehoseSource,
69 shared: &FirehoseShared,
70 delay_startup: bool,
71 ) -> Result<()> {
72 use std::sync::atomic::AtomicI64;
73 let state = &self.state;
74
75 let start = db::get_firehose_cursor(&state.db, &source.url).await?;
76 // insert into relay_cursors if not already present; existing in-memory cursor takes precedence
77 let _ = state
78 .firehose_cursors
79 .insert_async(source.url.clone(), AtomicI64::new(start.unwrap_or(0)))
80 .await;
81
82 info!(relay = %source.url, source.is_pds, cursor = ?start, "starting firehose ingestor");
83
84 let enabled = state.firehose_enabled.subscribe();
85 let ingestor = FirehoseIngestor::new(
86 state.clone(),
87 shared.buffer_tx.clone(),
88 source.url.clone(),
89 source.is_pds,
90 state.filter.clone(),
91 enabled,
92 shared.verify_signatures,
93 )
94 .await;
95
96 let id = self.next_task_id.fetch_add(1, Ordering::Relaxed);
97 let cancel = CancellationToken::new();
98
99 tokio::spawn({
100 let relay_url = source.url.clone();
101 let tasks = self.tasks.clone();
102 let token = cancel.clone();
103 async move {
104 // jitter connection start so we dont cause thundering herd problems
105 if delay_startup {
106 let jitter_ms = rand::rng().random_range(0u64..2000);
107 tokio::select! {
108 _ = tokio::time::sleep(Duration::from_millis(jitter_ms)) => {}
109 _ = token.cancelled() => {
110 info!(relay = %relay_url, "firehose ingestor cancelled");
111 return;
112 }
113 }
114 }
115 tokio::select! {
116 res = ingestor.run() => {
117 // only remove our own entry because an upsert could replace us
118 tasks.remove_if_async(&relay_url, |h| h.id == id).await;
119 match res {
120 Ok(()) => info!(relay = %relay_url, "firehose shut down!"),
121 Err(e) => error!(relay = %relay_url, err = %e, "firehose ingestor exited with error"),
122 }
123 },
124 _ = token.cancelled() => {
125 info!(relay = %relay_url, "firehose ingestor cancelled");
126 }
127 }
128 }
129 });
130
131 let handle = FirehoseIngestorHandle {
132 id,
133 cancel,
134 is_pds: source.is_pds,
135 };
136 self.tasks.upsert_async(source.url.clone(), handle).await;
137
138 Ok(())
139 }
140
141 /// enable firehose ingestion, no-op if already enabled.
142 pub fn enable(&self) {
143 self.state.firehose_enabled.send_replace(true);
144 }
145 /// disable firehose ingestion, in-flight messages complete before pausing.
146 pub fn disable(&self) {
147 self.state.firehose_enabled.send_replace(false);
148 }
149 /// returns the current enabled state of firehose ingestion.
150 pub fn is_enabled(&self) -> bool {
151 *self.state.firehose_enabled.borrow()
152 }
153
154 /// returns `true` if this URL is already a known firehose source.
155 /// either currently running or persisted (e.g. the host is offline but was previously added).
156 pub fn is_source_known(&self, url: &Url) -> bool {
157 self.known_sources.contains_sync(url)
158 }
159
160 /// return `true` if this source has a running firehose task (eg. its not offline).
161 pub fn is_source_running(&self, url: &Url) -> bool {
162 self.tasks.contains_sync(url)
163 }
164
165 /// list all currently active firehose sources.
166 pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> {
167 let mut out = Vec::with_capacity(self.tasks.capacity());
168 self.tasks
169 .iter_async(|url, handle| {
170 out.push(FirehoseSourceInfo {
171 url: url.clone(),
172 is_pds: handle.is_pds,
173 });
174 true
175 })
176 .await;
177 out
178 }
179
180 /// add a new firehose source at runtime, persisting it to the database.
181 ///
182 /// if a source with the same URL already exists, it is replaced: the
183 /// running task is stopped and a new one is started with the new `is_pds`
184 /// setting. existing cursor state for the URL is preserved.
185 pub async fn add_source(&self, url: Url, is_pds: bool) -> Result<()> {
186 let shared = self
187 .shared
188 .get()
189 .ok_or_else(|| miette::miette!("firehose worker not started"))?;
190
191 // persist to db first
192 let key = keys::firehose_source_key(url.as_str());
193 tokio::task::spawn_blocking({
194 let state = self.state.clone();
195 move || {
196 let mut batch = state.db.inner.batch();
197 let value = rmp_serde::to_vec(&db::FirehoseSourceMeta { is_pds }).map_err(|e| {
198 miette::miette!("failed to serialize firehose source meta: {e}")
199 })?;
200 batch.insert(&state.db.crawler, key, &value);
201 batch.commit().into_diagnostic()?;
202 state.db.persist()
203 }
204 })
205 .await
206 .into_diagnostic()??;
207
208 let _ = self.known_sources.insert_async(url.clone(), is_pds).await;
209
210 // reset failure state so the fresh task gets a clean slate.
211 // if the previous task exited after max failures, the failure counter
212 // would otherwise cause the new task to exit immediately.
213 let throttle = self.state.throttler.get_handle(&url).await;
214 throttle.record_success();
215
216 self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared, false)
217 .await?;
218
219 Ok(())
220 }
221
222 /// remove a firehose source at runtime.
223 ///
224 /// returns `true` if the source was found and removed, `false` otherwise.
225 /// if the source was added via the API, it is removed from the database;
226 /// if it came from the static config, only the running task is stopped.
227 pub async fn remove_source(&self, url: &Url) -> Result<bool> {
228 if self.known_sources.contains_async(url).await {
229 let url_str = url.to_string();
230 tokio::task::spawn_blocking({
231 let state = self.state.clone();
232 move || {
233 state
234 .db
235 .crawler
236 .remove(keys::firehose_source_key(&url_str))
237 .into_diagnostic()?;
238 state.db.persist()
239 }
240 })
241 .await
242 .into_diagnostic()??;
243 self.known_sources.remove_async(url).await;
244 }
245
246 Ok(self.tasks.remove_async(url).await.is_some())
247 }
248
249 /// restart an offline firehose source without touching the database or daily limits.
250 pub(super) async fn restart_source(&self, url: Url, is_pds: bool) -> Result<()> {
251 let shared = self
252 .shared
253 .get()
254 .ok_or_else(|| miette::miette!("firehose worker not started"))?;
255
256 // clear the failure counter so the new task isn't immediately terminated
257 let throttle = self.state.throttler.get_handle(&url).await;
258 throttle.record_success();
259
260 self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared, true)
261 .await
262 }
263
264 /// reset the stored firehose cursor for a given URL.
265 pub async fn reset_cursor(&self, url: &str) -> Result<()> {
266 let url = Url::parse(url).into_diagnostic()?;
267 let key = keys::firehose_cursor_key_from_url(&url);
268 tokio::task::spawn_blocking({
269 let state = self.state.clone();
270 move || {
271 state.db.cursors.remove(key).into_diagnostic()?;
272 state.db.persist()
273 }
274 })
275 .await
276 .into_diagnostic()??;
277
278 self.state.firehose_cursors.remove_async(&url).await;
279
280 Ok(())
281 }
282}