very fast AT Protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer

[ingest,lib,api] implement host status tracking for PDSes and enforce account limits

dawn c4d13023 b4f9db7d

+661 -164
+11 -3
README.md
··· 171 171 | `CRAWLER_MAX_PENDING_REPOS` | `2000` | max pending repos for crawler. | 172 172 | `CRAWLER_RESUME_PENDING_REPOS` | `1000` | resume threshold for crawler pending repos. | 173 173 | `TRUSTED_HOSTS` | | comma-separated list of PDS hostnames to pre-assign to the `trusted` rate tier at startup. hosts not listed here use the `default` tier unless assigned via the API. | 174 - | `RATE_TIERS` | | comma-separated list of named rate tier definitions in `name:base/mul/hourly/daily` format (e.g. `trusted:5000/10.0/18000000/432000000`). built-in tiers (`default`, `trusted`) are always present and can be overridden. | 174 + | `RATE_TIERS` | | comma-separated list of named rate tier definitions in `name:base/mul/hourly/daily[/account_limit]` format (e.g. `trusted:5000/10.0/18000000/432000000/10000000`). the optional account limit prevents new accounts from being created on this PDS once reached. built-in tiers (`default`, `trusted`) are always present and can be overridden. | 175 175 176 176 ## build features 177 177 ··· 329 329 the per-second limit scales with the number of active accounts on the PDS: 330 330 `max(per_second_base, accounts × per_second_account_mul)`. 331 331 332 + you can also define an optional `account_limit` for a rate tier. if a PDS 333 + exceeds this number of active accounts, hydrant will reject any new account 334 + creation events from it. 335 + 336 + the built-in tiers are defined as follows: 337 + - `default`: `50` per sec (floor), `+0.5` per account. max `3_600_000`/hr, `86_400_000`/day. `100` account limit. 338 + - `trusted`: `5000` per sec (floor), `+10.0` per account. max `18_000_000`/hr, `432_000_000`/day. `10_000_000` account limit. 339 + 332 340 - `GET /pds/tiers`: list all current tier assignments alongside the available 333 341 tier definitions. 334 342 - returns `{ "assignments": [{ "host": string, "tier": string }], "rate_tiers": { <name>: { "per_second_base": int, "per_second_account_mul": float, "per_hour": int, "per_day": int } } }`. ··· 342 350 - re-assigning the same host updates the tier in place without creating a duplicate. 343 351 - `DELETE /pds/tiers`: remove an explicit tier assignment for a PDS, reverting 344 352 it to the `default` tier. 345 - - body: `{ "host": string }`. 353 + - query parameter: `?host=<hostname>` (e.g. `?host=pds.example.com`). 346 354 - returns `200` even if no assignment existed. 347 355 - `GET /pds/rate-tiers`: list the available rate tier definitions. 348 - - returns a map of tier name to `{ "per_second_base", "per_second_account_mul", "per_hour", "per_day" }`. 356 + - returns a map of tier name to `{ "per_second_base", "per_second_account_mul", "per_hour", "per_day", "account_limit" }`. 349 357 350 358 hosts listed in `TRUSTED_HOSTS` are seeded as `trusted` at startup, but only 351 359 when no database assignment already exists for that host — DB entries always win.
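to make the documented limits concrete, here is a minimal standalone sketch of the per-second formula and the new account-limit check, using the built-in `trusted` tier numbers from the README above. the function names are illustrative only, not part of hydrant's API.

```rust
// illustrative only: mirrors the documented formula
// max(per_second_base, accounts × per_second_account_mul)
fn effective_per_second(per_second_base: u64, per_second_account_mul: f64, accounts: u64) -> u64 {
    per_second_base.max((accounts as f64 * per_second_account_mul) as u64)
}

fn main() {
    // built-in `trusted` tier: 5000 base, 10.0 per account, 10_000_000 account limit
    let (base, mul, account_limit) = (5000u64, 10.0f64, 10_000_000u64);

    let accounts = 1_200u64;
    // 1_200 × 10.0 = 12_000 > 5_000, so the per-account term wins
    println!("per-second limit: {}", effective_per_second(base, mul, accounts));

    // once the active account count reaches the limit, new account creation
    // events from this PDS are rejected
    println!("accepting new accounts: {}", accounts < account_limit);
}
```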
+1
flake.nix
··· 30 30 http-nu 31 31 clang 32 32 wild 33 + psmisc 33 34 ]; 34 35 }; 35 36 };
+4 -4
src/api/pds.rs
··· 2 2 3 3 use axum::{ 4 4 Json, Router, 5 - extract::State, 5 + extract::{Query, State}, 6 6 http::StatusCode, 7 7 routing::{delete, get, put}, 8 8 }; ··· 64 64 } 65 65 66 66 #[derive(Deserialize)] 67 - pub struct RemoveTierBody { 67 + pub struct RemoveTierQuery { 68 68 pub host: String, 69 69 } 70 70 71 71 pub async fn remove_tier( 72 72 State(hydrant): State<Hydrant>, 73 - Json(body): Json<RemoveTierBody>, 73 + Query(query): Query<RemoveTierQuery>, 74 74 ) -> Result<StatusCode, (StatusCode, String)> { 75 75 hydrant 76 76 .pds 77 - .remove_tier(body.host) 77 + .remove_tier(query.host) 78 78 .await 79 79 .map(|_| StatusCode::OK) 80 80 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+3 -6
src/api/xrpc/get_host_status.rs
··· 1 - use jacquard_api::com_atproto::sync::{ 2 - HostStatus, 3 - get_host_status::{ 4 - GetHostStatusError, GetHostStatusOutput, GetHostStatusRequest, GetHostStatusResponse, 5 - }, 1 + use jacquard_api::com_atproto::sync::get_host_status::{ 2 + GetHostStatusError, GetHostStatusOutput, GetHostStatusRequest, GetHostStatusResponse, 6 3 }; 7 4 use jacquard_common::CowStr; 8 5 ··· 26 23 account_count: Some(host.account_count as i64), 27 24 hostname: CowStr::Owned(host.name), 28 25 seq: Some(host.seq), 29 - status: host.is_banned.then_some(HostStatus::Banned), 26 + status: Some(host.status.into()), 30 27 extra_data: None, 31 28 })) 32 29 }
+3 -4
src/api/xrpc/list_hosts.rs
··· 1 - use jacquard_api::com_atproto::sync::{ 2 - HostStatus, 3 - list_hosts::{Host, ListHostsOutput, ListHostsRequest, ListHostsResponse}, 1 + use jacquard_api::com_atproto::sync::list_hosts::{ 2 + Host, ListHostsOutput, ListHostsRequest, ListHostsResponse, 4 3 }; 5 4 use jacquard_common::CowStr; 6 5 ··· 25 24 .map(|h| Host { 26 25 hostname: CowStr::Owned(h.name), 27 26 seq: Some(h.seq), 28 - status: h.is_banned.then_some(HostStatus::Banned), 27 + status: Some(h.status.into()), 29 28 account_count: Some(h.account_count as i64), 30 29 extra_data: None, 31 30 })
+7 -2
src/config.rs
··· 22 22 pub per_hour: u64, 23 23 /// per-day limit. 24 24 pub per_day: u64, 25 + /// maximum active account limit for this host before dropping tracking of new accounts 26 + pub account_limit: Option<u64>, 25 27 } 26 28 27 29 impl RateTier { ··· 32 34 per_second_account_mul: 10.0, 33 35 per_hour: 5000 * 3600, 34 36 per_day: 5000 * 86400, 37 + account_limit: Some(10_000_000), 35 38 } 36 39 } 37 40 ··· 42 45 per_second_account_mul: 0.5, 43 46 per_hour: 1000 * 3600, 44 47 per_day: 1000 * 86400, 48 + account_limit: Some(100), 45 49 } 46 50 } 47 51 48 - /// parse `base/mul/hourly/daily` format used by `HYDRANT_RATE_TIERS`. 52 + /// parse `base/mul/hourly/daily[/account_limit]` format used by `HYDRANT_RATE_TIERS`. 49 53 fn parse(s: &str) -> Option<Self> { 50 54 let parts: Vec<&str> = s.split('/').collect(); 51 - if parts.len() != 4 { 55 + if parts.len() < 4 || parts.len() > 5 { 52 56 return None; 53 57 } 54 58 Some(Self { ··· 56 60 per_second_account_mul: parts[1].parse().ok()?, 57 61 per_hour: parts[2].parse().ok()?, 58 62 per_day: parts[3].parse().ok()?, 63 + account_limit: parts.get(4).and_then(|p| p.parse().ok()), 59 64 }) 60 65 } 61 66 }
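as a worked example of the extended `name:base/mul/hourly/daily[/account_limit]` format documented in the README, the snippet below splits a sample `HYDRANT_RATE_TIERS` value and shows which entries carry the optional fifth field. this is a hypothetical helper for illustration only, not the crate's parser.

```rust
// hypothetical helper: split a HYDRANT_RATE_TIERS value into (name, spec) pairs
// and report whether each spec carries the optional fifth account_limit field.
fn main() {
    let raw = "default:50/0.5/3600000/86400000,trusted:5000/10.0/18000000/432000000/10000000";
    for entry in raw.split(',') {
        let (name, spec) = entry.split_once(':').expect("name:spec");
        let fields: Vec<&str> = spec.split('/').collect();
        // 4 fields → no account limit, 5 fields → the last one is the account limit
        let account_limit = fields.get(4).copied();
        println!("{name}: {} fields, account_limit = {account_limit:?}", fields.len());
    }
}
```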
+31 -15
src/control/mod.rs
··· 56 56 #[cfg(feature = "relay")] 57 57 use stream::relay_stream_thread; 58 58 59 + #[derive(Debug, Clone)] 59 60 /// infromation about a host hydrant is consuming from. 60 61 pub struct Host { 61 62 /// hostname of the host. ··· 64 65 pub seq: i64, 65 66 /// the amount of accounts hydrant has seen from this host. 66 67 pub account_count: u64, 67 - /// whether this host is banned or not. 68 - pub is_banned: bool, 68 + /// the status of this host in hydrant. 69 + pub status: crate::pds_meta::HostStatus, 69 70 } 70 71 71 72 /// an event emitted by the hydrant event stream. ··· 767 768 768 769 tokio::task::spawn_blocking(move || { 769 770 let key = keys::firehose_cursor_key(&hostname); 770 - let Some(seq) = state.db.cursors.get(&key).into_diagnostic()? else { 771 - return Ok(None); 772 - }; 773 - let seq = i64::from_be_bytes( 774 - seq.as_ref() 775 - .try_into() 776 - .into_diagnostic() 777 - .wrap_err("cursor value is not 8 bytes")?, 778 - ); 771 + 772 + let mut seq = 0; 773 + if let Some(cursor_bytes) = state.db.cursors.get(&key).into_diagnostic()? { 774 + seq = i64::from_be_bytes(cursor_bytes.as_ref().try_into().into_diagnostic()?); 775 + } else { 776 + // if it has no cursor, check if it's explicitly tracked in hosts map 777 + // or firehose tasks (recently added via API but no messages yet) 778 + let meta = state.pds_meta.load(); 779 + if !meta.hosts.contains_key(hostname.as_str()) { 780 + // we should also allow it if it's an active firehose ingestor 781 + let mut found_in_cursors = false; 782 + state.firehose_cursors.iter_sync(|u, _| { 783 + if u.host_str() == Some(hostname.as_str()) { 784 + found_in_cursors = true; 785 + } 786 + !found_in_cursors // continue if not found 787 + }); 788 + 789 + if !found_in_cursors { 790 + return Ok(None); 791 + } 792 + } 793 + } 794 + 779 795 let account_count = state 780 796 .db 781 797 .get_count_sync(&keys::pds_account_count_key(&hostname)); 782 - let is_banned = state.pds_meta.load().is_banned(&hostname); 798 + let status = state.pds_meta.load().status(&hostname); 783 799 784 800 Ok(Some(Host { 785 801 name: hostname.into(), 786 802 seq, 787 803 account_count, 788 - is_banned, 804 + status, 789 805 })) 790 806 }) 791 807 .await ··· 835 851 let account_count = state 836 852 .db 837 853 .get_count_sync(&keys::pds_account_count_key(hostname)); 838 - let is_banned = state.pds_meta.load().is_banned(&hostname); 854 + let status = state.pds_meta.load().status(hostname); 839 855 hosts.push(Host { 840 856 name: hostname.into(), 841 857 seq, 842 858 account_count, 843 - is_banned, 859 + status, 844 860 }); 845 861 } 846 862
+67 -13
src/control/pds.rs
··· 7 7 8 8 use crate::config::RateTier; 9 9 use crate::db::pds_meta as db_pds; 10 - use crate::pds_meta::PdsMeta; 10 + use crate::pds_meta::{HostStatus, PdsMeta}; 11 11 use crate::state::AppState; 12 12 13 13 /// a single PDS-to-tier assignment. ··· 24 24 pub per_second_account_mul: f64, 25 25 pub per_hour: u64, 26 26 pub per_day: u64, 27 + pub account_limit: Option<u64>, 27 28 } 28 29 29 30 impl From<RateTier> for PdsTierDefinition { ··· 33 34 per_second_account_mul: t.per_second_account_mul, 34 35 per_hour: t.per_hour, 35 36 per_day: t.per_day, 37 + account_limit: t.account_limit, 36 38 } 37 39 } 38 40 } ··· 64 66 Ok(()) 65 67 } 66 68 69 + fn check_limit_transition(&self, host: &str, account_limit: Option<u64>) -> Option<HostStatus> { 70 + let count_key = crate::db::keys::pds_account_count_key(host); 71 + let count = self.0.db.get_count_sync(&count_key); 72 + let current_status = self.0.pds_meta.load().status(host); 73 + current_status.check_limit_transition(count, account_limit) 74 + } 75 + 67 76 /// list all current per-PDS tier assignments. 68 77 pub async fn list_tiers(&self) -> HashMap<String, String> { 69 78 let snapshot = self.0.pds_meta.load(); 70 79 snapshot 71 - .tiers 80 + .hosts 72 81 .iter() 73 - .map(|(host, tier)| (host.clone(), tier.to_string())) 82 + .filter_map(|(host, desc)| desc.tier.as_ref().map(|t| (host.clone(), t.to_string()))) 74 83 .collect() 75 84 } 76 85 ··· 78 87 pub fn get_tier(&self, host: impl AsRef<str>) -> String { 79 88 let snapshot = self.0.pds_meta.load(); 80 89 snapshot 81 - .tiers 90 + .hosts 82 91 .get(host.as_ref()) 92 + .and_then(|h| h.tier.as_ref()) 83 93 .map(|t| t.to_string()) 84 94 .unwrap_or_else(|| "default".to_string()) 85 95 } ··· 92 102 /// list all currently banned PDS hosts. 93 103 pub async fn list_banned(&self) -> Vec<String> { 94 104 let snapshot = self.0.pds_meta.load(); 95 - snapshot.banned.iter().cloned().collect() 105 + snapshot 106 + .hosts 107 + .iter() 108 + .filter_map(|(host, desc)| { 109 + matches!(desc.status, HostStatus::Banned).then(|| host.clone()) 110 + }) 111 + .collect() 96 112 } 97 113 98 114 /// list all configured rate tier definitions. 
··· 117 133 let host = host.as_ref().to_string(); 118 134 let host_clone = host.clone(); 119 135 let tier_clone = tier.clone(); 136 + 137 + let new_tier_limit = self.0.rate_tiers.get(&tier).unwrap().account_limit; 138 + let maybe_status = self.check_limit_transition(&host, new_tier_limit); 139 + 120 140 self.update( 121 - move |batch, ks| db_pds::set_tier(batch, ks, &host_clone, &tier_clone), 141 + move |batch, ks| { 142 + let _ = db_pds::set_tier(batch, ks, &host_clone, &tier_clone); 143 + if let Some(status) = maybe_status { 144 + let _ = db_pds::set_status(batch, ks, &host_clone, status); 145 + } 146 + }, 122 147 move |meta| { 123 - meta.tiers.insert(host, SmolStr::new(&tier)); 148 + meta.update_host_entry(&host, |entry| { 149 + entry.tier = Some(SmolStr::new(&tier)); 150 + if let Some(status) = maybe_status { 151 + entry.status = status; 152 + } 153 + }); 124 154 }, 125 155 ) 126 156 .await ··· 130 160 pub async fn remove_tier(&self, host: impl AsRef<str>) -> Result<()> { 131 161 let host = host.as_ref().to_string(); 132 162 let host_clone = host.clone(); 163 + 164 + let default_tier_limit = self 165 + .0 166 + .rate_tiers 167 + .get("default") 168 + .and_then(|t| t.account_limit); 169 + let maybe_status = self.check_limit_transition(&host, default_tier_limit); 170 + 133 171 self.update( 134 - move |batch, ks| db_pds::remove_tier(batch, ks, &host_clone), 172 + move |batch, ks| { 173 + let _ = db_pds::remove_tier(batch, ks, &host_clone); 174 + if let Some(status) = maybe_status { 175 + let _ = db_pds::set_status(batch, ks, &host_clone, status); 176 + } 177 + }, 135 178 move |meta| { 136 - meta.tiers.remove(&host); 179 + meta.update_host_entry(&host, |desc| { 180 + desc.tier = None; 181 + if let Some(status) = maybe_status { 182 + desc.status = status; 183 + } 184 + }); 137 185 }, 138 186 ) 139 187 .await ··· 144 192 let host = host.as_ref().to_string(); 145 193 let host_clone = host.clone(); 146 194 self.update( 147 - move |batch, ks| db_pds::set_banned(batch, ks, &host_clone), 195 + move |batch, ks| { 196 + let _ = db_pds::set_status(batch, ks, &host_clone, HostStatus::Banned); 197 + }, 148 198 move |meta| { 149 - meta.banned.insert(host); 199 + meta.update_host_entry(&host, |desc| { 200 + desc.status = HostStatus::Banned; 201 + }); 150 202 }, 151 203 ) 152 204 .await ··· 157 209 let host = host.as_ref().to_string(); 158 210 let host_clone = host.clone(); 159 211 self.update( 160 - move |batch, ks| db_pds::remove_banned(batch, ks, &host_clone), 212 + move |batch, ks| db_pds::remove_status(batch, ks, &host_clone), 161 213 move |meta| { 162 - meta.banned.remove(&host); 214 + meta.update_host_entry(&host, |desc| { 215 + desc.status = HostStatus::Active; 216 + }); 163 217 }, 164 218 ) 165 219 .await
+2 -2
src/crawler/list_repos.rs
··· 234 234 return (did, retry_state.into()); 235 235 } 236 236 if is_throttle_worthy(&e) { 237 - if let Some(mins) = throttle.record_failure() { 238 - warn!(url = %pds_url, mins, "throttling pds due to hard failure"); 237 + if let Some(secs) = throttle.record_failure() { 238 + warn!(url = %pds_url, secs, "throttling pds due to hard failure"); 239 239 } 240 240 let mut retry_state = throttle.to_retry_state(); 241 241 retry_state.status = e.status();
+2
src/db/migration/mod.rs
··· 8 8 mod v2; 9 9 mod v3; 10 10 mod v4; 11 + mod v5; 11 12 12 13 type MigrationFn = fn(&Db, &mut OwnedWriteBatch) -> Result<()>; 13 14 ··· 17 18 ("repo_state_root_commit", v2::repo_state_root_commit), 18 19 ("firehose_source_is_pds", v3::firehose_source_is_pds), 19 20 ("repo_state_active", v4::repo_state_active), 21 + ("pds_meta_layout", v5::pds_meta_layout), 20 22 ]; 21 23 22 24 fn read_version(db: &Db) -> Result<u64> {
+38
src/db/migration/v5.rs
··· 1 + use crate::db::Db; 2 + use fjall::OwnedWriteBatch; 3 + use miette::{Context, IntoDiagnostic, Result}; 4 + 5 + pub mod v4 { 6 + pub const PDS_TIER_PREFIX: &[u8] = b"pt|"; 7 + pub const PDS_BANNED_PREFIX: &[u8] = b"pb|"; 8 + } 9 + 10 + pub(crate) fn pds_meta_layout(db: &Db, batch: &mut OwnedWriteBatch) -> Result<()> { 11 + for guard in db.filter.prefix(v4::PDS_TIER_PREFIX) { 12 + let (k, v) = guard.into_inner().into_diagnostic()?; 13 + let host = std::str::from_utf8(&k[v4::PDS_TIER_PREFIX.len()..]) 14 + .into_diagnostic() 15 + .wrap_err("failed to parse host as utf8")?; 16 + let tier = std::str::from_utf8(&v) 17 + .into_diagnostic() 18 + .wrap_err("failed to parse tier as utf8")?; 19 + crate::db::pds_meta::set_tier(batch, &db.filter, host, tier); 20 + batch.remove(&db.filter, k); 21 + } 22 + 23 + for guard in db.filter.prefix(v4::PDS_BANNED_PREFIX) { 24 + let (k, _) = guard.into_inner().into_diagnostic()?; 25 + let host = std::str::from_utf8(&k[v4::PDS_BANNED_PREFIX.len()..]) 26 + .into_diagnostic() 27 + .wrap_err("failed to parse host as utf8")?; 28 + crate::db::pds_meta::set_status( 29 + batch, 30 + &db.filter, 31 + host, 32 + crate::pds_meta::HostStatus::Banned, 33 + )?; 34 + batch.remove(&db.filter, k); 35 + } 36 + 37 + Ok(()) 38 + }
+6 -5
src/db/mod.rs
··· 85 85 // pending 86 86 match ($old, $new) { 87 87 (GaugeState::Pending, GaugeState::Pending) => {} 88 - (GaugeState::Pending, _) => $self.$update_method("pending", -1) $(.$await)?, 89 - (_, GaugeState::Pending) => $self.$update_method("pending", 1) $(.$await)?, 88 + (GaugeState::Pending, _) => {$self.$update_method("pending", -1) $(.$await)?;}, 89 + (_, GaugeState::Pending) => {$self.$update_method("pending", 1) $(.$await)?;}, 90 90 _ => {} 91 91 } 92 92 ··· 94 94 let old_resync = $old.is_resync(); 95 95 let new_resync = $new.is_resync(); 96 96 match (old_resync, new_resync) { 97 - (true, false) => $self.$update_method("resync", -1) $(.$await)?, 98 - (false, true) => $self.$update_method("resync", 1) $(.$await)?, 97 + (true, false) => {$self.$update_method("resync", -1) $(.$await)?;}, 98 + (false, true) => {$self.$update_method("resync", 1) $(.$await)?;}, 99 99 _ => {} 100 100 } 101 101 ··· 697 697 .into_diagnostic()? 698 698 } 699 699 700 - pub fn update_count(&self, key: &str, delta: i64) { 700 + pub fn update_count(&self, key: &str, delta: i64) -> u64 { 701 701 let mut entry = self.counts_map.entry_sync(SmolStr::new(key)).or_insert(0); 702 702 if delta >= 0 { 703 703 *entry = entry.saturating_add(delta as u64); ··· 715 715 *entry -= decrement; 716 716 } 717 717 } 718 + *entry 718 719 } 719 720 720 721 pub async fn update_count_async(&self, key: &str, delta: i64) {
+61 -46
src/db/pds_meta.rs
··· 1 + use crate::pds_meta::HostStatus; 1 2 use fjall::{Keyspace, OwnedWriteBatch}; 2 3 use miette::{IntoDiagnostic, Result}; 3 4 use smol_str::SmolStr; 4 5 5 - pub const PDS_TIER_PREFIX: &[u8] = b"pt|"; 6 + pub mod v5 { 7 + use super::*; 6 8 7 - // `pt|{host}` -> tier name 8 - pub fn pds_tier_key(host: &str) -> Vec<u8> { 9 - let mut key = Vec::with_capacity(PDS_TIER_PREFIX.len() + host.len()); 10 - key.extend_from_slice(PDS_TIER_PREFIX); 11 - key.extend_from_slice(host.as_bytes()); 12 - key 13 - } 9 + // `{host}|tier` -> tier name 10 + pub fn pds_tier_key(host: &str) -> Vec<u8> { 11 + let mut key = Vec::with_capacity(host.len() + 5); 12 + key.extend_from_slice(host.as_bytes()); 13 + key.extend_from_slice(b"|tier"); 14 + key 15 + } 14 16 15 - /// load all PDS tier assignments from the filter keyspace 16 - pub fn load_tiers(ks: &Keyspace) -> Result<Vec<(SmolStr, SmolStr)>> { 17 - let mut out = Vec::new(); 18 - for guard in ks.prefix(PDS_TIER_PREFIX) { 19 - let (k, v) = guard.into_inner().into_diagnostic()?; 20 - let host = std::str::from_utf8(&k[PDS_TIER_PREFIX.len()..]).into_diagnostic()?; 21 - let tier = std::str::from_utf8(&v).into_diagnostic()?; 22 - out.push((SmolStr::new(host), SmolStr::new(tier))); 17 + /// load all PDS tier assignments from the filter keyspace 18 + pub fn load_tiers(ks: &Keyspace) -> Result<Vec<(SmolStr, SmolStr)>> { 19 + let mut out = Vec::new(); 20 + for guard in ks.iter() { 21 + let (k, v) = guard.into_inner().into_diagnostic()?; 22 + if k.ends_with(b"|tier") { 23 + let host = std::str::from_utf8(&k[..k.len() - 5]).into_diagnostic()?; 24 + let tier = std::str::from_utf8(&v).into_diagnostic()?; 25 + out.push((SmolStr::new(host), SmolStr::new(tier))); 26 + } 27 + } 28 + Ok(out) 23 29 } 24 - Ok(out) 25 - } 26 30 27 - pub fn set_tier(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str, tier: &str) { 28 - batch.insert(ks, pds_tier_key(host), tier.as_bytes()); 29 - } 31 + pub fn set_tier(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str, tier: &str) { 32 + batch.insert(ks, pds_tier_key(host), tier.as_bytes()); 33 + } 30 34 31 - pub fn remove_tier(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 32 - batch.remove(ks, pds_tier_key(host)); 33 - } 35 + pub fn remove_tier(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 36 + batch.remove(ks, pds_tier_key(host)); 37 + } 34 38 35 - pub const PDS_BANNED_PREFIX: &[u8] = b"pb|"; 39 + // `{host}|status` -> encoded HostStatus (msgpack) 40 + pub fn pds_status_key(host: &str) -> Vec<u8> { 41 + let mut key = Vec::with_capacity(host.len() + 7); 42 + key.extend_from_slice(host.as_bytes()); 43 + key.extend_from_slice(b"|status"); 44 + key 45 + } 36 46 37 - // `pb|{host}` -> empty value 38 - pub fn pds_banned_key(host: &str) -> Vec<u8> { 39 - let mut key = Vec::with_capacity(PDS_BANNED_PREFIX.len() + host.len()); 40 - key.extend_from_slice(PDS_BANNED_PREFIX); 41 - key.extend_from_slice(host.as_bytes()); 42 - key 43 - } 47 + /// load all host statuses from the filter keyspace 48 + pub fn load_statuses(ks: &Keyspace) -> Result<Vec<(SmolStr, HostStatus)>> { 49 + let mut out = Vec::new(); 50 + for guard in ks.iter() { 51 + let (k, v) = guard.into_inner().into_diagnostic()?; 52 + if k.ends_with(b"|status") { 53 + let host = std::str::from_utf8(&k[..k.len() - 7]).into_diagnostic()?; 54 + let status: HostStatus = rmp_serde::from_slice(&v).into_diagnostic()?; 55 + out.push((SmolStr::new(host), status)); 56 + } 57 + } 58 + Ok(out) 59 + } 44 60 45 - /// load all banned PDS hosts from the filter keyspace 46 - pub fn 
load_banned(ks: &Keyspace) -> Result<Vec<SmolStr>> { 47 - let mut out = Vec::new(); 48 - for guard in ks.prefix(PDS_BANNED_PREFIX) { 49 - let (k, _) = guard.into_inner().into_diagnostic()?; 50 - let host = std::str::from_utf8(&k[PDS_BANNED_PREFIX.len()..]).into_diagnostic()?; 51 - out.push(SmolStr::new(host)); 61 + pub fn set_status( 62 + batch: &mut OwnedWriteBatch, 63 + ks: &Keyspace, 64 + host: &str, 65 + status: HostStatus, 66 + ) -> Result<()> { 67 + let bytes = rmp_serde::to_vec(&status).into_diagnostic()?; 68 + batch.insert(ks, pds_status_key(host), bytes); 69 + Ok(()) 52 70 } 53 - Ok(out) 54 - } 55 71 56 - pub fn set_banned(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 57 - batch.insert(ks, pds_banned_key(host), &[]); 72 + pub fn remove_status(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 73 + batch.remove(ks, pds_status_key(host)); 74 + } 58 75 } 59 76 60 - pub fn remove_banned(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 61 - batch.remove(ks, pds_banned_key(host)); 62 - } 77 + pub use v5::*;
+78 -19
src/ingest/firehose.rs
··· 1 1 use crate::filter::{FilterHandle, FilterMode}; 2 2 use crate::ingest::stream::{FirehoseError, FirehoseStream, SubscribeReposMessage, decode_frame}; 3 3 use crate::ingest::{BufferTx, IngestMessage}; 4 + use crate::pds_meta::HostStatus; 4 5 use crate::state::AppState; 5 6 use crate::util::throttle::ThrottleHandle; 6 7 use crate::util::{ ··· 142 143 || matches!(&e, FirehoseError::EmptyFrame); 143 144 let timeout = if do_throttle { 144 145 self.throttle.record_failure(); 146 + if self.is_pds && self.throttle.consecutive_failures() >= 4 { 147 + if let Err(e) = self.set_host_status(HostStatus::Offline) { 148 + error!(err = %e, "failed to update host status to offline"); 149 + } 150 + } 145 151 let until = self.throttle.throttled_until(); 146 - Duration::from_secs((until - chrono::Utc::now().timestamp()) as u64) 152 + Duration::from_secs( 153 + 0.max((until - chrono::Utc::now().timestamp()) as i64) as u64 154 + ) 147 155 } else { 148 156 Duration::from_secs(10) 149 157 }; ··· 157 165 158 166 self.throttle.record_success(); 159 167 info!("firehose connected"); 160 - let connected_at = tokio::time::Instant::now(); 168 + let mut marked_active = false; 169 + let active_sleep_secs = if cfg!(debug_assertions) { 1 } else { 60 }; 170 + let mut active_sleep = 171 + std::pin::pin!(tokio::time::sleep(Duration::from_secs(active_sleep_secs))); 161 172 162 173 let res = loop { 163 174 tokio::select! { ··· 188 199 } 189 200 } 190 201 } 191 - self.handle_message(msg).await 202 + self.handle_message(msg).await; 192 203 }, 193 204 Err(e) => match e { 194 205 // dont disconnect on unknown op or type ··· 204 215 e => break Err(e), 205 216 }, 206 217 } 207 - if connected_at.elapsed() > Duration::from_secs(60) { 208 - backoff = Duration::from_secs(0); 218 + } 219 + _ = &mut active_sleep, if !marked_active => { 220 + marked_active = true; 221 + backoff = Duration::from_secs(0); 222 + if self.is_pds { 223 + let (current_status, tier) = { 224 + let meta = self.state.pds_meta.load(); 225 + (meta.status(host), meta.tier_for(host, &self.state.rate_tiers)) 226 + }; 227 + if current_status != HostStatus::Banned { 228 + let count = self.state.db.get_count_sync(&count_key); 229 + let new_status = tier.account_limit.is_some_and(|l| count >= l) 230 + .then_some(HostStatus::Throttled).unwrap_or(HostStatus::Active); 231 + 232 + if current_status != new_status { 233 + if let Err(e) = self.set_host_status(new_status) { 234 + error!(err = %e, "failed to update host status"); 235 + } 236 + } 237 + } 209 238 } 210 239 } 211 240 _ = self.enabled.changed() => { ··· 218 247 }; 219 248 220 249 if let Err(e) = res { 221 - if let FirehoseError::StreamClosed { code, reason } = &e 222 - && *code == 1001 223 - { 224 - debug!(reason = %reason, "host gone away"); 225 - tokio::time::sleep(Duration::from_secs(1)).await; 226 - continue; 227 - } 228 - if let FirehoseError::RelayError { error, message } = e { 229 - let message = message.map_or(Cow::Borrowed("<no message>"), Cow::Owned); 230 - error!(err = %error, "relay sent error: {message}"); 231 - } else if backoff.as_secs() < 60 { 232 - // stop logging errors after a minute of retries 233 - // as to not spam logs, unlikely for error to change atp 234 - error!(err = %e, "firehose stream error"); 250 + match &e { 251 + FirehoseError::StreamClosed { code: 1001, reason } => { 252 + debug!(reason = %reason, "host gone away"); 253 + tokio::time::sleep(Duration::from_secs(1)).await; 254 + continue; 255 + } 256 + FirehoseError::FutureCursor => { 257 + if self.is_pds 258 + && let Err(e) = 
self.set_host_status(HostStatus::Idle) 259 + { 260 + error!(err = %e, "failed to update host status to idle"); 261 + } 262 + debug!("outdated cursor, backing off"); 263 + tokio::time::sleep(Duration::from_secs(60)).await; 264 + continue; 265 + } 266 + FirehoseError::RelayError { error, message } => { 267 + let message = message 268 + .as_deref() 269 + .map_or(Cow::Borrowed("<no message>"), Cow::Borrowed); 270 + error!(err = %error, "relay sent error: {message}"); 271 + } 272 + _ if backoff.as_secs() < 60 => { 273 + // stop logging errors after a minute of retries 274 + // as to not spam logs, unlikely for error to change atp 275 + error!(err = %e, "firehose stream error"); 276 + } 277 + _ => {} 235 278 } 236 279 if backoff.is_zero() { 237 280 backoff = Duration::from_secs(5); ··· 243 286 backoff = (backoff * 2).min(MAX_BACKOFF); 244 287 } 245 288 } 289 + } 290 + 291 + fn set_host_status(&self, status: HostStatus) -> Result<()> { 292 + let Some(host) = self.relay_host.host_str() else { 293 + return Ok(()); 294 + }; 295 + 296 + debug!(host = %host, status = ?status, "updating host status"); 297 + 298 + let mut batch = self.state.db.inner.batch(); 299 + crate::db::pds_meta::set_status(&mut batch, &self.state.db.filter, host, status)?; 300 + batch.commit().into_diagnostic()?; 301 + 302 + crate::pds_meta::PdsMeta::update_host(&self.state.pds_meta, host, |h| h.status = status); 303 + 304 + Ok(()) 246 305 } 247 306 248 307 async fn handle_message(&self, msg: SubscribeReposMessage<'_>) {
+49 -3
src/ingest/relay.rs
··· 524 524 if is_pds { 525 525 if let Some(host) = firehose.host_str() { 526 526 let count_key = keys::pds_account_count_key(host); 527 - if !was_active && repo_state.active { 528 - ctx.state.db.update_count(&count_key, 1); 527 + let changed = if !was_active && repo_state.active { 528 + Some(ctx.state.db.update_count(&count_key, 1)) 529 529 } else if was_active && !repo_state.active { 530 - ctx.state.db.update_count(&count_key, -1); 530 + Some(ctx.state.db.update_count(&count_key, -1)) 531 + } else { 532 + None 533 + }; 534 + 535 + if let Some(count) = changed { 536 + let (current_status, limit) = { 537 + let meta = ctx.state.pds_meta.load(); 538 + ( 539 + meta.status(host), 540 + meta.tier_for(host, &ctx.state.rate_tiers).account_limit, 541 + ) 542 + }; 543 + 544 + if let Some(status) = current_status.check_limit_transition(count, limit) { 545 + debug!(%host, count, ?limit, ?status, "account count crossed limit, shifting status"); 546 + if let Err(e) = crate::db::pds_meta::set_status( 547 + &mut ctx.batch, 548 + &ctx.state.db.filter, 549 + host, 550 + status, 551 + ) { 552 + error!(err = %e, "failed to write host status"); 553 + } else { 554 + crate::pds_meta::PdsMeta::update_host(&ctx.state.pds_meta, host, |h| { 555 + h.status = status 556 + }); 557 + } 558 + } 531 559 } 532 560 } 533 561 } ··· 847 875 if pds_host.as_deref() != msg.firehose.host_str() { 848 876 warn!(did = %did, got = ?pds_host, expected = ?msg.firehose.host_str(), "message rejected: wrong host for new account"); 849 877 return Ok(None); 878 + } 879 + 880 + if let Some(host) = msg.firehose.host_str() { 881 + let tier = self 882 + .state 883 + .pds_meta 884 + .load() 885 + .tier_for(host, &self.state.rate_tiers); 886 + if let Some(limit) = tier.account_limit { 887 + let count = self 888 + .state 889 + .db 890 + .get_count_sync(&crate::db::keys::pds_account_count_key(host)); 891 + if count >= limit { 892 + warn!(did = %did, host, count, limit, "account limit reached for host, dropping new account"); 893 + return Ok(None); 894 + } 895 + } 850 896 } 851 897 } 852 898
+9 -1
src/ingest/stream.rs
··· 47 47 StreamClosed { code: u16, reason: String }, 48 48 #[error("tcp layer dropped")] 49 49 TcpDropped, 50 + #[error("future cursor")] 51 + FutureCursor, 50 52 } 51 53 52 54 impl From<serde_ipld_dagcbor::DecodeError<Infallible>> for FirehoseError { ··· 612 614 SubscribeReposMessage::Identity(Box::new(Deserialize::deserialize(&mut de)?)) 613 615 } 614 616 "#sync" => SubscribeReposMessage::Sync(Box::new(Deserialize::deserialize(&mut de)?)), 615 - "#info" => SubscribeReposMessage::Info(Box::new(Deserialize::deserialize(&mut de)?)), 617 + "#info" => { 618 + let info: crate::ingest::stream::Info<'i> = Deserialize::deserialize(&mut de)?; 619 + if info.name == InfoName::OutdatedCursor { 620 + return Err(FirehoseError::FutureCursor); 621 + } 622 + SubscribeReposMessage::Info(Box::new(info)) 623 + } 616 624 other => return Err(FirehoseError::UnknownType(other.to_string())), 617 625 }; 618 626
+87 -5
src/pds_meta.rs
··· 1 1 use crate::config::RateTier; 2 2 use arc_swap::ArcSwap; 3 + use serde::{Deserialize, Serialize}; 3 4 use smol_str::SmolStr; 4 - use std::collections::{HashMap, HashSet}; 5 + use std::collections::HashMap; 5 6 use std::sync::Arc; 6 7 8 + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 9 + pub enum HostStatus { 10 + Active, 11 + Idle, 12 + Offline, 13 + Throttled, 14 + Banned, 15 + } 16 + 17 + impl From<HostStatus> for jacquard_api::com_atproto::sync::HostStatus<'static> { 18 + fn from(status: HostStatus) -> Self { 19 + match status { 20 + HostStatus::Active => Self::Active, 21 + HostStatus::Idle => Self::Idle, 22 + HostStatus::Offline => Self::Offline, 23 + HostStatus::Throttled => Self::Throttled, 24 + HostStatus::Banned => Self::Banned, 25 + } 26 + } 27 + } 28 + 29 + impl HostStatus { 30 + /// returns the new status to apply if the count dynamically crossed limits. 31 + pub fn check_limit_transition( 32 + self, 33 + current_count: u64, 34 + account_limit: Option<u64>, 35 + ) -> Option<Self> { 36 + if self == Self::Banned { 37 + return None; 38 + } 39 + match account_limit { 40 + Some(limit) if current_count >= limit && self != Self::Throttled => { 41 + Some(Self::Throttled) 42 + } 43 + Some(limit) if current_count < limit && self == Self::Throttled => Some(Self::Active), 44 + None if self == Self::Throttled => Some(Self::Active), 45 + _ => None, 46 + } 47 + } 48 + } 49 + 50 + impl Default for HostStatus { 51 + fn default() -> Self { 52 + Self::Active 53 + } 54 + } 55 + 56 + #[derive(Debug, Default, Clone)] 57 + pub struct HostDesc { 58 + pub tier: Option<SmolStr>, 59 + pub status: HostStatus, 60 + } 61 + 7 62 #[derive(Default, Clone)] 8 63 pub(crate) struct PdsMeta { 9 - pub tiers: HashMap<String, SmolStr>, 10 - pub banned: HashSet<String>, 64 + pub hosts: HashMap<String, HostDesc>, 65 + } 66 + 67 + impl PdsMeta { 68 + /// update (or insert) the `HostDesc` for `host` by applying `f` to it. 69 + pub fn update_host_entry(&mut self, host: &str, f: impl FnOnce(&mut HostDesc)) { 70 + f(self.hosts.entry(host.to_string()).or_default()); 71 + } 72 + 73 + /// atomically update (or insert) the `HostDesc` for `host` by applying `f` to it via RCU. 74 + pub(crate) fn update_host( 75 + cell: &arc_swap::ArcSwap<Self>, 76 + host: &str, 77 + mut f: impl FnMut(&mut HostDesc), 78 + ) { 79 + cell.rcu(|meta| { 80 + let mut next = (**meta).clone(); 81 + next.update_host_entry(host, &mut f); 82 + next 83 + }); 84 + } 11 85 } 12 86 13 87 impl PdsMeta { ··· 16 90 .get("default") 17 91 .copied() 18 92 .unwrap_or_else(RateTier::default_tier); 19 - self.tiers 93 + self.hosts 20 94 .get(host) 95 + .and_then(|h| h.tier.as_ref()) 21 96 .and_then(|name| rate_tiers.get(name.as_str()).copied()) 22 97 .unwrap_or(default) 23 98 } 24 99 100 + pub fn status(&self, host: &str) -> HostStatus { 101 + self.hosts 102 + .get(host) 103 + .map(|h| h.status) 104 + .unwrap_or(HostStatus::Active) 105 + } 106 + 25 107 pub fn is_banned(&self, host: &str) -> bool { 26 - self.banned.contains(host) 108 + self.status(host) == HostStatus::Banned 27 109 } 28 110 } 29 111
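the new limit-driven transitions can be summarized with a small standalone sketch. the enum and function below are simplified copies for illustration only (not the crate's exports), keeping just the three statuses the rules distinguish; `Idle` and `Offline` behave like `Active` here.

```rust
// simplified sketch of the limit-transition rules from the diff above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HostStatus { Active, Throttled, Banned }

fn check_limit_transition(status: HostStatus, count: u64, limit: Option<u64>) -> Option<HostStatus> {
    if status == HostStatus::Banned {
        return None; // banned hosts never change automatically
    }
    match limit {
        Some(l) if count >= l && status != HostStatus::Throttled => Some(HostStatus::Throttled),
        Some(l) if count < l && status == HostStatus::Throttled => Some(HostStatus::Active),
        None if status == HostStatus::Throttled => Some(HostStatus::Active),
        _ => None,
    }
}

fn main() {
    // crossing the limit throttles a non-banned host
    assert_eq!(check_limit_transition(HostStatus::Active, 120, Some(100)), Some(HostStatus::Throttled));
    // dropping back under the limit (or removing it) reactivates a throttled host
    assert_eq!(check_limit_transition(HostStatus::Throttled, 80, Some(100)), Some(HostStatus::Active));
    assert_eq!(check_limit_transition(HostStatus::Throttled, 120, None), Some(HostStatus::Active));
    // bans are never lifted by count changes
    assert_eq!(check_limit_transition(HostStatus::Banned, 0, Some(100)), None);
    println!("transition rules hold");
}
```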
+29 -14
src/state.rs
··· 1 - use std::collections::{HashMap, HashSet}; 1 + use std::collections::HashMap; 2 2 use std::future::Future; 3 3 use std::sync::atomic::AtomicI64; 4 4 use std::time::Duration; ··· 55 55 56 56 let filter = new_filter_handle(filter_config); 57 57 58 - // load persisted per-PDS tier assignments from the filter keyspace. 59 - // trusted_hosts from config are merged in as defaults (not persisted here; they seed 60 - // only if the host has no existing assignment in the DB). 61 - let mut tiers: HashMap<String, SmolStr> = crate::db::pds_meta::load_tiers(&db.filter) 58 + let tiers: HashMap<String, SmolStr> = crate::db::pds_meta::load_tiers(&db.filter) 62 59 .unwrap_or_default() 63 60 .into_iter() 64 61 .map(|(host, tier)| (host.to_string(), tier)) 65 62 .collect(); 63 + 64 + let statuses: HashMap<String, crate::pds_meta::HostStatus> = 65 + crate::db::pds_meta::load_statuses(&db.filter) 66 + .unwrap_or_default() 67 + .into_iter() 68 + .map(|(host, stat)| (host.to_string(), stat)) 69 + .collect(); 70 + 71 + let mut hosts = HashMap::new(); 72 + for (host, tier) in tiers { 73 + hosts 74 + .entry(host) 75 + .or_insert_with(crate::pds_meta::HostDesc::default) 76 + .tier = Some(tier); 77 + } 78 + for (host, stat) in statuses { 79 + hosts 80 + .entry(host) 81 + .or_insert_with(crate::pds_meta::HostDesc::default) 82 + .status = stat; 83 + } 66 84 for host in &config.trusted_hosts { 67 - tiers 85 + let entry = hosts 68 86 .entry(host.clone()) 69 - .or_insert_with(|| SmolStr::new("trusted")); 87 + .or_insert_with(crate::pds_meta::HostDesc::default); 88 + if entry.tier.is_none() { 89 + entry.tier = Some(SmolStr::new("trusted")); 90 + } 70 91 } 71 92 72 - let banned: HashSet<String> = crate::db::pds_meta::load_banned(&db.filter) 73 - .unwrap_or_default() 74 - .into_iter() 75 - .map(|host| host.to_string()) 76 - .collect(); 77 - 78 - let pds_meta = new_pds_handle(PdsMeta { tiers, banned }); 93 + let pds_meta = new_pds_handle(PdsMeta { hosts }); 79 94 80 95 let relay_cursors = scc::HashIndex::new(); 81 96
+17 -11
src/util/throttle.rs
··· 112 112 } 113 113 114 114 /// called on hard failures (timeout, TLS error, bad gateway, etc). 115 - /// returns throttle duration in minutes if this is a *new* throttle, 116 - /// and notifies all in-flight tasks to cancel immediately. 115 + /// always increments `consecutive_failures`. only sets a new `throttled_until` 116 + /// (and notifies waiters) if not already throttled. 117 117 pub fn record_failure(&self) -> Option<u64> { 118 - if self.is_throttled() { 119 - return None; 120 - } 121 - 122 118 let failures = self 123 119 .state 124 120 .consecutive_failures 125 121 .fetch_add(1, Ordering::AcqRel) 126 122 + 1; 127 123 128 - // 30 min, 60 min, 120 min, ... capped at ~512 hours 129 - let base_minutes = 30u64; 124 + if self.is_throttled() { 125 + return None; 126 + } 127 + 128 + let base_secs = 15u64; 130 129 let exponent = (failures as u32).saturating_sub(1); 131 - let minutes = base_minutes * 2u64.pow(exponent.min(10)); 132 - let until = chrono::Utc::now().timestamp() + (minutes * 60) as i64; 130 + let secs = (base_secs * 2u64.pow(exponent.min(10))).min(300); 131 + #[cfg(debug_assertions)] 132 + let secs = secs.min(1); 133 + 134 + let until = chrono::Utc::now().timestamp() + secs as i64; 133 135 134 136 self.state.throttled_until.store(until, Ordering::Release); 135 137 self.state.failure_notify.notify_waiters(); 136 138 137 - Some(minutes) 139 + Some(secs) 138 140 } 139 141 140 142 /// returns current timeout duration — 3s, 6s, or 12s depending on prior timeouts. 141 143 pub fn timeout(&self) -> Duration { 142 144 let n = self.state.consecutive_timeouts.load(Ordering::Acquire); 143 145 Duration::from_secs(3 * 2u64.pow(n.min(2) as u32)) 146 + } 147 + 148 + pub fn consecutive_failures(&self) -> usize { 149 + self.state.consecutive_failures.load(Ordering::Acquire) as usize 144 150 } 145 151 146 152 /// returns whether the timeout attempts are exhausted
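for reference, a small sketch of the new backoff curve: a standalone illustration of the release-build arithmetic in the diff above (debug builds clamp the result to one second). the helper name is made up for the example.

```rust
// illustrative computation of the throttle duration after n consecutive hard failures.
fn throttle_secs(consecutive_failures: u32) -> u64 {
    let base_secs = 15u64;
    let exponent = consecutive_failures.saturating_sub(1);
    // 15s doubling per failure, capped at 5 minutes
    (base_secs * 2u64.pow(exponent.min(10))).min(300)
}

fn main() {
    // failures 1..=6 → 15, 30, 60, 120, 240, 300 (capped)
    let seq: Vec<u64> = (1..=6).map(throttle_secs).collect();
    println!("{seq:?}");
}
```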
+4 -10
tests/api.nu
··· 272 272 let rate_tiers = (http get $"($url)/pds/rate-tiers") 273 273 for tier_name in ["default", "trusted"] { 274 274 let tier = ($rate_tiers | get $tier_name) 275 - for field in ["per_second_base", "per_second_account_mul", "per_hour", "per_day"] { 275 + for field in ["per_second_base", "per_second_account_mul", "per_hour", "per_day", "account_limit"] { 276 276 if not ($field in $tier) { 277 277 fail $"($tier_name) tier missing field ($field)" $pid 278 278 } ··· 346 346 347 347 # remove the first host 348 348 print " DELETE /pds/tiers (first host)..." 349 - http delete -f -e -t application/json $"($url)/pds/tiers" --data { 350 - host: "pds.example.com" 351 - } | assert-status 200 "DELETE /pds/tiers" $pid 349 + http delete -f -e $"($url)/pds/tiers?host=pds.example.com" | assert-status 200 "DELETE /pds/tiers" $pid 352 350 let after_del = (http get $"($url)/pds/tiers") 353 351 if ($after_del.assignments | columns | length) != 1 { 354 352 fail $"expected 1 assignment after delete, got ($after_del.assignments | columns | length)" $pid ··· 359 357 print " ok: correct host removed, other assignment intact" 360 358 361 359 # remove the second host 362 - http delete -f -e -t application/json $"($url)/pds/tiers" --data { 363 - host: "other.example.com" 364 - } | assert-status 200 "DELETE /pds/tiers second" $pid 360 + http delete -f -e $"($url)/pds/tiers?host=other.example.com" | assert-status 200 "DELETE /pds/tiers second" $pid 365 361 366 362 # deleting a non-existent host is idempotent (returns 200, not an error) 367 363 print " DELETE /pds/tiers (non-existent, expect 200)..." 368 - http delete -f -e -t application/json $"($url)/pds/tiers" --data { 369 - host: "pds.example.com" 370 - } | assert-status 200 "DELETE /pds/tiers non-existent" $pid 364 + http delete -f -e $"($url)/pds/tiers?host=pds.example.com" | assert-status 200 "DELETE /pds/tiers non-existent" $pid 371 365 let after_idempotent = (http get $"($url)/pds/tiers") 372 366 if ($after_idempotent.assignments | columns | length) != 0 { 373 367 fail "expected empty assignments after cleanup" $pid
+12
tests/mock_pds.nu
··· 1 + export def start-mock-pds [port: int] { 2 + # kill any stale process from a previous failed run holding this port 3 + try { bash -c $"fuser -k ($port)/tcp" } catch {} 4 + sleep 100ms 5 + let log_file = (mktemp) 6 + let pid = (bash -c $"websocat -s ($port) >($log_file) 2>&1 & echo $!" | str trim | into int) 7 + { pid: $pid, log: $log_file } 8 + } 9 + 10 + export def stop-mock-pds [handle: record] { 11 + try { kill $handle.pid } 12 + }
+139
tests/pds_status.nu
··· 1 + source common.nu 2 + 3 + source mock_pds.nu 4 + 5 + def main [] { 6 + let port = resolve-test-port 3033 7 + let url = $"http://localhost:($port)" 8 + let binary = build-hydrant 9 + let db = (mktemp -d -t hydrant_test.XXXXXX) 10 + 11 + let instance = (with-env { 12 + HYDRANT_RELAY_HOSTS: "", 13 + HYDRANT_CRAWLER_URLS: "", 14 + HYDRANT_RATE_TIERS: "custom:1/1/1/1/0" 15 + } { 16 + start-hydrant $binary $db $port 17 + }) 18 + if not (wait-for-api $url) { 19 + fail "hydrant did not start" $instance.pid 20 + } 21 + 22 + let mock_port = resolve-test-mock-port 9999 23 + let mock_host = "127.0.0.1" 24 + 25 + # kill any stale listener on the mock port from a previous failed run 26 + try { bash -c $"fuser -k ($mock_port)/tcp" } catch {} 27 + sleep 100ms 28 + 29 + print "adding offline mock pds via firehose sources..." 30 + http post -t application/json $"($url)/firehose/sources" { 31 + url: $"ws://($mock_host):($mock_port)/", 32 + is_pds: true 33 + } 34 + 35 + print "checking status transitions to Offline..." 36 + mut offline = false 37 + 38 + # the throttle backoff will cap at 1 second in debug builds. 39 + # it takes 4 consecutive failures to mark as offline. 40 + # therefore, 4 * 1 = ~4 seconds maximum for transition. 41 + for i in 1..20 { 42 + let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 43 + if $res.status == 200 { 44 + if $res.body.status == "offline" { 45 + $offline = true 46 + break 47 + } 48 + if $res.body.status == "active" { 49 + print $" ... currently ($res.body.status), waiting for offline" 50 + } 51 + } else { 52 + print $" ... could not get status, waiting: ($res.status)" 53 + } 54 + sleep 2sec 55 + } 56 + 57 + if not $offline { 58 + fail "host did not transition to offline within time limit" $instance.pid 59 + } 60 + print "ok: host transitioned to offline successfully." 61 + 62 + print "starting mock pds websocket server..." 63 + let mock_pds_handle = (start-mock-pds $mock_port) 64 + 65 + print "checking status transitions back to Active..." 66 + mut active = false 67 + 68 + # now wait for it to successfully reconnect and the active_sleep of 1s to pass. 69 + for i in 1..20 { 70 + let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 71 + if $res.status == 200 { 72 + if $res.body.status == "active" { 73 + $active = true 74 + break 75 + } 76 + if $res.body.status == "offline" { 77 + print $" ... currently ($res.body.status), waiting for active" 78 + } 79 + } else { 80 + print $" ... could not get status, waiting: ($res.status)" 81 + } 82 + sleep 2sec 83 + } 84 + 85 + if $active { 86 + print "ok: host transitioned to active successfully." 87 + } else { 88 + stop-mock-pds $mock_pds_handle 89 + try { kill $instance.pid } 90 + fail "host did not transition to active within time limit" 91 + } 92 + 93 + print "checking status transitions to Throttled..." 
94 + let put_res = (http put -fe -t application/json $"($url)/pds/tiers" { 95 + host: $mock_host, 96 + tier: "custom" 97 + }) 98 + if $put_res.status != 200 { 99 + print $"PUT /pds/tiers failed with status ($put_res.status)" 100 + print $put_res.body 101 + stop-mock-pds $mock_pds_handle 102 + try { kill $instance.pid } 103 + fail "failed to change tier" 104 + } 105 + 106 + # since we updated the tier via API, the status should change immediately 107 + mut throttled = false 108 + let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 109 + if $res.status == 200 and $res.body.status == "throttled" { 110 + $throttled = true 111 + } 112 + 113 + if not $throttled { 114 + stop-mock-pds $mock_pds_handle 115 + try { kill $instance.pid } 116 + fail "host did not transition to throttled after tier update" 117 + } 118 + print "ok: host transitioned to throttled successfully." 119 + 120 + print "checking status transitions back to Active when limits loosen..." 121 + http delete -fe $"($url)/pds/tiers?host=($mock_host)" 122 + 123 + # should change back immediately 124 + mut re_active = false 125 + let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 126 + if $res.status == 200 and $res.body.status == "active" { 127 + $re_active = true 128 + } 129 + 130 + stop-mock-pds $mock_pds_handle 131 + try { kill $instance.pid } 132 + 133 + if $re_active { 134 + print "ok: host transitioned back to active successfully." 135 + exit 0 136 + } else { 137 + fail "host did not transition back to active after tier removed" 138 + } 139 + }
+1 -1
tests/run_all.nu
··· 46 46 print "" 47 47 48 48 # discover all test scripts, excluding infrastructure files 49 - mut excluded = ["common", "mock_relay", "run_all"] 49 + mut excluded = ["common", "mock_relay", "mock_pds", "run_all"] 50 50 if $skip_creds { 51 51 $excluded = ($excluded | append ["authenticated_stream", "repo_sync_integrity"]) 52 52 }