very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib,api,firehose] let pds' be bannable

dawn 640aefcf 26b126d1

+475 -225
+1 -1
examples/statusphere.rs
··· 20 20 21 21 use chrono::DateTime; 22 22 use futures::StreamExt; 23 + use hydrant::FilterMode; 23 24 use hydrant::config::Config; 24 25 use hydrant::control::{EventStream, Hydrant, ReposControl}; 25 - use hydrant::filter::FilterMode; 26 26 use jacquard_common::types::did::Did; 27 27 use jacquard_common::types::tid::Tid; 28 28 use scc::HashMap;
+39 -3
src/api/pds.rs
··· 8 8 }; 9 9 use serde::{Deserialize, Serialize}; 10 10 11 - use crate::control::{Hydrant, PdsTierAssignment, PdsTierDefinition}; 11 + use crate::control::{Hydrant, PdsTierDefinition}; 12 12 13 13 pub fn router() -> Router<Hydrant> { 14 14 Router::new() ··· 16 16 .route("/pds/tiers", put(set_tier)) 17 17 .route("/pds/tiers", delete(remove_tier)) 18 18 .route("/pds/rate-tiers", get(list_rate_tiers)) 19 + .route("/pds/banned", get(list_banned)) 20 + .route("/pds/banned", put(ban)) 21 + .route("/pds/banned", delete(unban)) 19 22 } 20 23 21 24 /// combined response: tier assignments + available tier definitions. 22 25 #[derive(Serialize)] 23 26 pub struct TiersResponse { 24 - pub assignments: Vec<PdsTierAssignment>, 27 + pub assignments: HashMap<String, String>, 25 28 pub rate_tiers: HashMap<String, PdsTierDefinition>, 26 29 } 27 30 28 31 pub async fn list_tiers(State(hydrant): State<Hydrant>) -> Json<TiersResponse> { 29 32 Json(TiersResponse { 30 - assignments: hydrant.pds.list_assignments().await, 33 + assignments: hydrant.pds.list_tiers().await, 31 34 rate_tiers: hydrant.pds.list_rate_tiers(), 32 35 }) 36 + } 37 + 38 + pub async fn list_banned(State(hydrant): State<Hydrant>) -> Json<Vec<String>> { 39 + Json(hydrant.pds.list_banned().await) 33 40 } 34 41 35 42 pub async fn list_rate_tiers( ··· 72 79 .map(|_| StatusCode::OK) 73 80 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 74 81 } 82 + 83 + #[derive(Deserialize)] 84 + pub struct BanBody { 85 + pub host: String, 86 + } 87 + 88 + pub async fn ban( 89 + State(hydrant): State<Hydrant>, 90 + Json(body): Json<BanBody>, 91 + ) -> Result<StatusCode, (StatusCode, String)> { 92 + hydrant 93 + .pds 94 + .ban(body.host) 95 + .await 96 + .map(|_| StatusCode::OK) 97 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 98 + } 99 + 100 + pub async fn unban( 101 + State(hydrant): State<Hydrant>, 102 + Json(body): Json<BanBody>, 103 + ) -> Result<StatusCode, (StatusCode, String)> { 104 + hydrant 105 + .pds 106 + .unban(body.host) 107 + .await 108 + .map(|_| StatusCode::OK) 109 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 110 + }
+6 -3
src/api/xrpc/get_host_status.rs
··· 1 - use jacquard_api::com_atproto::sync::get_host_status::{ 2 - GetHostStatusError, GetHostStatusOutput, GetHostStatusRequest, GetHostStatusResponse, 1 + use jacquard_api::com_atproto::sync::{ 2 + HostStatus, 3 + get_host_status::{ 4 + GetHostStatusError, GetHostStatusOutput, GetHostStatusRequest, GetHostStatusResponse, 5 + }, 3 6 }; 4 7 use jacquard_common::CowStr; 5 8 ··· 23 26 account_count: Some(host.account_count as i64), 24 27 hostname: CowStr::Owned(host.name), 25 28 seq: Some(host.seq), 26 - status: None, 29 + status: host.is_banned.then_some(HostStatus::Banned), 27 30 extra_data: None, 28 31 })) 29 32 }
+4 -3
src/api/xrpc/list_hosts.rs
··· 1 - use jacquard_api::com_atproto::sync::list_hosts::{ 2 - Host, ListHostsOutput, ListHostsRequest, ListHostsResponse, 1 + use jacquard_api::com_atproto::sync::{ 2 + HostStatus, 3 + list_hosts::{Host, ListHostsOutput, ListHostsRequest, ListHostsResponse}, 3 4 }; 4 5 use jacquard_common::CowStr; 5 6 ··· 24 25 .map(|h| Host { 25 26 hostname: CowStr::Owned(h.name), 26 27 seq: Some(h.seq), 27 - status: None, 28 + status: h.is_banned.then_some(HostStatus::Banned), 28 29 account_count: Some(h.account_count as i64), 29 30 extra_data: None, 30 31 })
+56 -43
src/control/firehose.rs
··· 5 5 use tracing::{error, info}; 6 6 use url::Url; 7 7 8 + use crate::config::FirehoseSource; 8 9 use crate::db::{self, keys}; 9 10 use crate::ingest::{BufferTx, firehose::FirehoseIngestor}; 10 11 use crate::state::AppState; ··· 35 36 pub is_pds: bool, 36 37 } 37 38 38 - pub(super) async fn spawn_firehose_ingestor( 39 - relay_url: &Url, 40 - is_pds: bool, 41 - state: &Arc<AppState>, 42 - shared: &FirehoseShared, 43 - enabled: watch::Receiver<bool>, 44 - ) -> Result<FirehoseIngestorHandle> { 45 - use std::sync::atomic::AtomicI64; 46 - 47 - let start = db::get_firehose_cursor(&state.db, relay_url).await?; 48 - // insert into relay_cursors if not already present; existing in-memory cursor takes precedence 49 - let _ = state 50 - .firehose_cursors 51 - .insert_async(relay_url.clone(), AtomicI64::new(start.unwrap_or(0))) 52 - .await; 53 - 54 - info!(relay = %relay_url, is_pds, cursor = ?start, "starting firehose ingestor"); 55 - 56 - let ingestor = FirehoseIngestor::new( 57 - state.clone(), 58 - shared.buffer_tx.clone(), 59 - relay_url.clone(), 60 - is_pds, 61 - state.filter.clone(), 62 - enabled, 63 - shared.verify_signatures, 64 - ) 65 - .await; 66 - 67 - let relay_for_log = relay_url.clone(); 68 - let abort = tokio::spawn(async move { 69 - if let Err(e) = ingestor.run().await { 70 - error!(relay = %relay_for_log, err = %e, "firehose ingestor exited with error"); 71 - } 72 - }) 73 - .abort_handle(); 74 - 75 - Ok(FirehoseIngestorHandle { abort, is_pds }) 76 - } 77 - 78 39 /// runtime control over the firehose ingestor component. 79 40 #[derive(Clone)] 80 41 pub struct FirehoseHandle { ··· 97 58 } 98 59 } 99 60 61 + pub(super) async fn spawn_firehose_ingestor( 62 + &self, 63 + source: &FirehoseSource, 64 + shared: &FirehoseShared, 65 + ) -> Result<()> { 66 + use std::sync::atomic::AtomicI64; 67 + let state = &self.state; 68 + 69 + let start = db::get_firehose_cursor(&state.db, &source.url).await?; 70 + // insert into relay_cursors if not already present; existing in-memory cursor takes precedence 71 + let _ = state 72 + .firehose_cursors 73 + .insert_async(source.url.clone(), AtomicI64::new(start.unwrap_or(0))) 74 + .await; 75 + 76 + info!(relay = %source.url, source.is_pds, cursor = ?start, "starting firehose ingestor"); 77 + 78 + let enabled = state.firehose_enabled.subscribe(); 79 + let ingestor = FirehoseIngestor::new( 80 + state.clone(), 81 + shared.buffer_tx.clone(), 82 + source.url.clone(), 83 + source.is_pds, 84 + state.filter.clone(), 85 + enabled, 86 + shared.verify_signatures, 87 + ) 88 + .await; 89 + 90 + let abort = tokio::spawn({ 91 + let relay_url = source.url.clone(); 92 + let tasks = self.tasks.clone(); 93 + async move { 94 + if let Err(e) = ingestor.run().await { 95 + error!(relay = %relay_url, err = %e, "firehose ingestor exited with error"); 96 + } else { 97 + // remove from tasks since we shutdown 98 + tasks.remove_async(&relay_url).await; 99 + info!(relay = %relay_url, "firehose shut down!"); 100 + } 101 + } 102 + }) 103 + .abort_handle(); 104 + 105 + let handle = FirehoseIngestorHandle { 106 + abort, 107 + is_pds: source.is_pds, 108 + }; 109 + self.tasks.upsert_async(source.url.clone(), handle).await; 110 + 111 + Ok(()) 112 + } 113 + 100 114 /// enable firehose ingestion, no-op if already enabled. 101 115 pub fn enable(&self) { 102 116 self.state.firehose_enabled.send_replace(true); ··· 156 170 157 171 let _ = self.persisted.insert_async(url.clone()).await; 158 172 159 - let enabled_rx = self.state.firehose_enabled.subscribe(); 160 - let handle = spawn_firehose_ingestor(&url, is_pds, &self.state, shared, enabled_rx).await?; 161 - self.tasks.upsert_async(url, handle).await; 173 + self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared) 174 + .await?; 162 175 163 176 Ok(()) 164 177 }
+19 -32
src/control/mod.rs
··· 46 46 use crate::filter::FilterMode; 47 47 #[cfg(feature = "indexer")] 48 48 use crate::ingest::indexer::FirehoseWorker; 49 + use crate::pds_meta::{PdsMeta, PdsMetaHandle}; 49 50 use crate::state::AppState; 50 51 use crate::types::MarshallableEvt; 51 52 52 - use firehose::{FirehoseShared, spawn_firehose_ingestor}; 53 + use firehose::FirehoseShared; 53 54 #[cfg(feature = "indexer")] 54 55 use stream::event_stream_thread; 55 56 #[cfg(feature = "relay")] ··· 63 64 pub seq: i64, 64 65 /// the amount of accounts hydrant has seen from this host. 65 66 pub account_count: u64, 67 + /// whether this host is banned or not. 68 + pub is_banned: bool, 66 69 } 67 70 68 71 /// an event emitted by the hydrant event stream. ··· 190 193 let state = Arc::new(state); 191 194 192 195 Ok(Self { 196 + firehose: FirehoseHandle::new(state.clone()), 197 + filter: FilterControl(state.clone()), 198 + pds: pds::PdsControl(state.clone()), 199 + repos: ReposControl(state.clone()), 200 + db: DbControl(state.clone()), 193 201 #[cfg(feature = "indexer")] 194 202 crawler: crawler::CrawlerHandle { 195 203 state: state.clone(), ··· 197 205 tasks: Arc::new(scc::HashMap::new()), 198 206 persisted: Arc::new(scc::HashSet::new()), 199 207 }, 200 - firehose: FirehoseHandle::new(state.clone()), 201 208 #[cfg(feature = "indexer")] 202 209 backfill: BackfillHandle::new(state.clone()), 203 - filter: FilterControl(state.clone()), 204 - pds: pds::PdsControl(state.clone()), 205 - repos: ReposControl(state.clone()), 206 - db: DbControl(state.clone()), 207 210 #[cfg(feature = "backlinks")] 208 211 backlinks: crate::backlinks::BacklinksControl(state.clone()), 209 212 state, ··· 404 407 "starting firehose ingestor(s)" 405 408 ); 406 409 for source in &relay_hosts { 407 - let enabled_rx = state.firehose_enabled.subscribe(); 408 - let handle = spawn_firehose_ingestor( 409 - &source.url, 410 - source.is_pds, 411 - &state, 412 - fire_shared, 413 - enabled_rx, 414 - ) 415 - .await?; 416 - let _ = firehose 417 - .tasks 418 - .insert_async(source.url.clone(), handle) 419 - .await; 410 + firehose 411 + .spawn_firehose_ingestor(source, fire_shared) 412 + .await?; 420 413 } 421 414 } 422 415 ··· 432 425 if firehose.tasks.contains_async(&source.url).await { 433 426 continue; 434 427 } 435 - let enabled_rx = state.firehose_enabled.subscribe(); 436 - let handle = spawn_firehose_ingestor( 437 - &source.url, 438 - source.is_pds, 439 - &state, 440 - fire_shared, 441 - enabled_rx, 442 - ) 443 - .await?; 444 - let _ = firehose 445 - .tasks 446 - .insert_async(source.url.clone(), handle) 447 - .await; 428 + firehose 429 + .spawn_firehose_ingestor(source, fire_shared) 430 + .await?; 448 431 } 449 432 450 433 // 10c. seed firehose PDS sources from listHosts on configured seed URLs ··· 796 779 let account_count = state 797 780 .db 798 781 .get_count_sync(&keys::pds_account_count_key(&hostname)); 782 + let is_banned = state.pds_meta.load().is_banned(&hostname); 799 783 800 784 Ok(Some(Host { 801 785 name: hostname.into(), 802 786 seq, 803 787 account_count, 788 + is_banned, 804 789 })) 805 790 }) 806 791 .await ··· 850 835 let account_count = state 851 836 .db 852 837 .get_count_sync(&keys::pds_account_count_key(hostname)); 838 + let is_banned = state.pds_meta.load().is_banned(&hostname); 853 839 hosts.push(Host { 854 840 name: hostname.into(), 855 841 seq, 856 842 account_count, 843 + is_banned, 857 844 }); 858 845 } 859 846
+89 -35
src/control/pds.rs
··· 6 6 use smol_str::SmolStr; 7 7 8 8 use crate::config::RateTier; 9 - use crate::db::pds_tiers as db_pds; 9 + use crate::db::pds_meta as db_pds; 10 + use crate::pds_meta::PdsMeta; 10 11 use crate::state::AppState; 11 12 12 13 /// a single PDS-to-tier assignment. ··· 41 42 pub struct PdsControl(pub(super) Arc<AppState>); 42 43 43 44 impl PdsControl { 45 + async fn update<F, G>(&self, db_op: F, mem_op: G) -> Result<()> 46 + where 47 + F: FnOnce(&mut fjall::OwnedWriteBatch, &fjall::Keyspace) + Send + 'static, 48 + G: FnOnce(&mut PdsMeta), 49 + { 50 + let state = self.0.clone(); 51 + tokio::task::spawn_blocking(move || { 52 + let mut batch = state.db.inner.batch(); 53 + db_op(&mut batch, &state.db.filter); 54 + batch.commit().into_diagnostic()?; 55 + state.db.persist() 56 + }) 57 + .await 58 + .into_diagnostic()??; 59 + 60 + let mut snapshot = (**self.0.pds_meta.load()).clone(); 61 + mem_op(&mut snapshot); 62 + self.0.pds_meta.store(Arc::new(snapshot)); 63 + 64 + Ok(()) 65 + } 66 + 44 67 /// list all current per-PDS tier assignments. 45 - pub async fn list_assignments(&self) -> Vec<PdsTierAssignment> { 46 - let snapshot = self.0.pds_tiers.load(); 68 + pub async fn list_tiers(&self) -> HashMap<String, String> { 69 + let snapshot = self.0.pds_meta.load(); 47 70 snapshot 71 + .tiers 48 72 .iter() 49 - .map(|(host, tier)| PdsTierAssignment { 50 - host: host.clone(), 51 - tier: tier.to_string(), 52 - }) 73 + .map(|(host, tier)| (host.clone(), tier.to_string())) 53 74 .collect() 54 75 } 55 76 77 + /// returns the assigned tier for `host`, or "default" if none is assigned. 78 + pub fn get_tier(&self, host: impl AsRef<str>) -> String { 79 + let snapshot = self.0.pds_meta.load(); 80 + snapshot 81 + .tiers 82 + .get(host.as_ref()) 83 + .map(|t| t.to_string()) 84 + .unwrap_or_else(|| "default".to_string()) 85 + } 86 + 87 + /// returns true if `host` is currently banned. 88 + pub fn is_banned(&self, host: impl AsRef<str>) -> bool { 89 + self.0.pds_meta.load().is_banned(host.as_ref()) 90 + } 91 + 92 + /// list all currently banned PDS hosts. 93 + pub async fn list_banned(&self) -> Vec<String> { 94 + let snapshot = self.0.pds_meta.load(); 95 + snapshot.banned.iter().cloned().collect() 96 + } 97 + 56 98 /// list all configured rate tier definitions. 57 99 pub fn list_rate_tiers(&self) -> HashMap<String, PdsTierDefinition> { 58 100 self.0 ··· 64 106 65 107 /// assign `host` to `tier`, persisting the change to the database. 66 108 /// returns an error if `tier` is not a known tier name. 67 - pub async fn set_tier(&self, host: String, tier: String) -> Result<()> { 109 + pub async fn set_tier(&self, host: impl AsRef<str>, tier: String) -> Result<()> { 68 110 if !self.0.rate_tiers.contains_key(&tier) { 69 111 miette::bail!( 70 112 "unknown tier '{tier}'; known tiers: {:?}", ··· 72 114 ); 73 115 } 74 116 75 - let state = self.0.clone(); 117 + let host = host.as_ref().to_string(); 76 118 let host_clone = host.clone(); 77 119 let tier_clone = tier.clone(); 78 - tokio::task::spawn_blocking(move || { 79 - let mut batch = state.db.inner.batch(); 80 - db_pds::set(&mut batch, &state.db.filter, &host_clone, &tier_clone); 81 - batch.commit().into_diagnostic()?; 82 - state.db.persist() 83 - }) 120 + self.update( 121 + move |batch, ks| db_pds::set_tier(batch, ks, &host_clone, &tier_clone), 122 + move |meta| { 123 + meta.tiers.insert(host, SmolStr::new(&tier)); 124 + }, 125 + ) 84 126 .await 85 - .into_diagnostic()??; 86 - 87 - let mut snapshot = (**self.0.pds_tiers.load()).clone(); 88 - snapshot.insert(host, SmolStr::new(&tier)); 89 - self.0.pds_tiers.store(Arc::new(snapshot)); 90 - 91 - Ok(()) 92 127 } 93 128 94 129 /// remove any explicit tier assignment for `host`, reverting it to the default tier. 95 - pub async fn remove_tier(&self, host: String) -> Result<()> { 96 - let state = self.0.clone(); 130 + pub async fn remove_tier(&self, host: impl AsRef<str>) -> Result<()> { 131 + let host = host.as_ref().to_string(); 97 132 let host_clone = host.clone(); 98 - tokio::task::spawn_blocking(move || { 99 - let mut batch = state.db.inner.batch(); 100 - db_pds::remove(&mut batch, &state.db.filter, &host_clone); 101 - batch.commit().into_diagnostic()?; 102 - state.db.persist() 103 - }) 133 + self.update( 134 + move |batch, ks| db_pds::remove_tier(batch, ks, &host_clone), 135 + move |meta| { 136 + meta.tiers.remove(&host); 137 + }, 138 + ) 104 139 .await 105 - .into_diagnostic()??; 140 + } 106 141 107 - let mut snapshot = (**self.0.pds_tiers.load()).clone(); 108 - snapshot.remove(&host); 109 - self.0.pds_tiers.store(Arc::new(snapshot)); 142 + /// ban `host`, persisting the change to the database. 143 + pub async fn ban(&self, host: impl AsRef<str>) -> Result<()> { 144 + let host = host.as_ref().to_string(); 145 + let host_clone = host.clone(); 146 + self.update( 147 + move |batch, ks| db_pds::set_banned(batch, ks, &host_clone), 148 + move |meta| { 149 + meta.banned.insert(host); 150 + }, 151 + ) 152 + .await 153 + } 110 154 111 - Ok(()) 155 + /// unban `host`, removing it from the database. 156 + pub async fn unban(&self, host: impl AsRef<str>) -> Result<()> { 157 + let host = host.as_ref().to_string(); 158 + let host_clone = host.clone(); 159 + self.update( 160 + move |batch, ks| db_pds::remove_banned(batch, ks, &host_clone), 161 + move |meta| { 162 + meta.banned.remove(&host); 163 + }, 164 + ) 165 + .await 112 166 } 113 167 }
+2 -2
src/db/mod.rs
··· 27 27 pub mod filter; 28 28 pub mod keys; 29 29 pub mod migration; 30 - pub mod pds_tiers; 30 + pub mod pds_meta; 31 31 pub mod types; 32 32 33 33 use tokio::sync::broadcast; ··· 389 389 opts() 390 390 // only iterators are used here 391 391 .expect_point_read_hits(true) 392 - .max_memtable_size(mb(16)) 392 + .max_memtable_size(mb(8)) 393 393 // did -> failed state, not very compressable 394 394 .data_block_size_policy(BlockSizePolicy::all(kb(2))) 395 395 .data_block_compression_policy(CompressionPolicy::disabled())
+62
src/db/pds_meta.rs
··· 1 + use fjall::{Keyspace, OwnedWriteBatch}; 2 + use miette::{IntoDiagnostic, Result}; 3 + use smol_str::SmolStr; 4 + 5 + pub const PDS_TIER_PREFIX: &[u8] = b"pt|"; 6 + 7 + // `pt|{host}` -> tier name 8 + pub fn pds_tier_key(host: &str) -> Vec<u8> { 9 + let mut key = Vec::with_capacity(PDS_TIER_PREFIX.len() + host.len()); 10 + key.extend_from_slice(PDS_TIER_PREFIX); 11 + key.extend_from_slice(host.as_bytes()); 12 + key 13 + } 14 + 15 + /// load all PDS tier assignments from the filter keyspace 16 + pub fn load_tiers(ks: &Keyspace) -> Result<Vec<(SmolStr, SmolStr)>> { 17 + let mut out = Vec::new(); 18 + for guard in ks.prefix(PDS_TIER_PREFIX) { 19 + let (k, v) = guard.into_inner().into_diagnostic()?; 20 + let host = std::str::from_utf8(&k[PDS_TIER_PREFIX.len()..]).into_diagnostic()?; 21 + let tier = std::str::from_utf8(&v).into_diagnostic()?; 22 + out.push((SmolStr::new(host), SmolStr::new(tier))); 23 + } 24 + Ok(out) 25 + } 26 + 27 + pub fn set_tier(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str, tier: &str) { 28 + batch.insert(ks, pds_tier_key(host), tier.as_bytes()); 29 + } 30 + 31 + pub fn remove_tier(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 32 + batch.remove(ks, pds_tier_key(host)); 33 + } 34 + 35 + pub const PDS_BANNED_PREFIX: &[u8] = b"pb|"; 36 + 37 + // `pb|{host}` -> empty value 38 + pub fn pds_banned_key(host: &str) -> Vec<u8> { 39 + let mut key = Vec::with_capacity(PDS_BANNED_PREFIX.len() + host.len()); 40 + key.extend_from_slice(PDS_BANNED_PREFIX); 41 + key.extend_from_slice(host.as_bytes()); 42 + key 43 + } 44 + 45 + /// load all banned PDS hosts from the filter keyspace 46 + pub fn load_banned(ks: &Keyspace) -> Result<Vec<SmolStr>> { 47 + let mut out = Vec::new(); 48 + for guard in ks.prefix(PDS_BANNED_PREFIX) { 49 + let (k, _) = guard.into_inner().into_diagnostic()?; 50 + let host = std::str::from_utf8(&k[PDS_BANNED_PREFIX.len()..]).into_diagnostic()?; 51 + out.push(SmolStr::new(host)); 52 + } 53 + Ok(out) 54 + } 55 + 56 + pub fn set_banned(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 57 + batch.insert(ks, pds_banned_key(host), &[]); 58 + } 59 + 60 + pub fn remove_banned(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 61 + batch.remove(ks, pds_banned_key(host)); 62 + }
-33
src/db/pds_tiers.rs
··· 1 - use fjall::{Keyspace, OwnedWriteBatch}; 2 - use miette::{IntoDiagnostic, Result}; 3 - use smol_str::SmolStr; 4 - 5 - pub const PDS_TIER_PREFIX: &[u8] = b"pt|"; 6 - 7 - // `pt|{host}` -> tier name 8 - pub fn pds_tier_key(host: &str) -> Vec<u8> { 9 - let mut key = Vec::with_capacity(PDS_TIER_PREFIX.len() + host.len()); 10 - key.extend_from_slice(PDS_TIER_PREFIX); 11 - key.extend_from_slice(host.as_bytes()); 12 - key 13 - } 14 - 15 - /// load all PDS tier assignments from the filter keyspace 16 - pub fn load(ks: &Keyspace) -> Result<Vec<(SmolStr, SmolStr)>> { 17 - let mut out = Vec::new(); 18 - for guard in ks.prefix(PDS_TIER_PREFIX) { 19 - let (k, v) = guard.into_inner().into_diagnostic()?; 20 - let host = std::str::from_utf8(&k[PDS_TIER_PREFIX.len()..]).into_diagnostic()?; 21 - let tier = std::str::from_utf8(&v).into_diagnostic()?; 22 - out.push((SmolStr::new(host), SmolStr::new(tier))); 23 - } 24 - Ok(out) 25 - } 26 - 27 - pub fn set(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str, tier: &str) { 28 - batch.insert(ks, pds_tier_key(host), tier.as_bytes()); 29 - } 30 - 31 - pub fn remove(batch: &mut OwnedWriteBatch, ks: &Keyspace, host: &str) { 32 - batch.remove(ks, pds_tier_key(host)); 33 - }
+13 -2
src/ingest/firehose.rs
··· 105 105 // this is not for connection throttling (thats handled by ThrottleHandle) 106 106 // its for stream errors (cbor decode etc) 107 107 let mut backoff = Duration::from_secs(0); 108 - const MAX_BACKOFF: Duration = Duration::from_secs(60 * 60); // 1 ohur 108 + const MAX_BACKOFF: Duration = Duration::from_secs(60 * 60); // 1 hour 109 109 110 110 loop { 111 + if self.state.pds_meta.load().is_banned(host) { 112 + break Ok(()); 113 + } 114 + 111 115 self.enabled.wait_enabled("firehose").await; 112 116 113 117 tokio::time::sleep(backoff).await; ··· 169 173 match decode_frame(&bytes) { 170 174 Ok(msg) => { 171 175 if self.is_pds { 176 + let tier = { 177 + let meta = self.state.pds_meta.load(); 178 + let banned = meta.is_banned(host); 179 + if banned { 180 + break Ok(()); 181 + } 182 + meta.tier_for(host, &self.state.rate_tiers) 183 + }; 172 184 let accounts = self.state.db.get_count(&count_key).await; 173 - let tier = self.state.pds_tier_for(&host); 174 185 tokio::select! { 175 186 _ = self.throttle.wait_for_allow(accounts, &tier) => {} 176 187 _ = self.enabled.changed() => {
+5 -1
src/lib.rs
··· 1 1 pub mod config; 2 + /// hydrant main api, includes the Hydrant type for programmatic control. 2 3 pub mod control; 3 - pub mod filter; 4 + pub(crate) mod filter; 5 + pub(crate) mod pds_meta; 4 6 pub mod types; 5 7 6 8 #[cfg(all(feature = "relay", feature = "indexer"))] ··· 23 25 pub(crate) mod resolver; 24 26 pub(crate) mod state; 25 27 pub(crate) mod util; 28 + 29 + pub use filter::FilterMode;
+34
src/pds_meta.rs
··· 1 + use crate::config::RateTier; 2 + use arc_swap::ArcSwap; 3 + use smol_str::SmolStr; 4 + use std::collections::{HashMap, HashSet}; 5 + use std::sync::Arc; 6 + 7 + #[derive(Default, Clone)] 8 + pub(crate) struct PdsMeta { 9 + pub tiers: HashMap<String, SmolStr>, 10 + pub banned: HashSet<String>, 11 + } 12 + 13 + impl PdsMeta { 14 + pub fn tier_for(&self, host: &str, rate_tiers: &HashMap<String, RateTier>) -> RateTier { 15 + let default = rate_tiers 16 + .get("default") 17 + .copied() 18 + .unwrap_or_else(RateTier::default_tier); 19 + self.tiers 20 + .get(host) 21 + .and_then(|name| rate_tiers.get(name.as_str()).copied()) 22 + .unwrap_or(default) 23 + } 24 + 25 + pub fn is_banned(&self, host: &str) -> bool { 26 + self.banned.contains(host) 27 + } 28 + } 29 + 30 + pub(crate) type PdsMetaHandle = Arc<ArcSwap<PdsMeta>>; 31 + 32 + pub(crate) fn new_handle(meta: PdsMeta) -> PdsMetaHandle { 33 + Arc::new(ArcSwap::new(Arc::new(meta))) 34 + }
+17 -28
src/state.rs
··· 1 - use std::collections::HashMap; 2 - use std::sync::Arc; 1 + use std::collections::{HashMap, HashSet}; 2 + use std::future::Future; 3 3 use std::sync::atomic::AtomicI64; 4 4 use std::time::Duration; 5 5 6 - use arc_swap::ArcSwap; 7 6 use miette::Result; 8 7 use smol_str::SmolStr; 9 8 #[cfg(feature = "indexer")] ··· 14 13 use crate::{ 15 14 config::{Config, RateTier}, 16 15 db::Db, 17 - filter::{FilterHandle, new_handle}, 16 + filter::{FilterHandle, new_handle as new_filter_handle}, 17 + pds_meta::{PdsMeta, PdsMetaHandle, new_handle as new_pds_handle}, 18 18 resolver::Resolver, 19 19 util::throttle::Throttler, 20 20 }; 21 21 22 - /// pds hostname -> tier name. updated atomically via ArcSwap. 23 - pub(crate) type PdsTierHandle = Arc<ArcSwap<HashMap<String, SmolStr>>>; 24 - 25 22 pub struct AppState { 26 23 pub db: Db, 27 24 pub resolver: Resolver, 28 25 pub(crate) filter: FilterHandle, 29 - pub(crate) pds_tiers: PdsTierHandle, 26 + pub(crate) pds_meta: PdsMetaHandle, 30 27 pub(crate) rate_tiers: HashMap<String, RateTier>, 31 28 pub firehose_cursors: scc::HashIndex<Url, AtomicI64>, 32 29 #[cfg(feature = "indexer")] ··· 56 53 } 57 54 }; 58 55 59 - let filter = new_handle(filter_config); 56 + let filter = new_filter_handle(filter_config); 60 57 61 58 // load persisted per-PDS tier assignments from the filter keyspace. 62 59 // trusted_hosts from config are merged in as defaults (not persisted here; they seed 63 60 // only if the host has no existing assignment in the DB). 64 - let mut tier_map: HashMap<String, SmolStr> = crate::db::pds_tiers::load(&db.filter) 61 + let mut tiers: HashMap<String, SmolStr> = crate::db::pds_meta::load_tiers(&db.filter) 65 62 .unwrap_or_default() 66 63 .into_iter() 67 64 .map(|(host, tier)| (host.to_string(), tier)) 68 65 .collect(); 69 66 for host in &config.trusted_hosts { 70 - tier_map 67 + tiers 71 68 .entry(host.clone()) 72 69 .or_insert_with(|| SmolStr::new("trusted")); 73 70 } 74 - let pds_tiers = Arc::new(ArcSwap::new(Arc::new(tier_map))); 71 + 72 + let banned: HashSet<String> = crate::db::pds_meta::load_banned(&db.filter) 73 + .unwrap_or_default() 74 + .into_iter() 75 + .map(|host| host.to_string()) 76 + .collect(); 77 + 78 + let pds_meta = new_pds_handle(PdsMeta { tiers, banned }); 75 79 76 80 let relay_cursors = scc::HashIndex::new(); 77 81 ··· 85 89 db, 86 90 resolver, 87 91 filter, 88 - pds_tiers, 92 + pds_meta, 89 93 rate_tiers: config.rate_tiers.clone(), 90 94 firehose_cursors: relay_cursors, 91 95 #[cfg(feature = "indexer")] ··· 103 107 #[cfg(feature = "indexer")] 104 108 pub fn notify_backfill(&self) { 105 109 self.backfill_notify.notify_one(); 106 - } 107 - 108 - /// returns the rate tier for the given PDS hostname. 109 - /// falls back to the "default" tier if no assignment exists or the assigned tier is unknown. 110 - pub fn pds_tier_for(&self, host: &str) -> RateTier { 111 - let default = self 112 - .rate_tiers 113 - .get("default") 114 - .copied() 115 - .unwrap_or_else(RateTier::default_tier); 116 - let snapshot = self.pds_tiers.load(); 117 - snapshot 118 - .get(host) 119 - .and_then(|name| self.rate_tiers.get(name.as_str()).copied()) 120 - .unwrap_or(default) 121 110 } 122 111 123 112 /// pauses the crawler, firehose, and backfill worker, runs `f`, then restores their prior state.
+128 -39
tests/api.nu
··· 256 256 # initial state: no assignments, built-in rate tiers present 257 257 print " GET /pds/tiers (expect empty assignments, built-in rate_tiers)..." 258 258 let initial = (http get $"($url)/pds/tiers") 259 - if ($initial.assignments | length) != 0 { 260 - fail $"expected empty assignments, got ($initial.assignments | length)" $pid 259 + if ($initial.assignments | columns | length) != 0 { 260 + fail $"expected empty assignments, got ($initial.assignments | columns | length)" $pid 261 261 } 262 262 if not ("default" in $initial.rate_tiers) { 263 263 fail "expected 'default' tier in rate_tiers" $pid ··· 291 291 tier: "trusted" 292 292 } | assert-status 200 "PUT /pds/tiers" $pid 293 293 let after_assign = (http get $"($url)/pds/tiers") 294 - if ($after_assign.assignments | length) != 1 { 295 - fail $"expected 1 assignment, got ($after_assign.assignments | length)" $pid 294 + if ($after_assign.assignments | columns | length) != 1 { 295 + fail $"expected 1 assignment, got ($after_assign.assignments | columns | length)" $pid 296 296 } 297 - let a = ($after_assign.assignments | first) 298 - if $a.host != "pds.example.com" { 299 - fail $"expected host=pds.example.com, got ($a.host)" $pid 297 + if not ("pds.example.com" in $after_assign.assignments) { 298 + fail $"expected host=pds.example.com to be assigned" $pid 300 299 } 301 - if $a.tier != "trusted" { 302 - fail $"expected tier=trusted, got ($a.tier)" $pid 300 + if ($after_assign.assignments | get "pds.example.com") != "trusted" { 301 + fail $"expected tier=trusted" $pid 303 302 } 304 - print $" ok: assignment created host=($a.host), tier=($a.tier)" 303 + print $" ok: assignment created host=pds.example.com, tier=trusted" 305 304 306 305 # re-assigning the same host to a different tier updates without creating a duplicate 307 306 print " PUT /pds/tiers (re-assign to default)..." ··· 310 309 tier: "default" 311 310 } | assert-status 200 "PUT /pds/tiers re-assign" $pid 312 311 let after_reassign = (http get $"($url)/pds/tiers") 313 - if ($after_reassign.assignments | length) != 1 { 314 - fail $"expected 1 assignment after re-assign, got ($after_reassign.assignments | length)" $pid 312 + if ($after_reassign.assignments | columns | length) != 1 { 313 + fail $"expected 1 assignment after re-assign, got ($after_reassign.assignments | columns | length)" $pid 315 314 } 316 - if ($after_reassign.assignments | first).tier != "default" { 317 - fail $"expected tier=default after re-assign, got (($after_reassign.assignments | first).tier)" $pid 315 + if ($after_reassign.assignments | get "pds.example.com") != "default" { 316 + fail $"expected tier=default after re-assign" $pid 318 317 } 319 318 print " ok: re-assign updates tier without creating a duplicate" 320 319 ··· 325 324 tier: "nonexistent" 326 325 } | assert-status 400 "PUT /pds/tiers unknown tier" $pid 327 326 let after_bad = (http get $"($url)/pds/tiers") 328 - if ($after_bad.assignments | length) != 1 { 327 + if ($after_bad.assignments | columns | length) != 1 { 329 328 fail "expected assignment count unchanged after rejected request" $pid 330 329 } 331 - if ($after_bad.assignments | first).tier != "default" { 330 + if ($after_bad.assignments | get "pds.example.com") != "default" { 332 331 fail "expected tier unchanged after rejected request" $pid 333 332 } 334 333 print " ok: unknown tier name rejected with 400, existing assignment unchanged" ··· 340 339 tier: "trusted" 341 340 } | assert-status 200 "PUT /pds/tiers second host" $pid 342 341 let after_second = (http get $"($url)/pds/tiers") 343 - if ($after_second.assignments | length) != 2 { 344 - fail $"expected 2 assignments, got ($after_second.assignments | length)" $pid 342 + if ($after_second.assignments | columns | length) != 2 { 343 + fail $"expected 2 assignments, got ($after_second.assignments | columns | length)" $pid 345 344 } 346 345 print " ok: two distinct hosts listed independently" 347 346 ··· 351 350 host: "pds.example.com" 352 351 } | assert-status 200 "DELETE /pds/tiers" $pid 353 352 let after_del = (http get $"($url)/pds/tiers") 354 - if ($after_del.assignments | length) != 1 { 355 - fail $"expected 1 assignment after delete, got ($after_del.assignments | length)" $pid 353 + if ($after_del.assignments | columns | length) != 1 { 354 + fail $"expected 1 assignment after delete, got ($after_del.assignments | columns | length)" $pid 356 355 } 357 - if ($after_del.assignments | first).host != "other.example.com" { 356 + if not ("other.example.com" in $after_del.assignments) { 358 357 fail "expected only other.example.com to remain after delete" $pid 359 358 } 360 359 print " ok: correct host removed, other assignment intact" ··· 370 369 host: "pds.example.com" 371 370 } | assert-status 200 "DELETE /pds/tiers non-existent" $pid 372 371 let after_idempotent = (http get $"($url)/pds/tiers") 373 - if ($after_idempotent.assignments | length) != 0 { 372 + if ($after_idempotent.assignments | columns | length) != 0 { 374 373 fail "expected empty assignments after cleanup" $pid 375 374 } 376 375 print " ok: delete of non-existent host is idempotent" ··· 398 397 } 399 398 400 399 let before = (http get $"($url)/pds/tiers") 401 - if ($before.assignments | length) != 1 { 400 + if ($before.assignments | columns | length) != 1 { 402 401 fail "assignment was not created" $instance.pid 403 402 } 404 403 ··· 415 414 416 415 print " checking assignment survived restart..." 417 416 let after = (http get $"($url)/pds/tiers") 418 - if ($after.assignments | length) != 1 { 419 - fail $"expected 1 assignment after restart, got ($after.assignments | length)" $instance2.pid 417 + if ($after.assignments | columns | length) != 1 { 418 + fail $"expected 1 assignment after restart, got ($after.assignments | columns | length)" $instance2.pid 420 419 } 421 - let a = ($after.assignments | first) 422 - if $a.host != "persist.example.com" { 423 - fail $"expected host=persist.example.com after restart, got ($a.host)" $instance2.pid 420 + if not ("persist.example.com" in $after.assignments) { 421 + fail $"expected host=persist.example.com after restart" $instance2.pid 424 422 } 425 - if $a.tier != "trusted" { 426 - fail $"expected tier=trusted after restart, got ($a.tier)" $instance2.pid 423 + if ($after.assignments | get "persist.example.com") != "trusted" { 424 + fail $"expected tier=trusted after restart" $instance2.pid 427 425 } 428 426 print " ok: tier assignment persisted across restart" 429 427 ··· 455 453 let assignments = $tiers.assignments 456 454 457 455 for host in [$host_a, $host_b] { 458 - let match = ($assignments | where host == $host) 459 - if ($match | length) != 1 { 460 - fail $"expected assignment for ($host) from HYDRANT_TRUSTED_HOSTS, got ($assignments)" $instance.pid 456 + if not ($host in $assignments) { 457 + fail $"expected assignment for ($host) from HYDRANT_TRUSTED_HOSTS" $instance.pid 461 458 } 462 - if ($match | first).tier != "trusted" { 463 - fail $"expected tier=trusted for ($host), got (($match | first).tier)" $instance.pid 459 + if ($assignments | get $host) != "trusted" { 460 + fail $"expected tier=trusted for ($host)" $instance.pid 464 461 } 465 462 } 466 463 print $" ok: ($host_a) and ($host_b) pre-assigned to trusted tier" ··· 512 509 tier: "custom" 513 510 } | assert-status 200 "PUT /pds/tiers custom tier" $instance.pid 514 511 let after = (http get $"($url)/pds/tiers") 515 - let match = ($after.assignments | where host == "custom.example.com") 516 - if ($match | length) != 1 { 512 + if not ("custom.example.com" in $after.assignments) { 517 513 fail "expected assignment for custom.example.com" $instance.pid 518 514 } 519 - if ($match | first).tier != "custom" { 520 - fail $"expected tier=custom, got (($match | first).tier)" $instance.pid 515 + if ($after.assignments | get "custom.example.com") != "custom" { 516 + fail $"expected tier=custom" $instance.pid 521 517 } 522 518 print " ok: host assigned to custom tier successfully" 523 519 ··· 525 521 print "custom rate tier test passed!" 526 522 } 527 523 524 + def test-pds-banned [url: string, pid: int] { 525 + print "=== test: pds ban management ===" 526 + 527 + print " GET /pds/banned (expect empty)..." 528 + let initial = (http get $"($url)/pds/banned") 529 + if ($initial | length) != 0 { 530 + fail $"expected empty banned list, got ($initial | length)" $pid 531 + } 532 + print " ok: starts empty" 533 + 534 + print " PUT /pds/banned (ban host)..." 535 + http put -f -e -t application/json $"($url)/pds/banned" { 536 + host: "bad.example.com" 537 + } | assert-status 200 "PUT /pds/banned" $pid 538 + 539 + let after_ban = (http get $"($url)/pds/banned") 540 + if ($after_ban | length) != 1 { 541 + fail $"expected 1 banned host, got ($after_ban | length)" $pid 542 + } 543 + if ($after_ban | first) != "bad.example.com" { 544 + fail $"expected bad.example.com, got ($after_ban | first)" $pid 545 + } 546 + print " ok: host banned" 547 + 548 + print " DELETE /pds/banned (unban host)..." 549 + http delete -f -e -t application/json $"($url)/pds/banned" --data { 550 + host: "bad.example.com" 551 + } | assert-status 200 "DELETE /pds/banned" $pid 552 + 553 + let after_unban = (http get $"($url)/pds/banned") 554 + if ($after_unban | length) != 0 { 555 + fail "expected empty banned list after unban" $pid 556 + } 557 + print " ok: host unbanned" 558 + 559 + print "pds ban management tests passed!" 560 + } 561 + 562 + # verify that banned hosts are written to the database and survive a restart. 563 + def test-pds-banned-persistence [binary: string, db_path: string, port: int] { 564 + print "=== test: pds banned assignments persist across restart ===" 565 + 566 + let url = $"http://localhost:($port)" 567 + 568 + let instance = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } { 569 + start-hydrant $binary $db_path $port 570 + }) 571 + if not (wait-for-api $url) { 572 + fail "hydrant did not start" 573 + } 574 + 575 + print " banning host..." 576 + http put -t application/json $"($url)/pds/banned" { 577 + host: "persist-ban.example.com" 578 + } 579 + 580 + let before = (http get $"($url)/pds/banned") 581 + if ($before | length) != 1 { 582 + fail "host was not banned" $instance.pid 583 + } 584 + 585 + print " restarting hydrant..." 586 + kill $instance.pid 587 + sleep 2sec 588 + 589 + let instance2 = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } { 590 + start-hydrant $binary $db_path $port 591 + }) 592 + if not (wait-for-api $url) { 593 + fail "hydrant did not restart" $instance2.pid 594 + } 595 + 596 + print " checking ban survived restart..." 597 + let after = (http get $"($url)/pds/banned") 598 + if ($after | length) != 1 { 599 + fail $"expected 1 banned host after restart, got ($after | length)" $instance2.pid 600 + } 601 + if ($after | first) != "persist-ban.example.com" { 602 + fail $"expected persist-ban.example.com after restart, got ($after | first)" $instance2.pid 603 + } 604 + print " ok: banned host persisted across restart" 605 + 606 + kill $instance2.pid 607 + print "pds ban persistence test passed!" 608 + } 609 + 528 610 def main [] { 529 611 let port = resolve-test-port 3007 530 612 let url = $"http://localhost:($port)" ··· 544 626 test-crawler-sources $url $instance.pid 545 627 test-firehose-sources $url $instance.pid 546 628 test-pds-tiers $url $instance.pid 629 + test-pds-banned $url $instance.pid 547 630 548 631 kill $instance.pid 549 632 sleep 2sec ··· 575 658 let db_pds_custom = (mktemp -d -t hydrant_api.XXXXXX) 576 659 print $"db: ($db_pds_custom)" 577 660 test-pds-custom-rate-tier $binary $db_pds_custom $port 661 + 662 + sleep 1sec 663 + 664 + let db_pds_banned = (mktemp -d -t hydrant_api.XXXXXX) 665 + print $"db: ($db_pds_banned)" 666 + test-pds-banned-persistence $binary $db_pds_banned $port 578 667 579 668 print "" 580 669 print "all api tests passed!"