very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[all] make types that dont need to be pub pub(crate)

dawn 9952f51d 7dd2efc7

+50 -51
-28
src/config.rs
··· 553 553 } 554 554 } 555 555 556 - #[derive(Debug, Clone)] 557 - pub struct AppConfig { 558 - pub api_port: u16, 559 - pub enable_debug: bool, 560 - pub debug_port: u16, 561 - } 562 - 563 - impl AppConfig { 564 - pub fn from_env() -> Self { 565 - macro_rules! cfg { 566 - ($key:expr, $default:expr) => { 567 - std::env::var(concat!("HYDRANT_", $key)) 568 - .ok() 569 - .and_then(|s| s.parse().ok()) 570 - .unwrap_or($default) 571 - }; 572 - } 573 - let api_port = cfg!("API_PORT", 3000u16); 574 - let enable_debug = cfg!("ENABLE_DEBUG", false); 575 - let debug_port = cfg!("DEBUG_PORT", api_port + 1); 576 - Self { 577 - api_port, 578 - enable_debug, 579 - debug_port, 580 - } 581 - } 582 - } 583 - 584 556 macro_rules! config_line { 585 557 ($f:expr, $label:expr, $value:expr) => { 586 558 writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH)
+3 -3
src/control/filter.rs
··· 148 148 /// if set, replaces the current indexing mode. 149 149 pub mode: Option<FilterMode>, 150 150 /// if set, replaces or patches the signals set. 151 - pub signals: Option<SetUpdate>, 151 + pub(crate) signals: Option<SetUpdate>, 152 152 /// if set, replaces or patches the collections set. 153 - pub collections: Option<SetUpdate>, 153 + pub(crate) collections: Option<SetUpdate>, 154 154 /// if set, replaces or patches the excludes set. 155 - pub excludes: Option<SetUpdate>, 155 + pub(crate) excludes: Option<SetUpdate>, 156 156 } 157 157 158 158 impl FilterPatch {
+7 -7
src/db/mod.rs
··· 50 50 pub crawler: Keyspace, 51 51 #[cfg(feature = "backlinks")] 52 52 pub backlinks: Keyspace, 53 - pub event_tx: broadcast::Sender<BroadcastEvent>, 53 + pub(crate) event_tx: broadcast::Sender<BroadcastEvent>, 54 54 pub next_event_id: Arc<AtomicU64>, 55 55 pub counts_map: HashMap<SmolStr, u64>, 56 56 } ··· 617 617 .unwrap_or(0) 618 618 } 619 619 620 - pub fn update_gauge_diff( 620 + pub(crate) fn update_gauge_diff( 621 621 &self, 622 622 old: &crate::types::GaugeState, 623 623 new: &crate::types::GaugeState, ··· 625 625 update_gauge_diff_impl!(self, old, new, update_count); 626 626 } 627 627 628 - pub async fn update_gauge_diff_async( 628 + pub(crate) async fn update_gauge_diff_async( 629 629 &self, 630 630 old: &crate::types::GaugeState, 631 631 new: &crate::types::GaugeState, ··· 633 633 update_gauge_diff_impl!(self, old, new, update_count_async, await); 634 634 } 635 635 636 - pub fn update_repo_state<F, T>( 636 + pub(crate) fn update_repo_state<F, T>( 637 637 batch: &mut OwnedWriteBatch, 638 638 repos: &Keyspace, 639 639 did: &Did<'_>, ··· 655 655 } 656 656 } 657 657 658 - pub async fn update_repo_state_async<F, T>( 658 + pub(crate) async fn update_repo_state_async<F, T>( 659 659 &self, 660 660 did: &Did<'_>, 661 661 f: F, ··· 681 681 .into_diagnostic()? 682 682 } 683 683 684 - pub fn repo_gauge_state( 684 + pub(crate) fn repo_gauge_state( 685 685 repo_state: &RepoState, 686 686 resync_bytes: Option<&[u8]>, 687 687 ) -> crate::types::GaugeState { ··· 707 707 } 708 708 } 709 709 710 - pub async fn repo_gauge_state_async( 710 + pub(crate) async fn repo_gauge_state_async( 711 711 &self, 712 712 repo_state: &RepoState<'_>, 713 713 did_key: &[u8],
+4 -4
src/filter.rs
··· 2 2 use serde::{Deserialize, Serialize}; 3 3 use std::sync::Arc; 4 4 5 - pub type FilterHandle = Arc<arc_swap::ArcSwap<FilterConfig>>; 5 + pub(crate) type FilterHandle = Arc<arc_swap::ArcSwap<FilterConfig>>; 6 6 7 - pub fn new_handle(config: FilterConfig) -> FilterHandle { 7 + pub(crate) fn new_handle(config: FilterConfig) -> FilterHandle { 8 8 Arc::new(arc_swap::ArcSwap::new(Arc::new(config))) 9 9 } 10 10 11 11 /// apply a bool patch or set replacement for a single set update. 12 12 #[derive(Debug, Clone, Serialize, Deserialize)] 13 13 #[serde(untagged)] 14 - pub enum SetUpdate { 14 + pub(crate) enum SetUpdate { 15 15 /// replace the entire set with this list 16 16 Set(Vec<String>), 17 17 /// patch: true = add, false = remove ··· 26 26 } 27 27 28 28 #[derive(Debug, Clone, Serialize)] 29 - pub struct FilterConfig { 29 + pub(crate) struct FilterConfig { 30 30 pub mode: FilterMode, 31 31 pub signals: Vec<Nsid<'static>>, 32 32 pub collections: Vec<Nsid<'static>>,
+28 -1
src/main.rs
··· 1 - use hydrant::config::{AppConfig, Config}; 1 + use hydrant::config::Config; 2 2 use hydrant::control::Hydrant; 3 3 use mimalloc::MiMalloc; 4 + 5 + struct AppConfig { 6 + api_port: u16, 7 + enable_debug: bool, 8 + debug_port: u16, 9 + } 10 + 11 + impl AppConfig { 12 + fn from_env() -> Self { 13 + macro_rules! cfg { 14 + ($key:expr, $default:expr) => { 15 + std::env::var(concat!("HYDRANT_", $key)) 16 + .ok() 17 + .and_then(|s| s.parse().ok()) 18 + .unwrap_or($default) 19 + }; 20 + } 21 + let api_port = cfg!("API_PORT", 3000u16); 22 + let enable_debug = cfg!("ENABLE_DEBUG", false); 23 + let debug_port = cfg!("DEBUG_PORT", api_port + 1); 24 + Self { 25 + api_port, 26 + enable_debug, 27 + debug_port, 28 + } 29 + } 30 + } 4 31 5 32 #[global_allocator] 6 33 static GLOBAL: MiMalloc = MiMalloc;
+1 -1
src/state.rs
··· 15 15 pub struct AppState { 16 16 pub db: Db, 17 17 pub resolver: Resolver, 18 - pub filter: FilterHandle, 18 + pub(crate) filter: FilterHandle, 19 19 /// per-relay firehose cursors. values use interior mutability so they can be 20 20 /// updated through the lock-free `peek_with` reads in the ingest worker. 21 21 pub relay_cursors: scc::HashIndex<Url, AtomicI64>,
+7 -7
src/types.rs
··· 37 37 38 38 #[derive(Debug, Clone, Serialize, Deserialize)] 39 39 #[serde(bound(deserialize = "'i: 'de"))] 40 - pub struct RepoState<'i> { 40 + pub(crate) struct RepoState<'i> { 41 41 pub status: RepoStatus, 42 42 pub rev: Option<DbTid>, 43 43 pub data: Option<IpldCid>, ··· 129 129 } 130 130 131 131 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 132 - pub enum ResyncErrorKind { 132 + pub(crate) enum ResyncErrorKind { 133 133 Ratelimited, 134 134 Transport, 135 135 Generic, 136 136 } 137 137 138 138 #[derive(Debug, Clone, Serialize, Deserialize)] 139 - pub enum ResyncState { 139 + pub(crate) enum ResyncState { 140 140 Error { 141 141 kind: ResyncErrorKind, 142 142 retry_count: u32, ··· 202 202 } 203 203 204 204 #[derive(Clone, Debug)] 205 - pub enum BroadcastEvent { 205 + pub(crate) enum BroadcastEvent { 206 206 #[allow(dead_code)] 207 207 Persisted(u64), 208 208 Ephemeral(Box<MarshallableEvt<'static>>), ··· 244 244 use jacquard_common::bytes::Bytes; 245 245 246 246 #[derive(Serialize, Deserialize, Clone)] 247 - pub enum StoredData { 247 + pub(crate) enum StoredData { 248 248 Nothing, 249 249 Ptr(IpldCid), 250 250 #[serde(with = "serde_bytes_squared")] ··· 275 275 276 276 #[derive(Debug, Serialize, Deserialize, Clone)] 277 277 #[serde(bound(deserialize = "'i: 'de"))] 278 - pub struct StoredEvent<'i> { 278 + pub(crate) struct StoredEvent<'i> { 279 279 #[serde(default)] 280 280 pub live: bool, 281 281 #[serde(borrow)] ··· 304 304 } 305 305 306 306 #[derive(Debug, PartialEq, Eq, Clone, Copy)] 307 - pub enum GaugeState { 307 + pub(crate) enum GaugeState { 308 308 Synced, 309 309 Pending, 310 310 Resync(Option<ResyncErrorKind>),