very fast at-protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer


[ingest,lib,api] add glob rules for pds tiers, TRUSTED_HOSTS -> TIER_RULES

dawn b918a396 1c7e199b

+395 -146
+11 -14
README.md
···
  | `ENABLE_CRAWLER` | `true` if full network or crawler sources are configured, `false` otherwise | whether to actively query the network for unknown repositories. |
  | `CRAWLER_MAX_PENDING_REPOS` | `2000` | max pending repos for crawler. |
  | `CRAWLER_RESUME_PENDING_REPOS` | `1000` | resume threshold for crawler pending repos. |
- | `TRUSTED_HOSTS` | | comma-separated list of PDS hostnames to pre-assign to the `trusted` rate tier at startup. hosts not listed here use the `default` tier unless assigned via the API. |
  | `RATE_TIERS` | | comma-separated list of named rate tier definitions in `name:base/mul/hourly/daily[/account_limit]` format (e.g. `trusted:5000/10.0/18000000/432000000/10000000`). the optional account limit prevents new accounts from being created on this PDS once reached. built-in tiers (`default`, `trusted`) are always present and can be overridden. |
+ | `TIER_RULES` | | comma-separated ordered list of glob rules in `pattern:tier_name` format (e.g. `*.bsky.network:trusted`). rules are evaluated in order; first match wins. explicit API assignments via `PUT /pds/tiers` take precedence over rules; the `default` tier is the final fallback. uses standard glob wildcards (`*`, `?`) matched against the PDS hostname. |

  ## build features

···
  - `GET /pds/tiers`: list all current tier assignments alongside the available
    tier definitions.
    - returns `{ "assignments": [{ "host": string, "tier": string }], "rate_tiers": { <name>: { "per_second_base": int, "per_second_account_mul": float, "per_hour": int, "per_day": int } } }`.
-   - `assignments` only contains PDSes with an explicit assignment; any PDS not
-     listed uses the `default` tier.
+   - `assignments` only contains PDSes with an explicit API assignment. hosts without one resolve via glob rules or fall back to `default`.
  - `PUT /pds/tiers`: assign a PDS to a named rate tier.
    - body: `{ "host": string, "tier": string }`.
    - `host` is the PDS hostname (e.g. `pds.example.com`).
    - `tier` must be one of the configured tier names. returns `400` if unknown.
    - assignments are persisted to the database and survive restarts.
    - re-assigning the same host updates the tier in place without creating a duplicate.
- - `DELETE /pds/tiers`: remove an explicit tier assignment for a PDS, reverting
-   it to the `default` tier.
+ - `DELETE /pds/tiers`: remove an explicit tier assignment for a PDS.
    - query parameter: `?host=<hostname>` (e.g. `?host=pds.example.com`).
+   - reverts the host to glob-rule resolution (not necessarily `default`, a matching `TIER_RULES` pattern still applies).
    - returns `200` even if no assignment existed.
  - `GET /pds/rate-tiers`: list the available rate tier definitions.
    - returns a map of tier name to `{ "per_second_base", "per_second_account_mul", "per_hour", "per_day", "account_limit" }`.

- hosts listed in `TRUSTED_HOSTS` are seeded as `trusted` at startup, but only
- when no database assignment already exists for that host — DB entries always win.
- the seed is not written to the database, so it is re-applied on every restart.
- consequences: if you remove a host from `TRUSTED_HOSTS` and it has no DB entry,
- it will revert to `default` on the next restart. if you explicitly assign a host
- via the API (which writes to the DB), that assignment persists regardless of
- `TRUSTED_HOSTS`. if you delete a host's DB assignment via the API while it is
- still listed in `TRUSTED_HOSTS`, it will be re-seeded as `trusted` on the next
- restart.
+ tiers are resolved in this order:
+
+ 1. **explicit API assignment**, set via `PUT /pds/tiers`, stored in the database, survives restarts.
+ 2. **glob rules**, from `TIER_RULES`, evaluated in order; first match wins.
+ 3. **`default` tier**, applied if no rule or explicit assignment matches.
+
+ deleting an API assignment reverts the host to glob-rule resolution, not necessarily back to `default`. if a rule like `*.bsky.network:trusted` matches the host, it will become trusted again without any further action.

  ### repository management

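as a worked example of the resolution order (host names illustrative): with `TIER_RULES=*.bsky.network:trusted` and no explicit assignment, `pds01.bsky.network` resolves to `trusted` via the glob rule. a `PUT /pds/tiers` with body `{ "host": "pds01.bsky.network", "tier": "default" }` then pins it to `default` regardless of the rule, and a later `DELETE /pds/tiers?host=pds01.bsky.network` hands it back to rule resolution, i.e. `trusted` again rather than `default`.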
+4 -2
src/api/xrpc/com_atproto_describe_repo.rs
···
  use futures::TryFutureExt;
- use jacquard_api::com_atproto::repo::describe_repo::{DescribeRepoOutput, DescribeRepoRequest};
+ use jacquard_api::com_atproto::repo::describe_repo::{
+     DescribeRepoOutput, DescribeRepoRequest, DescribeRepoResponse,
+ };

  use crate::util::invalid_handle;
···
      State(hydrant): State<Hydrant>,
      ExtractXrpc(req): ExtractXrpc<DescribeRepoRequest>,
  ) -> XrpcResult<Json<DescribeRepoOutput<'static>>> {
-     let nsid = "com.atproto.repo.describeRepo";
+     let nsid = DescribeRepoResponse::NSID;
      let resolver = &hydrant.state.resolver;

      let did = resolver
+55 -26
src/config.rs
···
+ use crate::pds_meta::{TierPolicy, TierRule};
  use miette::Result;
  use serde::{Deserialize, Serialize};
- use smol_str::ToSmolStr;
+ use smol_str::{SmolStr, ToSmolStr};
  use std::collections::HashMap;
  use std::fmt;
  use std::path::PathBuf;
···
      ///
      /// set via `HYDRANT_SEED_HOSTS` as a comma-separated list of base URLs.
      pub seed_hosts: Vec<Url>,
-     /// list of trusted PDS/relay hosts to pre-assign to the "trusted" rate tier at startup.
-     /// set via `HYDRANT_TRUSTED_HOSTS` as a comma-separated list of hostnames.
-     /// hosts not present in this list use the "default" tier unless assigned via the API.
-     pub trusted_hosts: Vec<String>,
      /// named rate tier definitions for PDS rate limiting.
      ///
      /// built-in tiers ("default" and "trusted") are always present and may be overridden.
      /// set via `HYDRANT_RATE_TIERS` as a comma-separated list of `name:base/mul/hourly/daily` entries,
      /// e.g. `trusted:5000/10.0/18000000/432000000,custom:100/1.0/7200000/172800000`.
-     pub rate_tiers: HashMap<String, RateTier>,
+     ///
+     /// built from `HYDRANT_TIER_RULES` and `HYDRANT_RATE_TIERS` at startup.
+     pub tier_policy: TierPolicy,
+
+     /// glob rules mapping host patterns to named rate tiers.
+     ///
+     /// set via `HYDRANT_TIER_RULES` as a comma-separated list of `pattern:tiername` entries,
+     /// e.g. `*.bsky.network:trusted,pds.example.com:custom`. rules are evaluated in order;
+     /// api-assigned per-host overrides always take priority over these rules.
+     pub tier_rules: Vec<(String, String)>,

      /// db internals, tune only if you know what you're doing.
      ///
···
          filter_collections: None,
          filter_excludes: None,
          enable_backlinks: false,
-         trusted_hosts: vec![],
-         rate_tiers: {
-             let mut m = HashMap::new();
-             m.insert("default".to_string(), RateTier::default_tier());
-             m.insert("trusted".to_string(), RateTier::trusted());
-             m
+         tier_rules: vec![],
+         tier_policy: {
+             let mut tiers = HashMap::new();
+             tiers.insert(SmolStr::new("default"), RateTier::default_tier());
+             tiers.insert(SmolStr::new("trusted"), RateTier::trusted());
+             TierPolicy {
+                 tiers,
+                 rules: vec![],
+             }
          },
          cache_size: 256,
          data_compression: Compression::Zstd,
···

      let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks);

-     // start with built-in tiers, then layer in any env-defined overrides.
+     // start with built-in tier definitions, then layer in any env-defined overrides.
      // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,...
-     let mut rate_tiers = defaults.rate_tiers.clone();
+     let mut tiers = defaults.tier_policy.tiers.clone();
      if let Ok(s) = std::env::var("HYDRANT_RATE_TIERS") {
          for entry in s.split(',') {
              let entry = entry.trim();
              if let Some((name, spec)) = entry.split_once(':') {
                  match RateTier::parse(spec) {
                      Some(tier) => {
-                         rate_tiers.insert(name.trim().to_string(), tier);
+                         tiers.insert(SmolStr::new(name.trim()), tier);
                      }
                      None => tracing::warn!(
                          "ignoring invalid rate tier '{name}': expected base/mul/hourly/daily format"
···
          })
          .unwrap_or_else(|| defaults.seed_hosts.clone());

-     let trusted_hosts = std::env::var("HYDRANT_TRUSTED_HOSTS")
-         .ok()
-         .map(|s| {
-             s.split(',')
-                 .map(|s| s.trim().to_string())
-                 .filter(|s| !s.is_empty())
-                 .collect()
-         })
-         .unwrap_or_else(|| defaults.trusted_hosts.clone());
+     // build ordered glob rules from HYDRANT_TIER_RULES
+     let mut rules: Vec<TierRule> = vec![];
+     let mut tier_rules: Vec<(String, String)> = vec![];
+     if let Ok(s) = std::env::var("HYDRANT_TIER_RULES") {
+         for entry in s.split(',') {
+             let entry = entry.trim();
+             if entry.is_empty() {
+                 continue;
+             }
+             if let Some((pattern_str, tier_name)) = entry.split_once(':') {
+                 let pattern_str = pattern_str.trim();
+                 let tier_name = tier_name.trim();
+                 match glob::Pattern::new(pattern_str) {
+                     Ok(pattern) => {
+                         rules.push(TierRule {
+                             pattern,
+                             tier_name: SmolStr::new(tier_name),
+                         });
+                         tier_rules.push((pattern_str.to_string(), tier_name.to_string()));
+                     }
+                     Err(e) => tracing::warn!(
+                         "ignoring invalid tier rule pattern '{pattern_str}': {e}"
+                     ),
+                 }
+             }
+         }
+     }
+
+     let tier_policy = TierPolicy { tiers, rules };

      let default_mode = CrawlerMode::default_for(full_network);
      let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") {
···
          filter_collections,
          filter_excludes,
          enable_backlinks,
-         trusted_hosts,
-         rate_tiers,
+         tier_policy,
+         tier_rules,
          cache_size,
          data_compression,
         journal_compression,
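to illustrate the parsing above: a hypothetical `HYDRANT_TIER_RULES=*.bsky.network:trusted,pds.example.com:custom` yields two `TierRule`s in that order, while an entry whose pattern fails `glob::Pattern::new` (e.g. an unclosed `[` character class) is skipped with a warning instead of aborting startup, mirroring how malformed `HYDRANT_RATE_TIERS` entries are handled.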
+6
src/control/firehose.rs
···

      let _ = self.persisted.insert_async(url.clone()).await;

+     // reset failure state so the fresh task gets a clean slate.
+     // if the previous task exited after max failures, the failure counter
+     // would otherwise cause the new task to exit immediately.
+     let throttle = self.state.throttler.get_handle(&url).await;
+     throttle.record_success();
+
      self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared, false)
          .await?;

+49 -29
src/control/pds.rs
···
  use miette::{IntoDiagnostic, Result};
  use serde::Serialize;
  use smol_str::SmolStr;
+ use tracing::debug;

  use crate::config::RateTier;
+ use crate::db::keys::pds_account_count_key;
  use crate::db::pds_meta as db_pds;
- use crate::pds_meta::{HostStatus, PdsMeta};
+ use crate::pds_meta::{HostDesc, HostStatus, PdsMeta};
  use crate::state::AppState;

  /// a single PDS-to-tier assignment.
···
      G: FnOnce(&mut PdsMeta),
  {
      let state = self.0.clone();
-     tokio::task::spawn_blocking(move || {
+     tokio::task::spawn_blocking(move || -> Result<()> {
          let mut batch = state.db.inner.batch();
          db_op(&mut batch, &state.db.filter);
          batch.commit().into_diagnostic()?;
···
          Ok(())
      }

-     fn check_limit_transition(&self, host: &str, account_limit: Option<u64>) -> Option<HostStatus> {
-         let count_key = crate::db::keys::pds_account_count_key(host);
-         let count = self.0.db.get_count_sync(&count_key);
-         let current_status = self.0.pds_meta.load().status(host);
-         current_status.check_limit_transition(count, account_limit)
-     }
-
-     /// list all current per-PDS tier assignments.
+     /// list all current per-PDS tier assignments (explicit api-assigned overrides only).
      pub async fn list_tiers(&self) -> HashMap<String, String> {
          let snapshot = self.0.pds_meta.load();
          snapshot
              .hosts
              .iter()
-             .filter_map(|(host, desc)| desc.tier.as_ref().map(|t| (host.clone(), t.to_string())))
+             .filter_map(|(host, desc): (&String, &HostDesc)| {
+                 desc.tier
+                     .as_ref()
+                     .map(|t: &smol_str::SmolStr| (host.clone(), t.to_string()))
+             })
              .collect()
      }

      /// returns the assigned tier for `host`, or "default" if none is assigned.
      pub fn get_tier(&self, host: impl AsRef<str>) -> String {
          let snapshot = self.0.pds_meta.load();
          snapshot
···
          snapshot
              .hosts
              .iter()
-             .filter_map(|(host, desc)| {
+             .filter_map(|(host, desc): (&String, &crate::pds_meta::HostDesc)| {
                  matches!(desc.status, HostStatus::Banned).then(|| host.clone())
              })
              .collect()
···
      /// list all configured rate tier definitions.
      pub fn list_rate_tiers(&self) -> HashMap<String, PdsTierDefinition> {
          self.0
-             .rate_tiers
+             .tier_policy
+             .tiers
              .iter()
-             .map(|(name, tier)| (name.clone(), PdsTierDefinition::from(*tier)))
+             .map(|(name, tier): (&smol_str::SmolStr, &RateTier)| {
+                 (name.to_string(), PdsTierDefinition::from(*tier))
+             })
              .collect()
      }

-     /// assign `host` to `tier`, persisting the change to the database.
+     /// assign `host` to `tier`.
      /// returns an error if `tier` is not a known tier name.
      pub async fn set_tier(&self, host: impl AsRef<str>, tier: String) -> Result<()> {
-         if !self.0.rate_tiers.contains_key(&tier) {
+         if !self.0.tier_policy.tiers.contains_key(tier.as_str()) {
              miette::bail!(
                  "unknown tier '{tier}'; known tiers: {:?}",
-                 self.0.rate_tiers.keys().collect::<Vec<_>>()
+                 self.0.tier_policy.tiers.keys().collect::<Vec<_>>()
              );
          }
···
          let host_clone = host.clone();
          let tier_clone = tier.clone();

-         let new_tier_limit = self.0.rate_tiers.get(&tier).unwrap().account_limit;
-         let maybe_status = self.check_limit_transition(&host, new_tier_limit);
+         // read the new tier's account limit and check for a status transition,
+         // now that the override is about to change.
+         let new_tier_limit = self
+             .0
+             .tier_policy
+             .tiers
+             .get(tier.as_str())
+             .unwrap()
+             .account_limit;
+         let count = self.0.db.get_count_sync(&pds_account_count_key(&host));
+         let current_status = self.0.pds_meta.load().status(&host);
+         let maybe_status = current_status.check_limit_transition(count, new_tier_limit);

          self.update(
              move |batch, ks| {
···
          .await
      }

-     /// remove any explicit tier assignment for `host`, reverting it to the default tier.
+     /// remove any explicit tier assignment for `host`, reverting it to the matched rule or default.
      pub async fn remove_tier(&self, host: impl AsRef<str>) -> Result<()> {
          let host = host.as_ref().to_string();
          let host_clone = host.clone();

-         let default_tier_limit = self
-             .0
-             .rate_tiers
-             .get("default")
-             .and_then(|t| t.account_limit);
-         let maybe_status = self.check_limit_transition(&host, default_tier_limit);
+         // after removing the override, the effective tier is determined by glob rules.
+         // resolve it without the override to get the correct limit.
+         let effective_limit = self.0.tier_policy.resolve(&host, None).account_limit;
+         let count = self.0.db.get_count_sync(&pds_account_count_key(&host));
+         let current_status = self.0.pds_meta.load().status(&host);
+         let maybe_status = current_status.check_limit_transition(count, effective_limit);
+         debug!(
+             host,
+             ?current_status,
+             ?effective_limit,
+             count,
+             ?maybe_status,
+             "remove_tier: computed status transition"
+         );

          self.update(
              move |batch, ks| {
···
          .await
      }

-     /// ban `host`, persisting the change to the database.
+     /// ban `host`
      pub async fn ban(&self, host: impl AsRef<str>) -> Result<()> {
          let host = host.as_ref().to_string();
          let host_clone = host.clone();
···
          .await
      }

-     /// unban `host`, removing it from the database.
+     /// unban `host`
      pub async fn unban(&self, host: impl AsRef<str>) -> Result<()> {
          let host = host.as_ref().to_string();
          let host_clone = host.clone();
+16 -6
src/ingest/firehose.rs
···
              if banned {
                  break Ok(());
              }
-             meta.tier_for(host, &self.state.rate_tiers)
+             let override_name = meta.hosts.get(host).and_then(|h| h.tier.as_ref());
+             self.state.tier_policy.resolve(host, override_name)
          };
          let accounts = self.state.db.get_count(&count_key).await;
          tokio::select! {
···
              }
              _ = &mut active_sleep, if !marked_active => {
                  marked_active = true;
-                 // only reset failure state once the stream has been healthy for
-                 // a full window — prevents hosts that connect but immediately
-                 // send garbage from resetting their backoff on every attempt
+                 // only reset failure state once the stream has been healthy for a bit
+                 // so we don't get in a "connects successfully, sends garbage" situation
                  self.throttle.record_success();
                  if self.is_pds {
                      let (current_status, tier) = {
                          let meta = self.state.pds_meta.load();
-                         (meta.status(host), meta.tier_for(host, &self.state.rate_tiers))
+                         let override_name = meta.hosts.get(host).and_then(|h| h.tier.as_ref());
+                         (meta.status(host), self.state.tier_policy.resolve(host, override_name))
                      };
                      if current_status == HostStatus::Banned {
                          break Ok(());
                      }
                      let count = self.state.db.get_count_sync(&count_key);
                      let new_status = tier.account_limit.is_some_and(|l| count >= l)
-                         .then_some(HostStatus::Throttled).unwrap_or(HostStatus::Active);
+                         .then_some(HostStatus::Throttled)
+                         .unwrap_or(HostStatus::Active);
+                     debug!(
+                         host,
+                         ?current_status,
+                         account_limit = ?tier.account_limit,
+                         count,
+                         ?new_status,
+                         "active_sleep: computed status transition"
+                     );

                      if current_status != new_status {
                          if let Err(e) = self.set_host_status(new_status) {
+12 -38
src/ingest/relay.rs
···
  use tracing::{debug, error, info, info_span, trace, warn};
  use url::Url;

+ use crate::db::keys::pds_account_count_key;
  use crate::db::{self, keys};
  use crate::ingest::stream::AccountStatus;
  #[cfg(feature = "relay")]
···
          // update per-PDS active account count on transitions
          if is_pds {
              if let Some(host) = firehose.host_str() {
-                 let count_key = keys::pds_account_count_key(host);
+                 let count_key = pds_account_count_key(host);
                  let changed = if !was_active && repo_state.active {
                      Some(ctx.state.db.update_count(&count_key, 1))
                  } else if was_active && !repo_state.active {
···
                  };

                  if let Some(count) = changed {
-                     let (current_status, limit) = {
-                         let meta = ctx.state.pds_meta.load();
-                         (
-                             meta.status(host),
-                             meta.tier_for(host, &ctx.state.rate_tiers).account_limit,
-                         )
-                     };
-
-                     if let Some(status) = current_status.check_limit_transition(count, limit) {
-                         debug!(%host, count, ?limit, ?status, "account count crossed limit, shifting status");
-                         if let Err(e) = crate::db::pds_meta::set_status(
-                             &mut ctx.batch,
-                             &ctx.state.db.filter,
-                             host,
-                             status,
-                         ) {
-                             error!(err = %e, "failed to write host status");
-                         } else {
-                             crate::pds_meta::PdsMeta::update_host(&ctx.state.pds_meta, host, |h| {
-                                 h.status = status
-                             });
-                         }
+                     let mut batch_for_status = ctx.state.db.inner.batch();
+                     ctx.state
+                         .apply_host_limit_status(&mut batch_for_status, host, count);
+                     if let Err(e) = batch_for_status.commit() {
+                         error!(%host, err = %e, "failed to commit host status update");
                      }
                  }
              }
···
      }

      if let Some(host) = msg.firehose.host_str() {
-         let tier = self
-             .state
-             .pds_meta
-             .load()
-             .tier_for(host, &self.state.rate_tiers);
-         if let Some(limit) = tier.account_limit {
-             let count = self
-                 .state
-                 .db
-                 .get_count_sync(&crate::db::keys::pds_account_count_key(host));
-             if count >= limit {
-                 warn!(did = %did, host, count, limit, "account limit reached for host, dropping new account");
-                 return Ok(None);
-             }
+         let count = self.state.db.get_count_sync(&pds_account_count_key(host));
+         if self.state.is_over_account_limit(host, count) {
+             warn!(did = %did, host, count, "account limit reached for host, dropping new account");
+             return Ok(None);
          }
      }
  }
···
      // track initial active state for per-PDS rate limiting
      if msg.is_pds && repo_state.active {
          if let Some(host) = msg.firehose.host_str() {
-             db.update_count(&keys::pds_account_count_key(host), 1);
+             db.update_count(&pds_account_count_key(host), 1);
          }
      }

+54 -14
src/pds_meta.rs
···
  use smol_str::SmolStr;
  use std::collections::HashMap;
  use std::sync::Arc;
+ use tracing::debug;

  #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
  pub enum HostStatus {
···
              next
          });
      }
- }
-
- impl PdsMeta {
-     pub fn tier_for(&self, host: &str, rate_tiers: &HashMap<String, RateTier>) -> RateTier {
-         let default = rate_tiers
-             .get("default")
-             .copied()
-             .unwrap_or_else(RateTier::default_tier);
-         self.hosts
-             .get(host)
-             .and_then(|h| h.tier.as_ref())
-             .and_then(|name| rate_tiers.get(name.as_str()).copied())
-             .unwrap_or(default)
-     }

      pub fn status(&self, host: &str) -> HostStatus {
          self.hosts
···
  pub(crate) fn new_handle(meta: PdsMeta) -> PdsMetaHandle {
      Arc::new(ArcSwap::new(Arc::new(meta)))
  }
+
+ #[derive(Debug, Clone)]
+ pub struct TierRule {
+     pub pattern: glob::Pattern,
+     pub tier_name: SmolStr,
+ }
+
+ /// policy for resolving rate tiers for PDS hosts.
+ ///
+ /// resolution order:
+ /// 1. explicit api-assigned override for the host (stored in `HostDesc.tier`)
+ /// 2. first matching rule in `rules` (config glob patterns, in order)
+ /// 3. built-in `"default"` tier
+ #[derive(Debug, Clone)]
+ pub struct TierPolicy {
+     /// named rate tier definitions.
+     pub tiers: HashMap<SmolStr, RateTier>,
+     /// ordered glob rules, first match wins (for unassigned hosts).
+     pub rules: Vec<TierRule>,
+ }
+
+ impl TierPolicy {
+     /// resolves the effective `RateTier` for `host`.
+     ///
+     /// `override_name` is the api-assigned tier name from `HostDesc.tier`, if any.
+     pub fn resolve(&self, host: &str, override_name: Option<&SmolStr>) -> RateTier {
+         let default = self
+             .tiers
+             .get("default")
+             .copied()
+             .unwrap_or_else(RateTier::default_tier);
+
+         if let Some(name) = override_name {
+             let tier = self.tiers.get(name).copied().unwrap_or(default);
+             debug!(host, override = %name, account_limit = ?tier.account_limit, "tier resolved via explicit override");
+             return tier;
+         }
+
+         let matched = self.rules.iter().find(|r| r.pattern.matches(host));
+
+         let tier = matched
+             .and_then(|r| self.tiers.get(&r.tier_name).copied())
+             .unwrap_or(default);
+
+         debug!(
+             host,
+             matched_rule = matched.map(|r| format!("{}:{}", r.pattern, r.tier_name)).as_deref(),
+             account_limit = ?tier.account_limit,
+             "tier resolved via glob rules"
+         );
+         tier
+     }
+ }
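to make the precedence concrete, here is a minimal sketch of `TierPolicy::resolve` usage, assuming the crate paths from this diff (`crate::pds_meta::{TierPolicy, TierRule}`, `crate::config::RateTier`); the host names, rule, and `demo` wrapper are illustrative:

```rust
use std::collections::HashMap;

use smol_str::SmolStr;

use crate::config::RateTier;
use crate::pds_meta::{TierPolicy, TierRule};

fn demo() {
    // built-in tier definitions, seeded the same way as the config.rs defaults
    let mut tiers = HashMap::new();
    tiers.insert(SmolStr::new("default"), RateTier::default_tier());
    tiers.insert(SmolStr::new("trusted"), RateTier::trusted());

    let policy = TierPolicy {
        tiers,
        // one glob rule, mirroring the README example
        rules: vec![TierRule {
            pattern: glob::Pattern::new("*.bsky.network").unwrap(),
            tier_name: SmolStr::new("trusted"),
        }],
    };

    // no override and the rule matches -> "trusted"
    let via_rule = policy.resolve("pds01.bsky.network", None);

    // an explicit api override beats any matching rule -> "default"
    let via_override = policy.resolve("pds01.bsky.network", Some(&SmolStr::new("default")));

    // no override, no matching rule -> built-in "default"
    let fallback = policy.resolve("pds.example.com", None);

    let _ = (via_rule, via_override, fallback);
}
```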
+50 -12
src/state.rs
···
  use url::Url;

  use crate::{
-     config::{Config, RateTier},
+     config::Config,
      db::Db,
      filter::{FilterHandle, new_handle as new_filter_handle},
-     pds_meta::{PdsMeta, PdsMetaHandle, new_handle as new_pds_handle},
+     pds_meta::{PdsMeta, PdsMetaHandle, TierPolicy, new_handle as new_pds_handle},
      resolver::Resolver,
      util::throttle::Throttler,
  };
···
      pub resolver: Resolver,
      pub(crate) filter: FilterHandle,
      pub(crate) pds_meta: PdsMetaHandle,
-     pub(crate) rate_tiers: HashMap<String, RateTier>,
+     pub(crate) tier_policy: TierPolicy,
      pub firehose_cursors: scc::HashIndex<Url, AtomicI64>,
      #[cfg(feature = "indexer")]
      pub backfill_notify: Notify,
···
              .or_insert_with(crate::pds_meta::HostDesc::default)
              .status = stat;
      }
-     for host in &config.trusted_hosts {
-         let entry = hosts
-             .entry(host.clone())
-             .or_insert_with(crate::pds_meta::HostDesc::default);
-         if entry.tier.is_none() {
-             entry.tier = Some(SmolStr::new("trusted"));
-         }
-     }

      let pds_meta = new_pds_handle(PdsMeta { hosts });
···
          resolver,
          filter,
          pds_meta,
-         rate_tiers: config.rate_tiers.clone(),
+         tier_policy: config.tier_policy.clone(),
          firehose_cursors: relay_cursors,
          #[cfg(feature = "indexer")]
          backfill_notify: Notify::new(),
···
          self.backfill_enabled.send_replace(backfill_was);

          result
+     }
+
+     /// applies an account limit status transition for `host`, writing to `batch` and updating
+     /// in-memory state. call this after any event that changes the active account count for a PDS.
+     pub(crate) fn apply_host_limit_status(
+         &self,
+         batch: &mut fjall::OwnedWriteBatch,
+         host: &str,
+         count: u64,
+     ) {
+         use crate::db::pds_meta as db_pds;
+         use crate::pds_meta::PdsMeta;
+         use tracing::{debug, error};
+
+         let (current_status, limit) = {
+             let meta = self.pds_meta.load();
+             let override_name = meta.hosts.get(host).and_then(|h| h.tier.as_ref());
+             let tier = self.tier_policy.resolve(host, override_name);
+             (meta.status(host), tier.account_limit)
+         };
+
+         debug!(%host, ?current_status, ?limit, count, "apply_host_limit_status");
+
+         let Some(new_status) = current_status.check_limit_transition(count, limit) else {
+             return;
+         };
+
+         debug!(%host, count, ?limit, ?new_status, "account count crossed limit, shifting status");
+
+         if let Err(e) = db_pds::set_status(batch, &self.db.filter, host, new_status) {
+             error!(%host, err = %e, "failed to write host status");
+             return;
+         }
+
+         PdsMeta::update_host(&self.pds_meta, host, |h| h.status = new_status);
+     }
+
+     /// checks whether `host` is at or over its account limit at the given count.
+     /// does not modify any state.
+     pub(crate) fn is_over_account_limit(&self, host: &str, count: u64) -> bool {
+         let meta = self.pds_meta.load();
+         let override_name = meta.hosts.get(host).and_then(|h| h.tier.as_ref());
+         self.tier_policy
+             .resolve(host, override_name)
+             .account_limit
+             .is_some_and(|l| count >= l)
      }
+15 -4
tests/common.nu
···
      curl -X POST -H "Content-Type: application/json" -H $"Authorization: Bearer ($jwt)" $"($pds_url)/xrpc/com.atproto.server.activateAccount"
  }

+ # extract the hydrant executable path from cargo's json build output
+ def parse-hydrant-executable [output: string] {
+     $output
+     | lines
+     | each { |line| try { $line | from json } catch { null } }
+     | compact
+     | where { |r| $r.reason? == "compiler-artifact" and $r.executable? != null and ($r.target?.name? == "hydrant") }
+     | last
+     | get executable
+ }
+
  # build the hydrant binary
  export def build-hydrant [] {
      if ($env | get --optional HYDRANT_BINARY | is-not-empty) {
          return $env.HYDRANT_BINARY
      }
      print "building hydrant..."
-     cargo build
-     "target/debug/hydrant"
+     let out = (^cargo build --message-format json err> /dev/null | complete)
+     parse-hydrant-executable $out.stdout
  }

  # build the hydrant binary with extra cargo features (space-separated string)
···
      if ($env | get --optional HYDRANT_BINARY | is-not-empty) {
          return $env.HYDRANT_BINARY
      }
      print $"building hydrant with features: ($features)..."
-     cargo build --features $features
-     "target/debug/hydrant"
+     let out = (^cargo build --features $features --message-format json err> /dev/null | complete)
+     parse-hydrant-executable $out.stdout
  }

  # start hydrant in the background
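for context on the build change: `cargo build --message-format json` emits one JSON object per line on stdout, and `compiler-artifact` records for binary targets carry the final `executable` path, so parsing them is more robust than hard-coding `target/debug/hydrant` (which breaks under a custom `CARGO_TARGET_DIR` or `--target-dir`).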
+123 -1
tests/pds_status.nu
···
  print "starting mock pds websocket server..."
  let mock_pds_handle = (start-mock-pds $mock_port)

+ sleep 500ms
+ http post -t application/json $"($url)/firehose/sources" {
+     url: $"ws://($mock_host):($mock_port)/",
+     is_pds: true
+ }
+
  print "checking status transitions back to Active..."
  mut active = false
···

  if $re_active {
      print "ok: host transitioned back to active successfully."
-     exit 0
  } else {
      fail "host did not transition back to active after tier removed"
+ }
+
+ # verify that,
+ # 1. the glob rule resolves the tier for unassigned hosts (no explicit PUT /pds/tiers needed)
+ # 2. after removing an explicit tier override, the glob rule still applies -> host stays throttled
+
+ print "starting hydrant instance with a glob tier rule..."
+ let port2 = ($port + 100)
+ let url2 = $"http://localhost:($port2)"
+ let db2 = (mktemp -d -t hydrant_test.XXXXXX)
+
+ let instance2 = (with-env {
+     HYDRANT_RELAY_HOSTS: "",
+     HYDRANT_CRAWLER_URLS: "",
+     HYDRANT_RATE_TIERS: "custom:1/1/1/1/0",
+     HYDRANT_TIER_RULES: $"127.0.0.*:custom"
+ } {
+     start-hydrant $binary $db2 $port2
+ })
+ if not (wait-for-api $url2) {
+     try { kill $instance2.pid }
+     fail "hydrant instance did not start"
+ }
+
+ # kill any stale listener from first round
+ try { bash -c $"fuser -k ($mock_port)/tcp" } catch {}
+ sleep 100ms
+
+ # connect mock pds and wait for offline -> active cycle (same as above)
+ http post -t application/json $"($url2)/firehose/sources" {
+     url: $"ws://($mock_host):($mock_port)/",
+     is_pds: true
+ }
+
+ # wait for offline
+ print "waiting for offline..."
+ mut offline2 = false
+ for i in 1..20 {
+     let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)")
+     if $res.status == 200 and $res.body.status == "offline" {
+         $offline2 = true
+         break
+     }
+     sleep 2sec
+ }
+ if not $offline2 {
+     try { kill $instance2.pid }
+     fail "glob test: host did not go offline"
+ }
+
+ print "starting mock pds for glob test..."
+ let mock_pds2 = (start-mock-pds $mock_port)
+ sleep 500ms
+ http post -t application/json $"($url2)/firehose/sources" {
+     url: $"ws://($mock_host):($mock_port)/",
+     is_pds: true
+ }
+
+ # with account_limit=0 and the glob rule active, the host goes straight to throttled
+ # on the first successful connection — no explicit set_tier call needed.
+ print "waiting for connected (expect throttled, not active)..."
+ mut connected2 = false
+ for i in 1..20 {
+     let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)")
+     if $res.status == 200 and $res.body.status != "offline" {
+         $connected2 = true
+         print $"  connected with status: ($res.body.status)"
+         break
+     }
+     sleep 2sec
+ }
+ if not $connected2 {
+     stop-mock-pds $mock_pds2
+     try { kill $instance2.pid }
+     fail "glob test: host did not reconnect"
+ }
+
+ # verify the glob rule throttled the host automatically (no set_tier was called)
+ print "checking glob test: glob rule throttles host without explicit tier assignment..."
+ let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)")
+ print $"  status \(no set_tier\): ($res.body.status?)"
+ if $res.status != 200 or $res.body.status != "throttled" {
+     stop-mock-pds $mock_pds2
+     try { kill $instance2.pid }
+     fail $"glob test: expected throttled without set_tier \(glob rule should apply\), got ($res.body.status?)"
+ }
+ print "ok: host throttled by glob rule without explicit tier assignment."
+
+ # set explicit tier -> still throttled (sanity check)
+ print "checking glob test: explicit tier assignment also throttles..."
+ http put -fe -t application/json $"($url2)/pds/tiers" { host: $mock_host, tier: "custom" }
+ let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)")
+ print $"  status after set_tier: ($res.body.status?)"
+ if $res.status != 200 or $res.body.status != "throttled" {
+     stop-mock-pds $mock_pds2
+     try { kill $instance2.pid }
+     fail $"glob test: expected throttled after set_tier, got ($res.body.status?)"
+ }
+ print "ok: host throttled via explicit tier."
+
+ # remove explicit override -> glob rule still applies -> still throttled
+ print "checking glob test: remove explicit tier keeps host throttled via glob rule..."
+ http delete -fe $"($url2)/pds/tiers?host=($mock_host)"
+
+ sleep 500ms
+ let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)")
+ print $"  status after remove_tier: ($res.body.status?)"
+ let still_throttled = ($res.status == 200 and $res.body.status == "throttled")
+
+ stop-mock-pds $mock_pds2
+ try { kill $instance2.pid }
+
+ if $still_throttled {
+     print "ok: host remains throttled after tier override removed (glob rule applies)."
+     exit 0
+ } else {
+     fail $"glob test: expected throttled after remove_tier \(glob rule should apply\), got ($res.body.status?)"
  }
  }