very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 282 lines 10 kB view raw
1use std::sync::Arc; 2use std::sync::atomic::{AtomicUsize, Ordering}; 3use std::time::Duration; 4 5use miette::{IntoDiagnostic, Result}; 6use rand::RngExt; 7use tokio_util::sync::CancellationToken; 8use tracing::{error, info}; 9use url::Url; 10 11use crate::config::FirehoseSource; 12use crate::db::{self, keys}; 13use crate::ingest::{BufferTx, firehose::FirehoseIngestor}; 14use crate::state::AppState; 15 16pub(super) struct FirehoseIngestorHandle { 17 id: usize, 18 cancel: CancellationToken, 19 pub(super) is_pds: bool, 20} 21 22impl Drop for FirehoseIngestorHandle { 23 fn drop(&mut self) { 24 self.cancel.cancel(); 25 } 26} 27 28pub(super) struct FirehoseShared { 29 pub(super) buffer_tx: BufferTx, 30 pub(super) verify_signatures: bool, 31} 32 33/// a snapshot of a single firehose relay's runtime state. 34#[derive(Debug, Clone, serde::Serialize)] 35pub struct FirehoseSourceInfo { 36 pub url: Url, 37 /// true when this is a direct PDS connection; enables host authority enforcement. 38 pub is_pds: bool, 39} 40 41/// runtime control over the firehose ingestor component. 42#[derive(Clone)] 43pub struct FirehoseHandle { 44 pub(super) state: Arc<AppState>, 45 /// set once by [`Hydrant::run`]; `None` means run() has not been called yet. 46 pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>, 47 /// per-relay running tasks, keyed by url. 48 pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>, 49 /// known source urls → is_pds flag; includes API-added (db-persisted) and static config sources. 
50 pub(super) known_sources: Arc<scc::HashMap<Url, bool>>, 51 /// ids assigned to spawned tasks 52 next_task_id: Arc<AtomicUsize>, 53} 54 55impl FirehoseHandle { 56 pub(super) fn new(state: Arc<AppState>) -> Self { 57 Self { 58 state, 59 shared: Arc::new(std::sync::OnceLock::new()), 60 tasks: Arc::new(scc::HashMap::new()), 61 known_sources: Arc::new(scc::HashMap::new()), 62 next_task_id: Arc::new(AtomicUsize::new(0)), 63 } 64 } 65 66 pub(super) async fn spawn_firehose_ingestor( 67 &self, 68 source: &FirehoseSource, 69 shared: &FirehoseShared, 70 delay_startup: bool, 71 ) -> Result<()> { 72 use std::sync::atomic::AtomicI64; 73 let state = &self.state; 74 75 let start = db::get_firehose_cursor(&state.db, &source.url).await?; 76 // insert into relay_cursors if not already present; existing in-memory cursor takes precedence 77 let _ = state 78 .firehose_cursors 79 .insert_async(source.url.clone(), AtomicI64::new(start.unwrap_or(0))) 80 .await; 81 82 info!(relay = %source.url, source.is_pds, cursor = ?start, "starting firehose ingestor"); 83 84 let enabled = state.firehose_enabled.subscribe(); 85 let ingestor = FirehoseIngestor::new( 86 state.clone(), 87 shared.buffer_tx.clone(), 88 source.url.clone(), 89 source.is_pds, 90 state.filter.clone(), 91 enabled, 92 shared.verify_signatures, 93 ) 94 .await; 95 96 let id = self.next_task_id.fetch_add(1, Ordering::Relaxed); 97 let cancel = CancellationToken::new(); 98 99 tokio::spawn({ 100 let relay_url = source.url.clone(); 101 let tasks = self.tasks.clone(); 102 let token = cancel.clone(); 103 async move { 104 // jitter connection start so we dont cause thundering herd problems 105 if delay_startup { 106 let jitter_ms = rand::rng().random_range(0u64..2000); 107 tokio::select! { 108 _ = tokio::time::sleep(Duration::from_millis(jitter_ms)) => {} 109 _ = token.cancelled() => { 110 info!(relay = %relay_url, "firehose ingestor cancelled"); 111 return; 112 } 113 } 114 } 115 tokio::select! 
{ 116 res = ingestor.run() => { 117 // only remove our own entry because an upsert could replace us 118 tasks.remove_if_async(&relay_url, |h| h.id == id).await; 119 match res { 120 Ok(()) => info!(relay = %relay_url, "firehose shut down!"), 121 Err(e) => error!(relay = %relay_url, err = %e, "firehose ingestor exited with error"), 122 } 123 }, 124 _ = token.cancelled() => { 125 info!(relay = %relay_url, "firehose ingestor cancelled"); 126 } 127 } 128 } 129 }); 130 131 let handle = FirehoseIngestorHandle { 132 id, 133 cancel, 134 is_pds: source.is_pds, 135 }; 136 self.tasks.upsert_async(source.url.clone(), handle).await; 137 138 Ok(()) 139 } 140 141 /// enable firehose ingestion, no-op if already enabled. 142 pub fn enable(&self) { 143 self.state.firehose_enabled.send_replace(true); 144 } 145 /// disable firehose ingestion, in-flight messages complete before pausing. 146 pub fn disable(&self) { 147 self.state.firehose_enabled.send_replace(false); 148 } 149 /// returns the current enabled state of firehose ingestion. 150 pub fn is_enabled(&self) -> bool { 151 *self.state.firehose_enabled.borrow() 152 } 153 154 /// returns `true` if this URL is already a known firehose source. 155 /// either currently running or persisted (e.g. the host is offline but was previously added). 156 pub fn is_source_known(&self, url: &Url) -> bool { 157 self.known_sources.contains_sync(url) 158 } 159 160 /// return `true` if this source has a running firehose task (eg. its not offline). 161 pub fn is_source_running(&self, url: &Url) -> bool { 162 self.tasks.contains_sync(url) 163 } 164 165 /// list all currently active firehose sources. 
166 pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> { 167 let mut out = Vec::with_capacity(self.tasks.capacity()); 168 self.tasks 169 .iter_async(|url, handle| { 170 out.push(FirehoseSourceInfo { 171 url: url.clone(), 172 is_pds: handle.is_pds, 173 }); 174 true 175 }) 176 .await; 177 out 178 } 179 180 /// add a new firehose source at runtime, persisting it to the database. 181 /// 182 /// if a source with the same URL already exists, it is replaced: the 183 /// running task is stopped and a new one is started with the new `is_pds` 184 /// setting. existing cursor state for the URL is preserved. 185 pub async fn add_source(&self, url: Url, is_pds: bool) -> Result<()> { 186 let shared = self 187 .shared 188 .get() 189 .ok_or_else(|| miette::miette!("firehose worker not started"))?; 190 191 // persist to db first 192 let key = keys::firehose_source_key(url.as_str()); 193 tokio::task::spawn_blocking({ 194 let state = self.state.clone(); 195 move || { 196 let mut batch = state.db.inner.batch(); 197 let value = rmp_serde::to_vec(&db::FirehoseSourceMeta { is_pds }).map_err(|e| { 198 miette::miette!("failed to serialize firehose source meta: {e}") 199 })?; 200 batch.insert(&state.db.crawler, key, &value); 201 batch.commit().into_diagnostic()?; 202 state.db.persist() 203 } 204 }) 205 .await 206 .into_diagnostic()??; 207 208 let _ = self.known_sources.insert_async(url.clone(), is_pds).await; 209 210 // reset failure state so the fresh task gets a clean slate. 211 // if the previous task exited after max failures, the failure counter 212 // would otherwise cause the new task to exit immediately. 213 let throttle = self.state.throttler.get_handle(&url).await; 214 throttle.record_success(); 215 216 self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared, false) 217 .await?; 218 219 Ok(()) 220 } 221 222 /// remove a firehose source at runtime. 223 /// 224 /// returns `true` if the source was found and removed, `false` otherwise. 
225 /// if the source was added via the API, it is removed from the database; 226 /// if it came from the static config, only the running task is stopped. 227 pub async fn remove_source(&self, url: &Url) -> Result<bool> { 228 if self.known_sources.contains_async(url).await { 229 let url_str = url.to_string(); 230 tokio::task::spawn_blocking({ 231 let state = self.state.clone(); 232 move || { 233 state 234 .db 235 .crawler 236 .remove(keys::firehose_source_key(&url_str)) 237 .into_diagnostic()?; 238 state.db.persist() 239 } 240 }) 241 .await 242 .into_diagnostic()??; 243 self.known_sources.remove_async(url).await; 244 } 245 246 Ok(self.tasks.remove_async(url).await.is_some()) 247 } 248 249 /// restart an offline firehose source without touching the database or daily limits. 250 pub(super) async fn restart_source(&self, url: Url, is_pds: bool) -> Result<()> { 251 let shared = self 252 .shared 253 .get() 254 .ok_or_else(|| miette::miette!("firehose worker not started"))?; 255 256 // clear the failure counter so the new task isn't immediately terminated 257 let throttle = self.state.throttler.get_handle(&url).await; 258 throttle.record_success(); 259 260 self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared, true) 261 .await 262 } 263 264 /// reset the stored firehose cursor for a given URL. 265 pub async fn reset_cursor(&self, url: &str) -> Result<()> { 266 let url = Url::parse(url).into_diagnostic()?; 267 let key = keys::firehose_cursor_key_from_url(&url); 268 tokio::task::spawn_blocking({ 269 let state = self.state.clone(); 270 move || { 271 state.db.cursors.remove(key).into_diagnostic()?; 272 state.db.persist() 273 } 274 }) 275 .await 276 .into_diagnostic()??; 277 278 self.state.firehose_cursors.remove_async(&url).await; 279 280 Ok(()) 281 } 282}