very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[all] minor refactorings

dawn 342c24a3 cbad49c0

+46 -43
+23 -19
src/config.rs
··· 7 7 use std::time::Duration; 8 8 use url::Url; 9 9 10 + /// this is for internal use only, please don't use this macro. 11 + #[doc(hidden)] 12 + #[macro_export] 13 + macro_rules! __cfg { 14 + (@val $key:expr) => { 15 + std::env::var(concat!("HYDRANT_", $key)) 16 + }; 17 + ($key:expr, $default:expr, sec) => { 18 + cfg!(@val $key) 19 + .ok() 20 + .and_then(|s| humantime::parse_duration(&s).ok()) 21 + .unwrap_or($default) 22 + }; 23 + ($key:expr, $default:expr) => { 24 + cfg!(@val $key) 25 + .ok() 26 + .and_then(|s| s.parse().ok()) 27 + .unwrap_or($default.to_owned()) 28 + .into() 29 + }; 30 + } 31 + use crate::__cfg as cfg; 32 + 10 33 /// loads `.env` from the current directory, setting any variables not already in the environment. 11 34 fn load_dotenv() { 12 35 let Ok(contents) = std::fs::read_to_string(".env") else { ··· 366 389 /// reads and builds the config from environment variables, loading `.env` first if present. 367 390 pub fn from_env() -> Result<Self> { 368 391 load_dotenv(); 369 - 370 - macro_rules! cfg { 371 - (@val $key:expr) => { 372 - std::env::var(concat!("HYDRANT_", $key)) 373 - }; 374 - ($key:expr, $default:expr, sec) => { 375 - cfg!(@val $key) 376 - .ok() 377 - .and_then(|s| humantime::parse_duration(&s).ok()) 378 - .unwrap_or($default) 379 - }; 380 - ($key:expr, $default:expr) => { 381 - cfg!(@val $key) 382 - .ok() 383 - .and_then(|s| s.parse().ok()) 384 - .unwrap_or($default.to_owned()) 385 - .into() 386 - }; 387 - } 388 392 389 393 // full_network is read first since it determines which defaults to use. 390 394 let full_network: bool = cfg!("FULL_NETWORK", false);
+2 -2
src/control/repos.rs
··· 57 57 /// control over which repositories are tracked and access to their state. 58 58 /// 59 59 /// in `filter` mode, a repo is only indexed if it either matches a signal or is 60 - /// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are indexed 61 - /// and tracking is implicit. 60 + /// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are 61 + /// indexed and tracking is implicit. 62 62 /// 63 63 /// tracking a DID that hydrant has never seen enqueues an immediate backfill. 64 64 /// tracking a DID that hydrant already knows about (but has marked untracked)
+1 -1
src/crawler/list_repos.rs
··· 381 381 let valid_set: std::collections::HashSet<&Did<'static>> = valid.iter().collect(); 382 382 Ok(in_flight 383 383 .into_iter() 384 - .filter(|g| valid_set.contains(&**g)) 384 + .filter(|g| valid_set.contains(g.as_did())) 385 385 .collect()) 386 386 } 387 387 }
+6
src/crawler/mod.rs
··· 49 49 pub(super) did: Did<'static>, 50 50 } 51 51 52 + impl InFlightGuard { 53 + fn as_did(&self) -> &Did<'static> { 54 + &self.did 55 + } 56 + } 57 + 52 58 impl std::ops::Deref for InFlightGuard { 53 59 type Target = Did<'static>; 54 60 fn deref(&self) -> &Did<'static> {
+1 -1
src/db/ephemeral.rs
··· 32 32 let cutoff_key = keys::event_watermark_key(cutoff_ts); 33 33 let cutoff_event_id = db 34 34 .cursors 35 - .range(..=cutoff_key.clone()) 35 + .range(..=cutoff_key.as_slice()) 36 36 .next_back() 37 37 .map(|g| g.into_inner().into_diagnostic()) 38 38 .transpose()?
+13 -20
src/main.rs
··· 1 + use futures::FutureExt; 1 2 use hydrant::config::Config; 2 3 use hydrant::control::Hydrant; 3 4 use mimalloc::MiMalloc; ··· 10 11 11 12 impl AppConfig { 12 13 fn from_env() -> Self { 13 - macro_rules! cfg { 14 - ($key:expr, $default:expr) => { 15 - std::env::var(concat!("HYDRANT_", $key)) 16 - .ok() 17 - .and_then(|s| s.parse().ok()) 18 - .unwrap_or($default) 19 - }; 20 - } 14 + use hydrant::__cfg as cfg; 21 15 let api_port = cfg!("API_PORT", 3000u16); 22 16 let enable_debug = cfg!("ENABLE_DEBUG", false); 23 - let debug_port = cfg!("DEBUG_PORT", api_port + 1); 17 + let debug_port: u16 = api_port + 1; 18 + let debug_port = cfg!("DEBUG_PORT", debug_port); 24 19 Self { 25 20 api_port, 26 21 enable_debug, ··· 48 43 49 44 let hydrant = Hydrant::new(cfg).await?; 50 45 51 - if app.enable_debug { 52 - tokio::select! { 53 - r = hydrant.run()? => r, 54 - r = hydrant.serve(app.api_port) => r, 55 - r = hydrant.serve_debug(app.debug_port) => r, 56 - } 57 - } else { 58 - tokio::select! { 59 - r = hydrant.run()? => r, 60 - r = hydrant.serve(app.api_port) => r, 61 - } 46 + let debug_fut = app 47 + .enable_debug 48 + .then(|| hydrant.serve_debug(app.debug_port).boxed()) 49 + .unwrap_or_else(|| std::future::pending().boxed()); 50 + 51 + tokio::select! { 52 + r = hydrant.run()? => r, 53 + r = hydrant.serve(app.api_port) => r, 54 + r = debug_fut => r, 62 55 } 63 56 }