very fast at-protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer

[lib,api] add new host limit for hosts added via requestCrawl

dawn 03f00b1d 0f652fa5

+202 -36
+1
README.md
···
 | `ENABLE_CRAWLER` | `true` if full network or crawler sources are configured, `false` otherwise | whether to actively query the network for unknown repositories. |
 | `CRAWLER_MAX_PENDING_REPOS` | `2000` | max pending repos for crawler. |
 | `CRAWLER_RESUME_PENDING_REPOS` | `1000` | resume threshold for crawler pending repos. |
+| `NEW_HOST_LIMIT` | `50` | in relay mode, decides how many new hosts can be added via `com.atproto.sync.requestCrawl` in a day. |
 | `RATE_TIERS` | | comma-separated list of named rate tier definitions in `name:base/mul/hourly/daily[/account_limit]` format (e.g. `trusted:5000/10.0/18000000/432000000/10000000`). the optional account limit prevents new accounts from being created on this PDS once reached. built-in tiers (`default`, `trusted`) are always present and can be overridden. |
 | `TIER_RULES` | | comma-separated ordered list of glob rules in `pattern:tier_name` format (e.g. `*.bsky.network:trusted`). rules are evaluated in order; first match wins. explicit API assignments via `PUT /pds/tiers` take precedence over rules; the `default` tier is the final fallback. uses standard glob wildcards (`*`, `?`) matched against the PDS hostname. |
+17
src/api/xrpc/mod.rs
···
     }
 }
 
+#[cfg(feature = "relay")]
+fn rate_limited<E: std::error::Error + IntoStatic>(
+    nsid: &'static str,
+    message: impl Display,
+) -> XrpcErrorResponse<E> {
+    XrpcErrorResponse {
+        status: StatusCode::TOO_MANY_REQUESTS,
+        error: XrpcError::Generic(GenericXrpcError {
+            error: "RateLimitExceeded".into(),
+            message: Some(message.to_smolstr()),
+            nsid,
+            method: "POST",
+            http_status: StatusCode::TOO_MANY_REQUESTS,
+        }),
+    }
+}
+
 #[cfg(feature = "indexer")]
 fn upstream_error<E: std::error::Error + IntoStatic>(
     nsid: &'static str,
+25
src/api/xrpc/request_crawl.rs
···
 use jacquard_api::com_atproto::sync::request_crawl::{
     RequestCrawlError, RequestCrawlRequest, RequestCrawlResponse,
 };
+use miette::IntoDiagnostic;
 use url::Url;
 
 use super::*;
···
                 "host is banned".into(),
             ))),
         });
+    }
+
+    // enforce daily new pds limit on unknown hosts
+    if !hydrant.firehose.is_source_known(&url) {
+        let (allowed, to_persist) = hydrant.state.pds_daily_limit.try_increment();
+        if !allowed {
+            return Err(rate_limited(
+                nsid,
+                "daily limit for new PDS sources reached",
+            ));
+        }
+
+        // persist the new count before returning so a crash cannot reset the counter
+        // and allow the budget to be replayed.
+        if let Some((day, count)) = to_persist {
+            let state = hydrant.state.clone();
+            tokio::task::spawn_blocking(move || {
+                crate::db::save_pds_daily_adds(&state.db, day, count)
+            })
+            .await
+            .into_diagnostic()
+            .flatten()
+            .map_err(|e| internal_error(nsid, e))?;
+        }
     }
 
     hydrant
+13
src/config.rs
···
     /// set via `HYDRANT_ONLY_INDEX_LINKS=true`.
     pub only_index_links: bool,
 
+    /// maximum number of new PDS sources that may be added (via seeding or API) in a single
+    /// UTC calendar day. `None` means unlimited.
+    /// set via `HYDRANT_NEW_HOST_LIMIT`.
+    pub new_host_limit: Option<u64>,
+
     /// base URL(s) of relay or aggregator services to seed firehose PDS sources from at startup.
     ///
     /// hydrant calls `com.atproto.sync.listHosts` on each URL and adds the returned PDSes
···
             filter_excludes: None,
             enable_backlinks: false,
             only_index_links: false,
+            new_host_limit: Some(50),
             tier_rules: vec![],
             tier_policy: {
                 let mut tiers = HashMap::new();
···
 
         let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks);
         let only_index_links: bool = cfg!("ONLY_INDEX_LINKS", defaults.only_index_links);
+        let max_pds_added_per_day: Option<u64> = std::env::var("HYDRANT_NEW_HOST_LIMIT")
+            .ok()
+            .and_then(|s| s.parse().ok());
 
         // start with built-in tier definitions, then layer in any env-defined overrides.
         // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,...
···
             filter_excludes,
             enable_backlinks,
             only_index_links,
+            new_host_limit: max_pds_added_per_day,
             tier_policy,
             tier_rules,
             cache_size,
···
                     .collect::<Vec<_>>()
             )
         )?;
+        }
+        if let Some(limit) = self.new_host_limit {
+            config_line!(f, "max pds/day", limit)?;
         }
         Ok(())
     }
-4
src/control/crawler.rs
···
 pub struct CrawlerSourceInfo {
     pub url: Url,
     pub mode: crate::config::CrawlerMode,
-    /// whether this source is persisted in the database (i.e. it was dynamically added
-    /// and will survive restarts). config-sourced entries have `persisted: false`.
-    pub persisted: bool,
 }
 
 pub(super) fn spawn_crawler_producer(
···
         sources.push(CrawlerSourceInfo {
             url: url.clone(),
             mode: h.mode,
-            persisted: self.persisted.contains_sync(url),
         });
         true
     })
+12 -9
src/control/firehose.rs
···
 #[derive(Debug, Clone, serde::Serialize)]
 pub struct FirehoseSourceInfo {
     pub url: Url,
-    /// true if added via the API and persisted to the database; false for `RELAY_HOSTS` sources.
-    pub persisted: bool,
     /// true when this is a direct PDS connection; enables host authority enforcement.
     pub is_pds: bool,
 }
···
     pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>,
     /// per-relay running tasks, keyed by url.
     pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>,
-    /// set of urls persisted in the database (dynamically added sources).
-    pub(super) persisted: Arc<scc::HashSet<Url>>,
+    /// set of known source urls, includes API-added (db-persisted) and static config sources.
+    pub(super) known_sources: Arc<scc::HashSet<Url>>,
     /// ids assigned to spawned tasks
     next_task_id: Arc<AtomicUsize>,
 }
···
             state,
             shared: Arc::new(std::sync::OnceLock::new()),
             tasks: Arc::new(scc::HashMap::new()),
-            persisted: Arc::new(scc::HashSet::new()),
+            known_sources: Arc::new(scc::HashSet::new()),
             next_task_id: Arc::new(AtomicUsize::new(0)),
         }
     }
···
         *self.state.firehose_enabled.borrow()
     }
 
+    /// returns `true` if this URL is already a known firehose source — either currently
+    /// running or persisted (e.g. the host is offline but was previously added).
+    pub fn is_source_known(&self, url: &Url) -> bool {
+        self.known_sources.contains_sync(url)
+    }
+
     /// list all currently active firehose sources.
     pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> {
         let mut out = Vec::new();
···
             .any_async(|url, handle| {
                 out.push(FirehoseSourceInfo {
                     url: url.clone(),
-                    persisted: self.persisted.contains_sync(url),
                     is_pds: handle.is_pds,
                 });
                 false
···
             .await
             .into_diagnostic()??;
 
-        let _ = self.persisted.insert_async(url.clone()).await;
+        let _ = self.known_sources.insert_async(url.clone()).await;
 
         // reset failure state so the fresh task gets a clean slate.
         // if the previous task exited after max failures, the failure counter
···
     /// if the source was added via the API, it is removed from the database;
     /// if it came from the static config, only the running task is stopped.
     pub async fn remove_source(&self, url: &Url) -> Result<bool> {
-        if self.persisted.contains_async(url).await {
+        if self.known_sources.contains_async(url).await {
             let url_str = url.to_string();
             tokio::task::spawn_blocking({
                 let state = self.state.clone();
···
             })
             .await
             .into_diagnostic()??;
-            self.persisted.remove_async(url).await;
+            self.known_sources.remove_async(url).await;
         }
 
         Ok(self.tasks.remove_async(url).await.is_some())
+8 -1
src/control/mod.rs
···
         "starting firehose ingestor(s)"
     );
     for source in &relay_hosts {
+        let _ = firehose
+            .known_sources
+            .insert_async(source.url.clone())
+            .await;
         firehose
             .spawn_firehose_ingestor(source, fire_shared, true)
             .await?;
···
     .await
     .into_diagnostic()??;
 
     for source in &persisted_sources {
-        let _ = firehose.persisted.insert_async(source.url.clone()).await;
+        let _ = firehose
+            .known_sources
+            .insert_async(source.url.clone())
+            .await;
         if firehose.tasks.contains_async(&source.url).await {
             continue;
         }
+5
src/db/keys/mod.rs
···
 pub fn pds_account_count_key(host: &str) -> String {
     format!("p|{host}")
 }
+
+/// key for the persisted daily-PDS-add counter in the cursors keyspace.
+/// value layout: [day: u64 BE][count: u64 BE] = 16 bytes.
+#[cfg(feature = "relay")]
+pub const PDS_DAILY_ADDS_KEY: &[u8] = b"pds_daily_adds";
+30
src/db/mod.rs
···
     batch.commit().into_diagnostic()
 }
 
+/// load the persisted (day, count) pair for the daily PDS add counter, if present.
+/// returns `None` if no entry exists; a malformed stored value is reported as an error.
+#[cfg(feature = "relay")]
+pub fn load_pds_daily_adds(db: &Db) -> Result<Option<(u64, u64)>> {
+    let Some(val) = db.cursors.get(keys::PDS_DAILY_ADDS_KEY).into_diagnostic()? else {
+        return Ok(None);
+    };
+    if val.len() < 16 {
+        miette::bail!("malformed pds daily limit value");
+    }
+    let day = u64::from_be_bytes(val[..8].try_into().into_diagnostic()?);
+    let count = u64::from_be_bytes(val[8..].try_into().into_diagnostic()?);
+    Ok(Some((day, count)))
+}
+
+/// persist the daily PDS add counter (day, count) to the cursors keyspace.
+/// value layout: [day: u64 BE][count: u64 BE] = 16 bytes.
+///
+/// the write is synchronous, so async callers (e.g. the `requestCrawl` handler)
+/// wrap this call in `spawn_blocking`.
+#[cfg(feature = "relay")]
+pub fn save_pds_daily_adds(db: &Db, day: u64, count: u64) -> Result<()> {
+    let mut value = [0u8; 16];
+    value[..8].copy_from_slice(&day.to_be_bytes());
+    value[8..].copy_from_slice(&count.to_be_bytes());
+    db.cursors
+        .insert(keys::PDS_DAILY_ADDS_KEY, value)
+        .into_diagnostic()
+}
+
 pub fn load_persisted_firehose_sources(
     db: &crate::db::Db,
 ) -> Result<Vec<crate::config::FirehoseSource>> {
+2
src/lib.rs
···
 /// hydrant main api, includes the Hydrant type for programmatic control.
 pub mod control;
 pub(crate) mod filter;
+#[cfg(feature = "relay")]
+pub(crate) mod pds_daily_limit;
 pub(crate) mod pds_meta;
 pub mod types;
+73
src/pds_daily_limit.rs
···
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+/// per-UTC-day counter for PDS additions via `requestCrawl`.
+///
+/// the in-memory state is initialised from the database on startup (see
+/// [`crate::db::load_pds_daily_adds`]). the counter resets automatically when the UTC day
+/// rolls over.
+pub(crate) struct PdsDailyLimit {
+    limit: Option<u64>,
+    /// current UTC day index (unix seconds / 86400).
+    day: AtomicU64,
+    /// requestCrawl calls accepted on the current UTC day.
+    count: AtomicU64,
+}
+
+impl PdsDailyLimit {
+    /// construct from the previously-persisted `(day, count)` pair loaded from the database.
+    /// if the stored day doesn't match today the count is treated as 0.
+    pub(crate) fn new(limit: Option<u64>, stored: Option<(u64, u64)>) -> Self {
+        let today = utc_day();
+        let count = stored
+            .filter(|(day, _)| *day == today)
+            .map(|(_, count)| count)
+            .unwrap_or(0);
+        Self {
+            limit,
+            day: AtomicU64::new(today),
+            count: AtomicU64::new(count),
+        }
+    }
+
+    /// attempt to consume a daily slot.
+    ///
+    /// returns `(allowed, to_persist)`:
+    /// - `allowed`: whether the request is permitted.
+    /// - `to_persist`: when `Some((day, new_count))`, the caller must persist these values to
+    ///   the database before returning success, so that a process crash cannot reset the counter
+    ///   and allow the budget to be replayed. `None` when no limit is configured.
+    ///
+    /// when the UTC day rolls over the counter resets and a fresh quota starts.
+    pub(crate) fn try_increment(&self) -> (bool, Option<(u64, u64)>) {
+        let Some(limit) = self.limit else {
+            return (true, None);
+        };
+
+        let today = utc_day();
+        if self.day.load(Ordering::Relaxed) != today {
+            self.count.store(0, Ordering::Relaxed);
+            self.day.store(today, Ordering::Relaxed);
+        }
+
+        // fetch_add returns the value *before* the increment
+        let prev = self.count.fetch_add(1, Ordering::Relaxed);
+        if prev >= limit {
+            // undo to avoid the counter drifting upwards on repeated rejections
+            self.count.fetch_sub(1, Ordering::Relaxed);
+            return (false, None);
+        }
+
+        let new_count = prev + 1;
+        let day = self.day.load(Ordering::Relaxed);
+        (true, Some((day, new_count)))
+    }
+}
+
+fn utc_day() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_secs()
+        / 86400
+}
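The counter is self-contained, so its semantics are easy to check in isolation. A minimal test sketch follows; it is not part of this commit, and assumes a `#[cfg(test)]` module inside `pds_daily_limit.rs` so that `utc_day` and the `pub(crate)` items are in scope.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn daily_budget_is_enforced() {
        // no limit configured: always allowed, nothing to persist.
        let unlimited = PdsDailyLimit::new(None, None);
        assert_eq!(unlimited.try_increment(), (true, None));

        // limit of 2, resuming from a persisted count of 1 for today.
        let today = utc_day();
        let limited = PdsDailyLimit::new(Some(2), Some((today, 1)));

        // the second slot is granted, and (today, 2) is handed back for the caller to persist...
        assert_eq!(limited.try_increment(), (true, Some((today, 2))));
        // ...after which the daily budget is exhausted.
        assert_eq!(limited.try_increment(), (false, None));
    }
}

The `(today, 2)` pair returned by the second call is exactly what the `requestCrawl` handler writes via `save_pds_daily_adds` before replying, so a restart resumes from the persisted count rather than a fresh budget.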
+11 -2
src/state.rs
···
 use tokio::sync::watch;
 use url::Url;
 
+#[cfg(feature = "relay")]
+use crate::pds_daily_limit::PdsDailyLimit;
 use crate::{
     config::Config,
     db::Db,
···
 pub struct AppState {
     pub db: Db,
     pub resolver: Resolver,
+    pub throttler: Throttler,
     pub(crate) filter: FilterHandle,
     pub(crate) pds_meta: PdsMetaHandle,
+    #[cfg(feature = "relay")]
+    pub(crate) pds_daily_limit: PdsDailyLimit,
     pub(crate) tier_policy: TierPolicy,
     pub firehose_cursors: scc::HashIndex<Url, AtomicI64>,
+    pub firehose_enabled: watch::Sender<bool>,
     #[cfg(feature = "indexer")]
     pub backfill_notify: Notify,
     #[cfg(feature = "indexer")]
     pub crawler_enabled: watch::Sender<bool>,
-    pub firehose_enabled: watch::Sender<bool>,
     #[cfg(feature = "indexer")]
     pub backfill_enabled: watch::Sender<bool>,
     pub ephemeral: bool,
     pub ephemeral_ttl: Duration,
     pub only_index_links: bool,
-    pub throttler: Throttler,
 }
 
 impl AppState {
···
         let pds_meta = new_pds_handle(PdsMeta { hosts });
 
         let relay_cursors = scc::HashIndex::new();
+        #[cfg(feature = "relay")]
+        let pds_daily_limit =
+            PdsDailyLimit::new(config.new_host_limit, crate::db::load_pds_daily_adds(&db)?);
 
         #[cfg(feature = "indexer")]
         let (crawler_enabled, _) = watch::channel(crawler_default);
···
             ephemeral_ttl: config.ephemeral_ttl.clone(),
             only_index_links: config.only_index_links,
             throttler: Throttler::new(),
+            #[cfg(feature = "relay")]
+            pds_daily_limit,
         })
     }
+5 -20
tests/api.nu
···
     if $s.mode != "list_repos" {
         fail $"expected mode=list_repos, got ($s.mode)" $pid
     }
-    if not $s.persisted {
-        fail "expected persisted=true for dynamically added source" $pid
-    }
-    print $" ok: 1 source, url=($s.url), mode=($s.mode), persisted=($s.persisted)"
+    print $" ok: 1 source, url=($s.url), mode=($s.mode)"
 
     # posting the same URL with a different mode replaces the existing entry
     print " POST /crawler/sources (should override)..."
···
         fail $"expected 1 source after restart, got ($after | length)" $instance2.pid
     }
     let s = ($after | first)
-    if not $s.persisted {
-        fail "expected persisted=true after restart" $instance2.pid
-    }
     if $s.mode != "by_collection" {
         fail $"expected mode=by_collection after restart, got ($s.mode)" $instance2.pid
     }
···
         fail "hydrant did not start"
     }
 
-    # config source should appear, but with persisted=false
-    print " checking config source appears with persisted=false..."
+    # config source should appear
+    print " checking config source appears..."
     let sources = (http get $"($url)/crawler/sources")
     if ($sources | length) != 1 {
         fail $"expected 1 source, got ($sources | length)" $instance.pid
     }
-    if ($sources | first).persisted {
-        fail "expected persisted=false for a CRAWLER_URLS source" $instance.pid
-    }
-    print " ok: config source has persisted=false"
+    print " ok: config source present"
 
     # the task can be stopped at runtime
     print " deleting config source at runtime..."
···
     if ($after_restart | length) != 1 {
         fail $"expected config source to reappear after restart, got ($after_restart | length)" $instance2.pid
     }
-    if ($after_restart | first).persisted {
-        fail "expected persisted=false after restart" $instance2.pid
-    }
     print " ok: config source reappears on restart (not persisted to DB)"
 
     kill $instance2.pid
···
         fail $"expected 1 source, got ($sources | length)" $pid
     }
     let s = ($sources | first)
-    if not $s.persisted {
-        fail "expected persisted=true for dynamically added source" $pid
-    }
-    print $" ok: 1 source, url=($s.url), persisted=($s.persisted)"
+    print $" ok: 1 source, url=($s.url)"
 
     # posting the same URL replaces the existing entry
     print " POST /firehose/sources (should override)..."