A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 264 lines 9.5 kB view raw
use std::sync::Arc;

use miette::{IntoDiagnostic, Result};
use tokio::sync::{mpsc, watch};
use tracing::{error, info};
use url::Url;

use crate::db::keys;
use crate::state::AppState;

/// handle to a single spawned crawler producer task.
///
/// dropping the handle aborts the underlying tokio task (see the `Drop` impl),
/// so removing an entry from [`CrawlerHandle::tasks`] is enough to stop a producer.
pub(super) struct ProducerHandle {
    // mode the producer was spawned with; reported via `CrawlerHandle::list_sources`.
    mode: crate::config::CrawlerMode,
    abort: tokio::task::AbortHandle,
}

impl Drop for ProducerHandle {
    fn drop(&mut self) {
        // cancel the producer task when its handle goes away; `abort` is
        // idempotent and safe to call on an already-finished task.
        self.abort.abort();
    }
}

/// dependencies shared by every crawler producer.
///
/// stored behind a `OnceLock` on [`CrawlerHandle`]; presumably populated once by
/// `Hydrant::run` (the doc comments below reference it) — the initialization site
/// is not in this file.
pub(super) struct CrawlerShared {
    pub(super) http: reqwest::Client,
    pub(super) checker: crate::crawler::SignalChecker,
    pub(super) in_flight: crate::crawler::InFlight,
    // producers push discovered work to the consumer side through this channel.
    pub(super) tx: mpsc::Sender<crate::crawler::CrawlerBatch>,
    pub(super) stats: crate::crawler::CrawlerStats,
}

/// a snapshot of a single crawler source's runtime state.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CrawlerSourceInfo {
    pub url: Url,
    pub mode: crate::config::CrawlerMode,
}

/// spawn the producer task for one configured crawler source and return a
/// [`ProducerHandle`] that aborts it on drop.
///
/// behavior depends on `source.mode`:
/// - `ListRepos`: spawns `ListReposProducer::run` once; any retry/restart
///   behavior lives inside `run` itself (not visible from this file).
/// - `ByCollection`: spawns a supervisor loop that rebuilds the
///   `ByCollectionProducer` and re-runs it forever, sleeping 30s after a fatal
///   error.
///
/// NOTE(review): in the `ByCollection` loop, an `Ok(())` return from
/// `producer.run()` restarts immediately with no backoff — confirm `run()`
/// only ever returns `Err` in practice, otherwise this can busy-loop.
///
/// the `enabled` watch receiver gates pausing/resuming; it is moved into the
/// `ListRepos` producer but cloned per iteration in the `ByCollection` loop.
pub(super) fn spawn_crawler_producer(
    source: &crate::config::CrawlerSource,
    http: &reqwest::Client,
    state: &Arc<AppState>,
    checker: &crate::crawler::SignalChecker,
    in_flight: &crate::crawler::InFlight,
    tx: &mpsc::Sender<crate::crawler::CrawlerBatch>,
    stats: &crate::crawler::CrawlerStats,
    enabled: watch::Receiver<bool>,
) -> ProducerHandle {
    use crate::config::CrawlerMode;
    use crate::crawler::{ByCollectionProducer, ListReposProducer};
    use std::time::Duration;
    use tracing::Instrument;

    let abort = match source.mode {
        CrawlerMode::ListRepos => {
            info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler");
            let span = tracing::info_span!("crawl", url = %source.url);
            tokio::spawn(
                ListReposProducer {
                    url: source.url.clone(),
                    checker: checker.clone(),
                    in_flight: in_flight.clone(),
                    tx: tx.clone(),
                    // the receiver is consumed here; only one ListRepos producer
                    // can be built from a given `enabled` handle.
                    enabled,
                    stats: stats.clone(),
                }
                .run()
                .instrument(span),
            )
            .abort_handle()
        }
        CrawlerMode::ByCollection => {
            info!(
                host = source.url.host_str(),
                enabled = *state.crawler_enabled.borrow(),
                "starting by-collection crawler"
            );
            let span = tracing::info_span!("by_collection", host = source.url.host_str());
            // clone everything the supervisor loop needs so the spawned future
            // is 'static and owns its state.
            let http = http.clone();
            let state = state.clone();
            let in_flight = in_flight.clone();
            let tx = tx.clone();
            let stats = stats.clone();
            let url = source.url.clone();
            tokio::spawn(
                async move {
                    // supervisor: rebuild and re-run the producer forever,
                    // backing off 30s after a fatal error.
                    loop {
                        let producer = ByCollectionProducer {
                            index_url: url.clone(),
                            http: http.clone(),
                            state: state.clone(),
                            in_flight: in_flight.clone(),
                            tx: tx.clone(),
                            enabled: enabled.clone(),
                            stats: stats.clone(),
                        };
                        if let Err(e) = producer.run().await {
                            error!(err = ?e, "by-collection crawler fatal error, restarting in 30s");
                            tokio::time::sleep(Duration::from_secs(30)).await;
                        }
                    }
                }
                .instrument(span),
            )
            .abort_handle()
        }
    };
    ProducerHandle {
        mode: source.mode,
        abort,
    }
}

/// runtime control over the crawler component.
///
/// the crawler walks `com.atproto.sync.listRepos` on each configured relay to discover
/// repositories that have never emitted a firehose event. in `filter` mode it also
/// checks each discovered repo against the configured signal collections before
/// enqueuing it for backfill.
///
/// disabling the crawler does not affect in-progress repo checks. each one completes
/// its current PDS request before pausing.
#[derive(Clone)]
pub struct CrawlerHandle {
    pub(super) state: Arc<AppState>,
    /// set once by [`Hydrant::run`]; `None` means run() has not been called yet.
    pub(super) shared: Arc<std::sync::OnceLock<CrawlerShared>>,
    /// per-source running tasks, keyed by url.
    pub(super) tasks: Arc<scc::HashMap<Url, ProducerHandle>>,
    /// set of urls persisted in the database (dynamically added sources).
    pub(super) persisted: Arc<scc::HashSet<Url>>,
}

impl CrawlerHandle {
    /// enable the crawler (enables all configured producers). no-op if already enabled.
    pub fn enable(&self) {
        // producers observe this via their `enabled` watch receivers.
        self.state.crawler_enabled.send_replace(true);
    }
    /// disable the crawler (disables all configured producers).
    /// in-progress repo checks finish before the crawler pauses.
    pub fn disable(&self) {
        self.state.crawler_enabled.send_replace(false);
    }
    /// returns the current enabled state of the crawler.
    pub fn is_enabled(&self) -> bool {
        *self.state.crawler_enabled.borrow()
    }

    /// delete all cursor entries associated with the given URL.
    ///
    /// removes both the single list-repos cursor key and every key under the
    /// by-collection cursor prefix for this URL, in one batch, then persists.
    /// runs on a blocking thread because the fjall operations are synchronous.
    pub async fn reset_cursor(&self, url: &str) -> Result<()> {
        let state = self.state.clone();
        let point_keys = [keys::crawler_cursor_key(url)];
        let by_collection_prefix = keys::by_collection_cursor_prefix(url);
        tokio::task::spawn_blocking(move || {
            let mut batch = state.db.inner.batch();
            for k in point_keys {
                batch.remove(&state.db.cursors, k);
            }
            // sweep every per-collection cursor stored under this URL's prefix.
            for entry in state.db.cursors.prefix(&by_collection_prefix) {
                let k = entry.key().into_diagnostic()?;
                batch.remove(&state.db.cursors, k);
            }
            batch.commit().into_diagnostic()?;
            state.db.persist()
        })
        .await
        .into_diagnostic()??;
        Ok(())
    }

    /// return info on all currently active crawler sources.
    ///
    /// returns an empty list if called before [`Hydrant::run`].
    pub async fn list_sources(&self) -> Vec<CrawlerSourceInfo> {
        let mut sources = Vec::new();
        self.tasks
            .iter_async(|url, h| {
                sources.push(CrawlerSourceInfo {
                    url: url.clone(),
                    mode: h.mode,
                });
                true // keep iterating over all entries
            })
            .await;
        sources
    }

    /// add a new crawler source at runtime.
    ///
    /// the source is persisted to the database and will be re-spawned on restart.
    /// if a source with the same URL already exists, it is replaced (the old task is
    /// aborted and a new one is started with the new mode).
    ///
    /// returns an error if called before [`Hydrant::run`].
    pub async fn add_source(&self, source: crate::config::CrawlerSource) -> Result<()> {
        let Some(shared) = self.shared.get() else {
            miette::bail!("crawler not yet started: call Hydrant::run() first");
        };

        // persist first, so a crash after this point still re-spawns the source
        // on restart; mode is stored msgpack-encoded under the URL's key.
        let state = self.state.clone();
        let key = keys::crawler_source_key(source.url.as_str());
        let val = rmp_serde::to_vec(&source.mode).into_diagnostic()?;
        tokio::task::spawn_blocking(move || {
            state.db.crawler.insert(key, val).into_diagnostic()?;
            state.db.persist()
        })
        .await
        .into_diagnostic()??;

        let enabled_rx = self.state.crawler_enabled.subscribe();
        let handle = spawn_crawler_producer(
            &source,
            &shared.http,
            &self.state,
            &shared.checker,
            &shared.in_flight,
            &shared.tx,
            &shared.stats,
            enabled_rx,
        );

        let _ = self.persisted.insert_async(source.url.clone()).await;
        match self.tasks.entry_async(source.url).await {
            scc::hash_map::Entry::Vacant(e) => {
                e.insert_entry(handle);
            }
            scc::hash_map::Entry::Occupied(mut e) => {
                // overwriting drops the old ProducerHandle, which aborts the
                // previous task via Drop.
                *e.get_mut() = handle;
            }
        }
        Ok(())
    }

    /// remove a crawler source at runtime by URL.
    ///
    /// aborts the running producer task and removes the source from the database if it
    /// was dynamically added. config-sourced entries are aborted but not persisted, so
    /// they will reappear on restart.
    ///
    /// returns `true` if a source with the given URL was found and removed.
    /// returns an error if called before [`Hydrant::run`].
    pub async fn remove_source(&self, url: &Url) -> Result<bool> {
        if self.shared.get().is_none() {
            miette::bail!("crawler not yet started: call Hydrant::run() first");
        }

        // dropping the ProducerHandle aborts the task via Drop
        if self.tasks.remove_async(url).await.is_none() {
            return Ok(false);
        }

        // remove from DB if it was a persisted source
        if self.persisted.remove_async(url).await.is_some() {
            let state = self.state.clone();
            let key = keys::crawler_source_key(url.as_str());
            tokio::task::spawn_blocking(move || {
                state.db.crawler.remove(key).into_diagnostic()?;
                state.db.persist()
            })
            .await
            .into_diagnostic()??;
        }

        Ok(true)
    }
}