very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api,ingest,crawler] add a way to pause / resume crawler and firehose at runtime

dawn ec1b1027 65621074

+219 -93
+50
src/api/ingestion.rs
··· 1 + use std::sync::Arc; 2 + 3 + use crate::state::AppState; 4 + use axum::{ 5 + Json, Router, 6 + extract::State, 7 + http::StatusCode, 8 + routing::{get, patch}, 9 + }; 10 + use serde::{Deserialize, Serialize}; 11 + 12 + pub fn router() -> Router<Arc<AppState>> { 13 + Router::new() 14 + .route("/ingestion", get(get_ingestion)) 15 + .route("/ingestion", patch(patch_ingestion)) 16 + } 17 + 18 + #[derive(Serialize)] 19 + pub struct IngestionStatus { 20 + pub crawler: bool, 21 + pub firehose: bool, 22 + } 23 + 24 + pub async fn get_ingestion(State(state): State<Arc<AppState>>) -> Json<IngestionStatus> { 25 + Json(IngestionStatus { 26 + crawler: *state.crawler_enabled.borrow(), 27 + firehose: *state.firehose_enabled.borrow(), 28 + }) 29 + } 30 + 31 + #[derive(Deserialize)] 32 + pub struct IngestionPatch { 33 + #[serde(default)] 34 + pub crawler: Option<bool>, 35 + #[serde(default)] 36 + pub firehose: Option<bool>, 37 + } 38 + 39 + pub async fn patch_ingestion( 40 + State(state): State<Arc<AppState>>, 41 + Json(body): Json<IngestionPatch>, 42 + ) -> StatusCode { 43 + if let Some(crawler) = body.crawler { 44 + state.crawler_enabled.send_replace(crawler); 45 + } 46 + if let Some(firehose) = body.firehose { 47 + state.firehose_enabled.send_replace(firehose); 48 + } 49 + StatusCode::OK 50 + }
+4 -23
src/api/mod.rs
··· 1 1 use crate::state::AppState; 2 - use axum::extract::State; 3 - use axum::routing::post; 4 2 use axum::{Router, routing::get}; 5 - use futures::FutureExt; 6 - use miette::IntoDiagnostic; 7 - use reqwest::StatusCode; 8 3 use std::{net::SocketAddr, sync::Arc}; 9 4 use tower_http::cors::CorsLayer; 10 5 use tower_http::trace::TraceLayer; 11 6 12 7 mod debug; 13 8 mod filter; 9 + mod ingestion; 14 10 mod repos; 15 11 mod stats; 16 12 mod stream; 13 + mod train_dict; 17 14 mod xrpc; 18 15 19 16 pub async fn serve(state: Arc<AppState>, port: u16) -> miette::Result<()> { 20 17 let app = Router::new() 21 18 .route("/health", get(|| async { "OK" })) 22 19 .route("/stats", get(stats::get_stats)) 23 - .route("/_train_dict", post(handle_train_dict)) 24 20 .nest("/stream", stream::router()) 25 21 .merge(xrpc::router()) 26 22 .merge(filter::router()) 27 23 .merge(repos::router()) 24 + .merge(ingestion::router()) 25 + .merge(train_dict::router()) 28 26 .with_state(state) 29 27 .layer(TraceLayer::new_for_http()) 30 28 .layer(CorsLayer::permissive()); ··· 43 41 .map_err(|e| miette::miette!("axum server error: {e}"))?; 44 42 45 43 Ok(()) 46 - } 47 - 48 - pub async fn handle_train_dict( 49 - State(state): State<Arc<AppState>>, 50 - ) -> Result<StatusCode, StatusCode> { 51 - let train = |name: &'static str| { 52 - let db = state.db.clone(); 53 - tokio::task::spawn_blocking(move || db.train_dict(name)) 54 - .map(|res| res.into_diagnostic().flatten()) 55 - }; 56 - let repos = train("repos"); 57 - let blocks = train("blocks"); 58 - let events = train("events"); 59 - 60 - tokio::try_join!(repos, blocks, events).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 61 - 62 - Ok(StatusCode::OK) 63 44 } 64 45 65 46 pub async fn serve_debug(state: Arc<AppState>, port: u16) -> miette::Result<()> {
+27
src/api/train_dict.rs
··· 1 + use std::sync::Arc; 2 + 3 + use crate::state::AppState; 4 + use axum::{Router, extract::State, http::StatusCode, routing::post}; 5 + use futures::FutureExt; 6 + use miette::IntoDiagnostic; 7 + 8 + pub fn router() -> Router<Arc<AppState>> { 9 + Router::new().route("/train_dict", post(handle_train_dict)) 10 + } 11 + 12 + pub async fn handle_train_dict( 13 + State(state): State<Arc<AppState>>, 14 + ) -> Result<StatusCode, StatusCode> { 15 + let train = |name: &'static str| { 16 + let db = state.db.clone(); 17 + tokio::task::spawn_blocking(move || db.train_dict(name)) 18 + .map(|res| res.into_diagnostic().flatten()) 19 + }; 20 + let repos = train("repos"); 21 + let blocks = train("blocks"); 22 + let events = train("events"); 23 + 24 + tokio::try_join!(repos, blocks, events).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 25 + 26 + Ok(StatusCode::OK) 27 + }
+7
src/crawler/mod.rs
··· 3 3 use crate::db::{Db, keys, ser_repo_state}; 4 4 use crate::state::AppState; 5 5 use crate::types::RepoState; 6 + use crate::util::WatchEnabledExt; 6 7 use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after}; 7 8 use chrono::{DateTime, TimeDelta, Utc}; 8 9 use futures::FutureExt; ··· 22 23 use std::sync::Arc; 23 24 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 24 25 use std::time::Duration; 26 + use tokio::sync::watch; 25 27 use tracing::{Instrument, debug, error, info, trace, warn}; 26 28 use url::Url; 27 29 ··· 180 182 throttled: AtomicBool, 181 183 pds_throttler: Throttler, 182 184 in_flight: InFlight, 185 + enabled: watch::Receiver<bool>, 183 186 } 184 187 185 188 impl Crawler { ··· 188 191 relay_hosts: Vec<Url>, 189 192 max_pending: usize, 190 193 resume_pending: usize, 194 + enabled: watch::Receiver<bool>, 191 195 ) -> Self { 192 196 let http = reqwest::Client::builder() 193 197 .user_agent(concat!( ··· 210 214 throttled: AtomicBool::new(false), 211 215 pds_throttler: Throttler::new(), 212 216 in_flight: Arc::new(HashSet::new()), 217 + enabled, 213 218 } 214 219 } 215 220 ··· 340 345 let mut tasks = tokio::task::JoinSet::new(); 341 346 for url in crawler.relays.clone() { 342 347 let crawler = crawler.clone(); 348 + let mut enabled = crawler.enabled.clone(); 343 349 let span = tracing::info_span!("crawl", %url); 344 350 tasks.spawn( 345 351 async move { 346 352 loop { 353 + enabled.wait_enabled("crawler").await; 347 354 if let Err(e) = Self::crawl(crawler.clone(), &url).await { 348 355 error!(err = ?e, "fatal error, restarting in 30s"); 349 356 tokio::time::sleep(Duration::from_secs(30)).await;
+36 -16
src/ingest/firehose.rs
··· 3 3 use crate::ingest::stream::{FirehoseStream, SubscribeReposMessage, decode_frame}; 4 4 use crate::ingest::{BufferTx, IngestMessage}; 5 5 use crate::state::AppState; 6 + use crate::util::WatchEnabledExt; 6 7 use jacquard_common::IntoStatic; 7 8 use jacquard_common::types::did::Did; 8 9 use miette::{IntoDiagnostic, Result}; 9 10 use std::sync::Arc; 10 11 use std::time::Duration; 12 + use tokio::sync::watch; 11 13 use tracing::{Span, debug, error, info, trace}; 12 14 use url::Url; 13 15 ··· 16 18 buffer_tx: BufferTx, 17 19 relay_host: Url, 18 20 filter: FilterHandle, 21 + enabled: watch::Receiver<bool>, 19 22 _verify_signatures: bool, 20 23 } 21 24 ··· 25 28 buffer_tx: BufferTx, 26 29 relay_host: Url, 27 30 filter: FilterHandle, 31 + enabled: watch::Receiver<bool>, 28 32 verify_signatures: bool, 29 33 ) -> Self { 30 34 Self { ··· 32 36 buffer_tx, 33 37 relay_host, 34 38 filter, 39 + enabled, 35 40 _verify_signatures: verify_signatures, 36 41 } 37 42 } 38 43 39 44 #[tracing::instrument(skip(self), fields(relay = %self.relay_host))] 40 - pub async fn run(self) -> Result<()> { 45 + pub async fn run(mut self) -> Result<()> { 41 46 loop { 47 + self.enabled.wait_enabled("firehose").await; 48 + 42 49 let start_cursor = db::get_firehose_cursor(&self.state.db, &self.relay_host).await?; 43 50 44 51 match start_cursor { ··· 58 65 59 66 info!("firehose connected"); 60 67 61 - while let Some(bytes_res) = stream.next().await { 62 - let bytes = match bytes_res { 63 - Ok(b) => b, 64 - Err(e) => { 65 - error!(err = %e, "firehose stream error"); 66 - break; 68 + let disconnected_by_error = loop { 69 + tokio::select! { 70 + msg = stream.next() => { 71 + let Some(bytes_res) = msg else { break true; }; 72 + let bytes = match bytes_res { 73 + Ok(b) => b, 74 + Err(e) => { 75 + error!(err = %e, "firehose stream error"); 76 + break true; 77 + } 78 + }; 79 + match decode_frame(&bytes) { 80 + Ok(msg) => self.handle_message(msg).await, 81 + Err(e) => { 82 + error!(err = %e, "firehose stream error"); 83 + break true; 84 + } 85 + } 67 86 } 68 - }; 69 - match decode_frame(&bytes) { 70 - Ok(msg) => self.handle_message(msg).await, 71 - Err(e) => { 72 - error!(err = %e, "firehose stream error"); 73 - break; 87 + _ = self.enabled.changed() => { 88 + if !*self.enabled.borrow() { 89 + info!("firehose disabled, disconnecting"); 90 + break false; 91 + } 74 92 } 75 93 } 94 + }; 95 + 96 + if disconnected_by_error { 97 + error!("firehose disconnected, reconnecting in 5s..."); 98 + tokio::time::sleep(Duration::from_secs(5)).await; 76 99 } 77 - 78 - error!("firehose disconnected, reconnecting in 5s..."); 79 - tokio::time::sleep(Duration::from_secs(5)).await; 80 100 } 81 101 } 82 102
+56 -53
src/main.rs
··· 196 196 } 197 197 }); 198 198 199 - info!("starting crawler ({:?})", state.filter.load().mode); 200 - let state_for_crawler = state.clone(); 199 + let post_patch_crawler = match cfg.enable_crawler { 200 + Some(b) => b, 201 + None => state.filter.load().mode == hydrant::filter::FilterMode::Full, 202 + }; 203 + state.crawler_enabled.send_replace(post_patch_crawler); 204 + 205 + info!( 206 + crawler_enabled = *state.crawler_enabled.borrow(), 207 + firehose_enabled = *state.firehose_enabled.borrow(), 208 + filter_mode = ?state.filter.load().mode, 209 + "starting ingestion" 210 + ); 211 + 201 212 let relay_hosts = cfg.relays.clone(); 202 213 let crawler_max_pending = cfg.crawler_max_pending_repos; 203 214 let crawler_resume_pending = cfg.crawler_resume_pending_repos; 204 215 205 - let should_run_crawler = match cfg.enable_crawler { 206 - Some(true) => true, 207 - Some(false) => false, 208 - None => state.filter.load().mode == hydrant::filter::FilterMode::Full, 209 - }; 210 - 211 - if should_run_crawler { 216 + if !relay_hosts.is_empty() { 217 + let state_for_crawler = state.clone(); 218 + let crawler_rx = state.crawler_enabled.subscribe(); 212 219 info!( 213 220 relay_count = relay_hosts.len(), 214 - hosts = ?relay_hosts, 215 - "spawning crawler" 221 + hosts = relay_hosts 222 + .iter() 223 + .map(|h| h.as_str()) 224 + .collect::<Vec<_>>() 225 + .join(", "), 226 + enabled = *state.crawler_enabled.borrow(), 227 + "starting crawler(s)" 216 228 ); 217 229 tokio::spawn(async move { 218 230 let crawler = hydrant::crawler::Crawler::new( ··· 220 232 relay_hosts, 221 233 crawler_max_pending, 222 234 crawler_resume_pending, 235 + crawler_rx, 223 236 ); 224 237 if let Err(e) = crawler.run().await { 225 238 error!(err = %e, "crawler error"); 226 239 db::check_poisoned_report(&e); 227 240 } 228 241 }); 229 - } else { 230 - info!("crawler disabled by config or filter mode"); 231 242 } 232 243 233 - let mut tasks = if cfg.enable_firehose { 234 - let firehose_worker = std::thread::spawn({ 235 - let state = state.clone(); 236 - let handle = tokio::runtime::Handle::current(); 237 - move || { 238 - FirehoseWorker::new( 239 - state, 240 - buffer_rx, 241 - matches!(cfg.verify_signatures, SignatureVerification::Full), 242 - cfg.ephemeral, 243 - cfg.firehose_workers, 244 - ) 245 - .run(handle) 246 - } 247 - }); 248 - 249 - let mut t: Vec<BoxFuture<miette::Result<()>>> = vec![Box::pin( 250 - tokio::task::spawn_blocking(move || { 251 - firehose_worker 252 - .join() 253 - .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 254 - }) 255 - .map(|r| r.into_diagnostic().flatten().flatten()), 256 - )]; 257 - 258 - for relay_url in &cfg.relays { 259 - let ingestor = FirehoseIngestor::new( 260 - state.clone(), 261 - buffer_tx.clone(), 262 - relay_url.clone(), 263 - state.filter.clone(), 244 + let firehose_worker = std::thread::spawn({ 245 + let state = state.clone(); 246 + let handle = tokio::runtime::Handle::current(); 247 + move || { 248 + FirehoseWorker::new( 249 + state, 250 + buffer_rx, 264 251 matches!(cfg.verify_signatures, SignatureVerification::Full), 265 - ); 266 - t.push(Box::pin(ingestor.run())); 252 + cfg.ephemeral, 253 + cfg.firehose_workers, 254 + ) 255 + .run(handle) 267 256 } 257 + }); 268 258 269 - t 270 - } else { 271 - info!("firehose ingestion disabled by config"); 272 - // if firehose is disabled, we just wait indefinitely (or until signal) 273 - // essentially we just want to keep the main thread alive for the other components 274 - vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>] 275 - }; 259 + let mut tasks: Vec<BoxFuture<miette::Result<()>>> = vec![Box::pin( 260 + tokio::task::spawn_blocking(move || { 261 + firehose_worker 262 + .join() 263 + .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 264 + }) 265 + .map(|r| r.into_diagnostic().flatten().flatten()), 266 + )]; 267 + 268 + for relay_url in &cfg.relays { 269 + let ingestor = FirehoseIngestor::new( 270 + state.clone(), 271 + buffer_tx.clone(), 272 + relay_url.clone(), 273 + state.filter.clone(), 274 + state.firehose_enabled.subscribe(), 275 + matches!(cfg.verify_signatures, SignatureVerification::Full), 276 + ); 277 + tasks.push(Box::pin(ingestor.run())); 278 + } 276 279 277 280 let state_api = state.clone(); 278 281 tasks.push(Box::pin(async move {
+17 -1
src/state.rs
··· 2 2 use std::sync::atomic::AtomicI64; 3 3 4 4 use miette::Result; 5 - use tokio::sync::Notify; 5 + use tokio::sync::{Notify, watch}; 6 6 use url::Url; 7 7 8 8 use crate::{ ··· 18 18 pub filter: FilterHandle, 19 19 pub relay_cursors: HashMap<Url, AtomicI64>, 20 20 pub backfill_notify: Notify, 21 + /// Controls whether the crawler is running. Receivers are held by crawler tasks. 22 + pub crawler_enabled: watch::Sender<bool>, 23 + /// Controls whether firehose ingestion is running. Receivers are held by ingestor tasks. 24 + pub firehose_enabled: watch::Sender<bool>, 21 25 } 22 26 23 27 impl AppState { ··· 25 29 let db = Db::open(config)?; 26 30 let resolver = Resolver::new(config.plc_urls.clone(), config.identity_cache_size); 27 31 let filter_config = crate::db::filter::load(&db.filter)?; 32 + 33 + let crawler_default = match config.enable_crawler { 34 + Some(b) => b, 35 + // default: enabled in full-network mode, disabled in filter mode 36 + None => filter_config.mode == crate::filter::FilterMode::Full, 37 + }; 38 + 28 39 let filter = new_handle(filter_config); 29 40 30 41 let relay_cursors = config ··· 33 44 .map(|url| (url.clone(), AtomicI64::new(0))) 34 45 .collect(); 35 46 47 + let (crawler_enabled, _) = watch::channel(crawler_default); 48 + let (firehose_enabled, _) = watch::channel(config.enable_firehose); 49 + 36 50 Ok(Self { 37 51 db, 38 52 resolver, 39 53 filter, 40 54 relay_cursors, 41 55 backfill_notify: Notify::new(), 56 + crawler_enabled, 57 + firehose_enabled, 42 58 }) 43 59 } 44 60
+22
src/util.rs
··· 3 3 use rand::RngExt; 4 4 use reqwest::StatusCode; 5 5 use serde::{Deserialize, Deserializer, Serializer}; 6 + use tokio::sync::watch; 7 + use tracing::info; 6 8 7 9 /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. 8 10 pub enum RetryOutcome<E> { ··· 51 53 F: FnMut() -> Fut, 52 54 Fut: Future<Output = Result<T, E>>, 53 55 { 56 + } 57 + 58 + /// extension trait that adds `.wait_enabled()` to `watch::Receiver<bool>`. 59 + /// 60 + /// waits until the value becomes `true`, logging once when paused and once when resumed. 61 + pub trait WatchEnabledExt { 62 + #[allow(async_fn_in_trait)] 63 + async fn wait_enabled(&mut self, component: &'static str); 64 + } 65 + 66 + impl WatchEnabledExt for watch::Receiver<bool> { 67 + async fn wait_enabled(&mut self, component: &'static str) { 68 + if !*self.borrow() { 69 + info!("{component} paused"); 70 + while !*self.borrow() { 71 + let _ = self.changed().await; 72 + } 73 + info!("{component} resumed"); 74 + } 75 + } 54 76 } 55 77 56 78 /// extension trait that adds `.error_for_status()` to futures returning a reqwest `Response`.