···143143<!-- gitnexus:start -->
144144# GitNexus — Code Intelligence
145145146146-This project is indexed by GitNexus as **hydrant** (655 symbols, 1810 relationships, 55 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
146146+This project is indexed by GitNexus as **hydrant** (1339 symbols, 3645 relationships, 113 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
147147148148> If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.
149149
···11use miette::Result;
22use serde::{Deserialize, Serialize};
33use smol_str::ToSmolStr;
44+use std::collections::HashMap;
45use std::fmt;
56use std::path::PathBuf;
67use std::str::FromStr;
78use std::time::Duration;
89use url::Url;
9101111+/// rate limit parameters for a named tier of PDS connections.
1212+///
1313+/// the per-second limit is `max(per_second_base, accounts * per_second_account_mul)`:
1414+/// `per_second_base` acts as a floor, and the limit scales with the PDS's active account count.
1515+#[derive(Debug, Clone, Copy)]
1616+pub struct RateTier {
1717+ /// floor for the per-second limit, regardless of account count.
1818+ pub per_second_base: u64,
1919+ /// per-second events allowed per active account on this PDS.
2020+ pub per_second_account_mul: f64,
2121+ /// per-hour limit.
2222+ pub per_hour: u64,
2323+ /// per-day limit.
2424+ pub per_day: u64,
2525+}
2626+2727+impl RateTier {
2828+ /// built-in "trusted" tier: high limits for well-behaved PDS operators.
2929+ pub fn trusted() -> Self {
3030+ Self {
3131+ per_second_base: 5000,
3232+ per_second_account_mul: 10.0,
3333+ per_hour: 5000 * 3600,
3434+ per_day: 5000 * 86400,
3535+ }
3636+ }
3737+3838+ /// built-in "default" tier: conservative limits for unknown PDS operators.
3939+ pub fn default_tier() -> Self {
4040+ Self {
4141+ per_second_base: 50,
4242+ per_second_account_mul: 0.5,
4343+ per_hour: 1000 * 3600,
4444+ per_day: 1000 * 86400,
4545+ }
4646+ }
4747+4848+ /// parse `base/mul/hourly/daily` format used by `HYDRANT_RATE_TIERS`.
4949+ fn parse(s: &str) -> Option<Self> {
5050+ let parts: Vec<&str> = s.split('/').collect();
5151+ if parts.len() != 4 {
5252+ return None;
5353+ }
5454+ Some(Self {
5555+ per_second_base: parts[0].parse().ok()?,
5656+ per_second_account_mul: parts[1].parse().ok()?,
5757+ per_hour: parts[2].parse().ok()?,
5858+ per_day: parts[3].parse().ok()?,
5959+ })
6060+ }
6161+}
6262+1063/// this is for internal use only, please don't use this macro.
1164#[doc(hidden)]
1265#[macro_export]
···309362 /// set via `HYDRANT_ENABLE_BACKLINKS=true`.
310363 pub enable_backlinks: bool,
311364365365+ /// list of trusted PDS/relay hosts to pre-assign to the "trusted" rate tier at startup.
366366+ /// set via `HYDRANT_TRUSTED_HOSTS` as a comma-separated list of hostnames.
367367+ /// hosts not present in this list use the "default" tier unless assigned via the API.
368368+ pub trusted_hosts: Vec<String>,
369369+ /// named rate tier definitions for PDS rate limiting.
370370+ ///
371371+ /// built-in tiers ("default" and "trusted") are always present and may be overridden.
372372+ /// set via `HYDRANT_RATE_TIERS` as a comma-separated list of `name:base/mul/hourly/daily` entries,
373373+ /// e.g. `trusted:5000/10.0/18000000/432000000,custom:100/1.0/7200000/172800000`.
374374+ pub rate_tiers: HashMap<String, RateTier>,
375375+312376 /// db internals, tune only if you know what you're doing.
313377 ///
314378 /// size of the fjall block cache in MB. set via `HYDRANT_CACHE_SIZE`.
···388452 filter_collections: None,
389453 filter_excludes: None,
390454 enable_backlinks: false,
455455+ trusted_hosts: vec![],
456456+ rate_tiers: {
457457+ let mut m = HashMap::new();
458458+ m.insert("default".to_string(), RateTier::default_tier());
459459+ m.insert("trusted".to_string(), RateTier::trusted());
460460+ m
461461+ },
391462 cache_size: 256,
392463 data_compression: Compression::Lz4,
393464 journal_compression: Compression::Lz4,
···549620550621 let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks);
551622623623+ // start with built-in tiers, then layer in any env-defined overrides.
624624+ // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,...
625625+ let mut rate_tiers = defaults.rate_tiers.clone();
626626+ if let Ok(s) = std::env::var("HYDRANT_RATE_TIERS") {
627627+ for entry in s.split(',') {
628628+ let entry = entry.trim();
629629+ if let Some((name, spec)) = entry.split_once(':') {
630630+ match RateTier::parse(spec) {
631631+ Some(tier) => {
632632+ rate_tiers.insert(name.trim().to_string(), tier);
633633+ }
634634+ None => tracing::warn!(
635635+ "ignoring invalid rate tier '{name}': expected base/mul/hourly/daily format"
636636+ ),
637637+ }
638638+ }
639639+ }
640640+ }
641641+642642+ let trusted_hosts = std::env::var("HYDRANT_TRUSTED_HOSTS")
643643+ .ok()
644644+ .map(|s| {
645645+ s.split(',')
646646+ .map(|s| s.trim().to_string())
647647+ .filter(|s| !s.is_empty())
648648+ .collect()
649649+ })
650650+ .unwrap_or_else(|| defaults.trusted_hosts.clone());
651651+552652 let default_mode = CrawlerMode::default_for(full_network);
553653 let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") {
554654 Ok(s) => s
···593693 filter_collections,
594694 filter_excludes,
595695 enable_backlinks,
696696+ trusted_hosts,
697697+ rate_tiers,
596698 cache_size,
597699 data_compression,
598700 journal_compression,
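A quick sanity check on the tier arithmetic above. This is a minimal standalone sketch (plain functions, not the crate's `RateTier` API); only the formula and the `name:base/mul/hourly/daily` entry format mirror the diff:

```rust
// standalone sketch of the RateTier math; names here are illustrative.
fn effective_per_second(base: u64, mul: f64, accounts: u64) -> u64 {
    // max(per_second_base, accounts * per_second_account_mul)
    base.max((accounts as f64 * mul) as u64)
}

fn main() {
    // trusted tier: base=5000, mul=10.0
    assert_eq!(effective_per_second(5000, 10.0, 100), 5_000); // floor wins
    assert_eq!(effective_per_second(5000, 10.0, 1_000), 10_000); // scaling wins

    // one HYDRANT_RATE_TIERS entry: name, then base/mul/hourly/daily
    let (name, spec) = "custom:100/1.0/7200000/172800000".split_once(':').unwrap();
    let parts: Vec<&str> = spec.split('/').collect();
    assert_eq!((name, parts.len()), ("custom", 4));
}
```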
+21-14
src/control/crawler.rs
···149149150150 /// delete all cursor entries associated with the given URL.
151151 pub async fn reset_cursor(&self, url: &str) -> Result<()> {
152152- let db = self.state.db.clone();
152152+ let state = self.state.clone();
153153 let point_keys = [keys::crawler_cursor_key(url)];
154154 let by_collection_prefix = keys::by_collection_cursor_prefix(url);
155155 tokio::task::spawn_blocking(move || {
156156- let mut batch = db.inner.batch();
156156+ let mut batch = state.db.inner.batch();
157157 for k in point_keys {
158158- batch.remove(&db.cursors, k);
158158+ batch.remove(&state.db.cursors, k);
159159 }
160160- for entry in db.cursors.prefix(&by_collection_prefix) {
160160+ for entry in state.db.cursors.prefix(&by_collection_prefix) {
161161 let k = entry.key().into_diagnostic()?;
162162- batch.remove(&db.cursors, k);
162162+ batch.remove(&state.db.cursors, k);
163163 }
164164- batch.commit().into_diagnostic()
164164+ batch.commit().into_diagnostic()?;
165165+ state.db.persist()
165166 })
166167 .await
167168 .into_diagnostic()??;
···198199 miette::bail!("crawler not yet started: call Hydrant::run() first");
199200 };
200201201201- let db = self.state.db.clone();
202202+ let state = self.state.clone();
202203 let key = keys::crawler_source_key(source.url.as_str());
203204 let val = rmp_serde::to_vec(&source.mode).into_diagnostic()?;
204204- tokio::task::spawn_blocking(move || db.crawler.insert(key, val).into_diagnostic())
205205- .await
206206- .into_diagnostic()??;
205205+ tokio::task::spawn_blocking(move || {
206206+ state.db.crawler.insert(key, val).into_diagnostic()?;
207207+ state.db.persist()
208208+ })
209209+ .await
210210+ .into_diagnostic()??;
207211208212 let enabled_rx = self.state.crawler_enabled.subscribe();
209213 let handle = spawn_crawler_producer(
···249253250254 // remove from DB if it was a persisted source
251255 if self.persisted.remove_async(url).await.is_some() {
252252- let db = self.state.db.clone();
256256+ let state = self.state.clone();
253257 let key = keys::crawler_source_key(url.as_str());
254254- tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic())
255255- .await
256256- .into_diagnostic()??;
258258+ tokio::task::spawn_blocking(move || {
259259+ state.db.crawler.remove(key).into_diagnostic()?;
260260+ state.db.persist()
261261+ })
262262+ .await
263263+ .into_diagnostic()??;
257264 }
258265259266 Ok(true)
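Worth noting for reviewers: every control-plane write in this diff now follows the same shape (build a batch inside `spawn_blocking`, commit it, then `state.db.persist()`), and the call sites end in a double `?`. Here is a tiny runnable sketch of why two `?`s are needed; plain std errors stand in for miette, and tokio with the `rt-multi-thread` and `macros` features is assumed:

```rust
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // spawn_blocking yields Result<T, JoinError>; when T is itself a Result,
    // the caller unwraps twice: once for the join, once for the inner error.
    let committed: u64 = task::spawn_blocking(|| -> Result<u64, std::io::Error> {
        // stand-in for "batch.commit()?; state.db.persist()"
        Ok(1)
    })
    .await??;
    assert_eq!(committed, 1);
    Ok(())
}
```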
+4-1
src/control/filter.rs
···44use miette::{IntoDiagnostic, Result};
5566use crate::db::filter as db_filter;
77-use crate::filter::{FilterMode, SetUpdate};
77+use crate::filter::FilterMode;
88+use crate::patch::SetUpdate;
89use crate::state::AppState;
9101011/// a point-in-time snapshot of the filter configuration. returned by all [`FilterControl`] methods.
···273274 let filter_ks = self.state.db.filter.clone();
274275 let inner = self.state.db.inner.clone();
275276 let filter_handle = self.state.filter.clone();
277277+ let state = self.state.clone();
276278 let mode = self.mode;
277279 let signals = self.signals;
278280 let collections = self.collections;
···282284 let mut batch = inner.batch();
283285 db_filter::apply_patch(&mut batch, &filter_ks, mode, signals, collections, excludes)?;
284286 batch.commit().into_diagnostic()?;
287287+ state.db.persist()?;
285288 db_filter::load(&filter_ks)
286289 })
287290 .await
+84-78
src/control/firehose.rs
···11use std::sync::Arc;
22-use std::sync::atomic::Ordering;
3244-use miette::{Context, IntoDiagnostic, Result};
33+use miette::{IntoDiagnostic, Result};
54use tokio::sync::watch;
65use tracing::{error, info};
76use url::Url;
···6261 state.filter.clone(),
6362 enabled,
6463 shared.verify_signatures,
6565- );
6464+ )
6565+ .await;
66666767 let relay_for_log = relay_url.clone();
6868 let abort = tokio::spawn(async move {
···8888}
89899090impl FirehoseHandle {
9191- /// enable the firehose. no-op if already enabled.
9191+ pub(super) fn new(state: Arc<AppState>) -> Self {
9292+ Self {
9393+ state,
9494+ shared: Arc::new(std::sync::OnceLock::new()),
9595+ tasks: Arc::new(scc::HashMap::new()),
9696+ persisted: Arc::new(scc::HashSet::new()),
9797+ }
9898+ }
9999+100100+    /// enable firehose ingestion; no-op if already enabled.
92101 pub fn enable(&self) {
93102 self.state.firehose_enabled.send_replace(true);
94103 }
9595- /// disable the firehose. the current message finishes processing before the connection closes.
104104+    /// disable firehose ingestion; in-flight messages complete before pausing.
96105 pub fn disable(&self) {
97106 self.state.firehose_enabled.send_replace(false);
98107 }
9999- /// returns the current enabled state of the firehose.
108108+ /// returns the current enabled state of firehose ingestion.
100109 pub fn is_enabled(&self) -> bool {
101110 *self.state.firehose_enabled.borrow()
102111 }
103112104104- /// reset the stored cursor for the given relay URL.
105105- ///
106106- /// clears the `firehose_cursor|{host}|{scheme}` entry from the cursors keyspace and zeroes
107107- /// the in-memory cursor. the next connection will tail live events from the current head.
108108- pub async fn reset_cursor(&self, url: &str) -> Result<()> {
109109- let relay_url = Url::parse(url)
110110- .into_diagnostic()
111111- .wrap_err_with(|| format!("invalid relay url: {url:?}"))?;
112112- let key = keys::firehose_cursor_key_from_url(&relay_url);
113113- let db = self.state.db.clone();
114114- tokio::task::spawn_blocking(move || db.cursors.remove(key).into_diagnostic())
115115- .await
116116- .into_diagnostic()??;
117117-118118- self.state.firehose_cursors.peek_with(&relay_url, |_, c| {
119119- c.store(0, Ordering::SeqCst);
120120- });
121121- Ok(())
122122- }
123123-124124- /// return info on all currently active firehose sources.
113113+ /// list all currently active firehose sources.
125114 pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> {
126126- let mut sources = Vec::new();
115115+ let mut out = Vec::new();
127116 self.tasks
128128- .iter_async(|url, handle| {
129129- sources.push(FirehoseSourceInfo {
117117+ .any_async(|url, handle| {
118118+ out.push(FirehoseSourceInfo {
130119 url: url.clone(),
131120 persisted: self.persisted.contains_sync(url),
132121 is_pds: handle.is_pds,
133122 });
134134- true
123123+ false
135124 })
136125 .await;
137137- sources
126126+ out
138127 }
139128140140- /// add a new firehose relay at runtime.
141141- ///
142142- /// the URL is persisted to the database and will be re-spawned on restart. if a relay with
143143- /// the same URL already exists it is replaced: the running task is stopped and a new one
144144- /// is started. any cursor state for that URL is preserved.
129129+ /// add a new firehose source at runtime, persisting it to the database.
145130 ///
146146- /// returns an error if called before [`Hydrant::run`].
131131+ /// if a source with the same URL already exists, it is replaced: the
132132+ /// running task is stopped and a new one is started with the new `is_pds`
133133+ /// setting. existing cursor state for the URL is preserved.
147134 pub async fn add_source(&self, url: Url, is_pds: bool) -> Result<()> {
148148- let Some(shared) = self.shared.get() else {
149149- miette::bail!("firehose not yet started: call Hydrant::run() first");
150150- };
135135+ let shared = self
136136+ .shared
137137+ .get()
138138+ .ok_or_else(|| miette::miette!("firehose worker not started"))?;
151139152152- let db = self.state.db.clone();
140140+ // persist to db first
153141 let key = keys::firehose_source_key(url.as_str());
154154- let value = rmp_serde::to_vec(&crate::db::FirehoseSourceMeta { is_pds })
155155- .map_err(|e| miette::miette!("failed to serialize firehose source meta: {e}"))?;
156156- tokio::task::spawn_blocking(move || db.crawler.insert(key, value).into_diagnostic())
157157- .await
158158- .into_diagnostic()??;
142142+ tokio::task::spawn_blocking({
143143+ let state = self.state.clone();
144144+ move || {
145145+ let mut batch = state.db.inner.batch();
146146+ let value = rmp_serde::to_vec(&db::FirehoseSourceMeta { is_pds }).map_err(|e| {
147147+ miette::miette!("failed to serialize firehose source meta: {e}")
148148+ })?;
149149+ batch.insert(&state.db.crawler, key, &value);
150150+ batch.commit().into_diagnostic()?;
151151+ state.db.persist()
152152+ }
153153+ })
154154+ .await
155155+ .into_diagnostic()??;
156156+157157+ let _ = self.persisted.insert_async(url.clone()).await;
159158160159 let enabled_rx = self.state.firehose_enabled.subscribe();
161160 let handle = spawn_firehose_ingestor(&url, is_pds, &self.state, shared, enabled_rx).await?;
161161+ self.tasks.upsert_async(url, handle).await;
162162163163- let _ = self.persisted.insert_async(url.clone()).await;
164164- match self.tasks.entry_async(url).await {
165165- scc::hash_map::Entry::Vacant(e) => {
166166- e.insert_entry(handle);
167167- }
168168- scc::hash_map::Entry::Occupied(mut e) => {
169169- *e.get_mut() = handle;
170170- }
171171- }
172163 Ok(())
173164 }
174165175175- /// remove a firehose relay at runtime by URL.
176176- ///
177177- /// aborts the running ingestor task. if the source was added via the API it is removed from
178178- /// the database and will not reappear on restart. `RELAY_HOSTS` sources are only stopped for
179179- /// the current session; they reappear on the next restart.
166166+ /// remove a firehose source at runtime.
180167 ///
181181- /// returns `true` if the relay was found and removed, `false` if it was not running.
182182- /// returns an error if called before [`Hydrant::run`].
168168+ /// returns `true` if the source was found and removed, `false` otherwise.
169169+ /// if the source was added via the API, it is removed from the database;
170170+ /// if it came from the static config, only the running task is stopped.
183171 pub async fn remove_source(&self, url: &Url) -> Result<bool> {
184184- if self.shared.get().is_none() {
185185- miette::bail!("firehose not yet started: call Hydrant::run() first");
172172+ if self.persisted.contains_async(url).await {
173173+ let url_str = url.to_string();
174174+ tokio::task::spawn_blocking({
175175+ let state = self.state.clone();
176176+ move || {
177177+ state
178178+ .db
179179+ .crawler
180180+ .remove(keys::firehose_source_key(&url_str))
181181+ .into_diagnostic()?;
182182+ state.db.persist()
183183+ }
184184+ })
185185+ .await
186186+ .into_diagnostic()??;
187187+ self.persisted.remove_async(url).await;
186188 }
187189188188- if self.tasks.remove_async(url).await.is_none() {
189189- return Ok(false);
190190- }
190190+ Ok(self.tasks.remove_async(url).await.is_some())
191191+ }
191192192192- // remove from relay_cursors (persist thread will stop tracking it)
193193- self.state.firehose_cursors.remove_async(url).await;
193193+ /// reset the stored firehose cursor for a given URL.
194194+ pub async fn reset_cursor(&self, url: &str) -> Result<()> {
195195+ let url = Url::parse(url).into_diagnostic()?;
196196+ let key = keys::firehose_cursor_key_from_url(&url);
197197+ tokio::task::spawn_blocking({
198198+ let state = self.state.clone();
199199+ move || {
200200+ state.db.cursors.remove(key).into_diagnostic()?;
201201+ state.db.persist()
202202+ }
203203+ })
204204+ .await
205205+ .into_diagnostic()??;
194206195195- if self.persisted.remove_async(url).await.is_some() {
196196- let db = self.state.db.clone();
197197- let key = keys::firehose_source_key(url.as_str());
198198- tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic())
199199- .await
200200- .into_diagnostic()??;
201201- }
207207+ self.state.firehose_cursors.remove_async(&url).await;
202208203203- Ok(true)
209209+ Ok(())
204210 }
205211}
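The `shared.get()` change above swaps a `let ... else { bail!(...) }` for `ok_or_else`; behaviour is the same. A self-contained illustration of the `OnceLock` gating (local names, not the crate's types):

```rust
use std::sync::OnceLock;

// shared worker state is only set once Hydrant::run() has started the worker;
// until then, control calls fail fast instead of touching half-built state.
fn shared(cell: &OnceLock<String>) -> Result<&String, String> {
    cell.get().ok_or_else(|| "firehose worker not started".to_string())
}

fn main() {
    let cell: OnceLock<String> = OnceLock::new();
    assert!(shared(&cell).is_err()); // before startup
    cell.set("worker-state".to_string()).unwrap();
    assert!(shared(&cell).is_ok()); // after startup
}
```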
+33-33
src/control/mod.rs
···33pub(crate) mod crawler;
44pub(crate) mod filter;
55pub(crate) mod firehose;
66+pub(crate) mod pds;
67pub(crate) mod repos;
78pub(crate) mod stream;
89910pub use crawler::{CrawlerHandle, CrawlerSourceInfo};
1011pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
1112pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
1313+pub use pds::{PdsControl, PdsTierAssignment, PdsTierDefinition};
1214pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl};
1315use smol_str::{SmolStr, ToSmolStr};
1416···8789 pub firehose: FirehoseHandle,
8890 pub backfill: BackfillHandle,
8991 pub filter: FilterControl,
9292+ pub pds: PdsControl,
9093 pub repos: ReposControl,
9194 pub db: DbControl,
9295 #[cfg(feature = "backlinks")]
···121124 let signals = config
122125 .filter_signals
123126 .clone()
124124- .map(crate::filter::SetUpdate::Set);
127127+ .map(crate::patch::SetUpdate::Set);
125128 let collections = config
126129 .filter_collections
127130 .clone()
128128- .map(crate::filter::SetUpdate::Set);
131131+ .map(crate::patch::SetUpdate::Set);
129132 let excludes = config
130133 .filter_excludes
131134 .clone()
132132- .map(crate::filter::SetUpdate::Set);
135135+ .map(crate::patch::SetUpdate::Set);
133136134137 tokio::task::spawn_blocking(move || {
135138 let mut batch = inner.batch();
···174177 tasks: Arc::new(scc::HashMap::new()),
175178 persisted: Arc::new(scc::HashSet::new()),
176179 },
177177- firehose: FirehoseHandle {
178178- state: state.clone(),
179179- shared: Arc::new(std::sync::OnceLock::new()),
180180- tasks: Arc::new(scc::HashMap::new()),
181181- persisted: Arc::new(scc::HashSet::new()),
182182- },
180180+ firehose: FirehoseHandle::new(state.clone()),
183181 backfill: BackfillHandle(state.clone()),
184182 filter: FilterControl(state.clone()),
183183+ pds: pds::PdsControl(state.clone()),
185184 repos: ReposControl(state.clone()),
186185 db: DbControl(state.clone()),
187186 #[cfg(feature = "backlinks")]
···436435 // 11. spawn crawler infrastructure
437436 #[cfg(feature = "indexer")]
438437 {
439439- use crate::crawler::throttle::Throttler;
440438 use crate::crawler::{
441439 CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker,
442440 };
441441+ use crate::util::throttle::Throttler;
443442444443 let http = reqwest::Client::builder()
445444 .user_agent(concat!(
···450449 .gzip(true)
451450 .build()
452451 .expect("that reqwest will build");
453453- let pds_throttler = Throttler::new();
452452+ let pds_throttler = state.throttler.clone();
454453 let in_flight = InFlight::new();
455454 let stats = CrawlerStats::new(
456455 state.clone(),
···714713 ///
715714 /// sizes are in bytes, reported per keyspace.
716715 pub async fn stats(&self) -> Result<StatsResponse> {
717717- let db = self.state.db.clone();
716716+ let state = self.state.clone();
718717719718 let mut counts: BTreeMap<&'static str, u64> = futures::future::join_all(
720719 [
···729728 ]
730729 .into_iter()
731730 .map(|name| {
732732- let db = db.clone();
733733- async move { (name, db.get_count(name).await) }
731731+ let state = state.clone();
732732+ async move { (name, state.db.get_count(name).await) }
734733 }),
735734 )
736735 .await
737736 .into_iter()
738737 .collect();
739738740740- counts.insert("events", db.events.approximate_len() as u64);
739739+ counts.insert("events", state.db.events.approximate_len() as u64);
741740742741 let sizes = tokio::task::spawn_blocking(move || {
743742 let mut s = BTreeMap::new();
744744- s.insert("repos", db.repos.disk_space());
745745- s.insert("records", db.records.disk_space());
746746- s.insert("blocks", db.blocks.disk_space());
747747- s.insert("cursors", db.cursors.disk_space());
748748- s.insert("pending", db.pending.disk_space());
749749- s.insert("resync", db.resync.disk_space());
750750- s.insert("resync_buffer", db.resync_buffer.disk_space());
751751- s.insert("events", db.events.disk_space());
752752- s.insert("counts", db.counts.disk_space());
753753- s.insert("filter", db.filter.disk_space());
754754- s.insert("crawler", db.crawler.disk_space());
743743+ s.insert("repos", state.db.repos.disk_space());
744744+ s.insert("records", state.db.records.disk_space());
745745+ s.insert("blocks", state.db.blocks.disk_space());
746746+ s.insert("cursors", state.db.cursors.disk_space());
747747+ s.insert("pending", state.db.pending.disk_space());
748748+ s.insert("resync", state.db.resync.disk_space());
749749+ s.insert("resync_buffer", state.db.resync_buffer.disk_space());
750750+ s.insert("events", state.db.events.disk_space());
751751+ s.insert("counts", state.db.counts.disk_space());
752752+ s.insert("filter", state.db.filter.disk_space());
753753+ s.insert("crawler", state.db.crawler.disk_space());
755754 s
756755 })
757756 .await
···788787 ///
789788 /// returns the seq we are on for this host.
790789 pub async fn get_host_status(&self, hostname: &str) -> Result<Option<Host>> {
791791- let db = self.state.db.clone();
790790+ let state = self.state.clone();
792791 let hostname = hostname.to_smolstr();
793792794793 tokio::task::spawn_blocking(move || {
795794 let key = keys::firehose_cursor_key(&hostname);
796796- let Some(seq) = db.cursors.get(&key).into_diagnostic()? else {
795795+ let Some(seq) = state.db.cursors.get(&key).into_diagnostic()? else {
797796 return Ok(None);
798797 };
799798 let seq = i64::from_be_bytes(
···820819 cursor: Option<&str>,
821820 limit: usize,
822821 ) -> Result<(Vec<Host>, Option<SmolStr>)> {
823823- let db = self.state.db.clone();
822822+ let state = self.state.clone();
824823 let cursor = cursor.map(str::to_string);
825824826825 tokio::task::spawn_blocking(move || {
···836835837836 // fetch one extra item to detect whether there is a next page
838837 let mut hosts: Vec<Host> = Vec::with_capacity(limit + 1);
839839- for item in db
838838+ for item in state
839839+ .db
840840 .cursors
841841 .range((start_bound, std::ops::Bound::Excluded(prefix_end)))
842842 .take(limit + 1)
···972972 state
973973 .with_ingestion_paused(async || {
974974 let train = |name: &'static str| {
975975- let db = state.db.clone();
976976- tokio::task::spawn_blocking(move || db.train_dict(name))
977977- .map(|res| res.into_diagnostic().flatten())
975975+ let state = state.clone();
976976+ tokio::task::spawn_blocking(move || state.db.train_dict(name))
977977+ .map(|res: Result<_, _>| res.into_diagnostic().flatten())
978978 };
979979 tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ())
980980 })
+113
src/control/pds.rs
···11+use std::collections::HashMap;
22+use std::sync::Arc;
33+44+use miette::{IntoDiagnostic, Result};
55+use serde::Serialize;
66+use smol_str::SmolStr;
77+88+use crate::config::RateTier;
99+use crate::db::pds_tiers as db_pds;
1010+use crate::state::AppState;
1111+1212+/// a single PDS-to-tier assignment.
1313+#[derive(Debug, Clone, Serialize)]
1414+pub struct PdsTierAssignment {
1515+ pub host: String,
1616+ pub tier: String,
1717+}
1818+1919+/// a rate tier definition, as returned by the API.
2020+#[derive(Debug, Clone, Serialize)]
2121+pub struct PdsTierDefinition {
2222+ pub per_second_base: u64,
2323+ pub per_second_account_mul: f64,
2424+ pub per_hour: u64,
2525+ pub per_day: u64,
2626+}
2727+2828+impl From<RateTier> for PdsTierDefinition {
2929+ fn from(t: RateTier) -> Self {
3030+ Self {
3131+ per_second_base: t.per_second_base,
3232+ per_second_account_mul: t.per_second_account_mul,
3333+ per_hour: t.per_hour,
3434+ per_day: t.per_day,
3535+ }
3636+ }
3737+}
3939+/// runtime control over PDS-related behaviour (e.g. rate limits).
4040+#[derive(Clone)]
4141+pub struct PdsControl(pub(super) Arc<AppState>);
4242+4343+impl PdsControl {
4444+ /// list all current per-PDS tier assignments.
4545+ pub async fn list_assignments(&self) -> Vec<PdsTierAssignment> {
4646+ let snapshot = self.0.pds_tiers.load();
4747+ snapshot
4848+ .iter()
4949+ .map(|(host, tier)| PdsTierAssignment {
5050+ host: host.clone(),
5151+ tier: tier.to_string(),
5252+ })
5353+ .collect()
5454+ }
5555+5656+ /// list all configured rate tier definitions.
5757+ pub fn list_rate_tiers(&self) -> HashMap<String, PdsTierDefinition> {
5858+ self.0
5959+ .rate_tiers
6060+ .iter()
6161+ .map(|(name, tier)| (name.clone(), PdsTierDefinition::from(*tier)))
6262+ .collect()
6363+ }
6464+6565+ /// assign `host` to `tier`, persisting the change to the database.
6666+ /// returns an error if `tier` is not a known tier name.
6767+ pub async fn set_tier(&self, host: String, tier: String) -> Result<()> {
6868+ if !self.0.rate_tiers.contains_key(&tier) {
6969+ miette::bail!(
7070+ "unknown tier '{tier}'; known tiers: {:?}",
7171+ self.0.rate_tiers.keys().collect::<Vec<_>>()
7272+ );
7373+ }
7474+7575+ let state = self.0.clone();
7676+ let host_clone = host.clone();
7777+ let tier_clone = tier.clone();
7878+ tokio::task::spawn_blocking(move || {
7979+ let mut batch = state.db.inner.batch();
8080+ db_pds::set(&mut batch, &state.db.filter, &host_clone, &tier_clone);
8181+ batch.commit().into_diagnostic()?;
8282+ state.db.persist()
8383+ })
8484+ .await
8585+ .into_diagnostic()??;
8686+8787+ let mut snapshot = (**self.0.pds_tiers.load()).clone();
8888+ snapshot.insert(host, SmolStr::new(&tier));
8989+ self.0.pds_tiers.store(Arc::new(snapshot));
9090+9191+ Ok(())
9292+ }
9393+9494+ /// remove any explicit tier assignment for `host`, reverting it to the default tier.
9595+ pub async fn remove_tier(&self, host: String) -> Result<()> {
9696+ let state = self.0.clone();
9797+ let host_clone = host.clone();
9898+ tokio::task::spawn_blocking(move || {
9999+ let mut batch = state.db.inner.batch();
100100+ db_pds::remove(&mut batch, &state.db.filter, &host_clone);
101101+ batch.commit().into_diagnostic()?;
102102+ state.db.persist()
103103+ })
104104+ .await
105105+ .into_diagnostic()??;
106106+107107+ let mut snapshot = (**self.0.pds_tiers.load()).clone();
108108+ snapshot.remove(&host);
109109+ self.0.pds_tiers.store(Arc::new(snapshot));
110110+111111+ Ok(())
112112+ }
113113+}
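`set_tier`/`remove_tier` update the in-memory view with a copy-on-write swap: clone the current map, mutate the clone, store it back. A runnable sketch of that pattern, assuming the `arc-swap` crate (which the diff's `PdsTierHandle` also uses):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arc_swap::ArcSwap;

fn main() {
    // readers keep whatever snapshot they loaded; writers swap in a full clone.
    let tiers: ArcSwap<HashMap<String, String>> = ArcSwap::new(Arc::new(HashMap::new()));

    let mut next = (**tiers.load()).clone();
    next.insert("pds.example.com".into(), "trusted".into());
    tiers.store(Arc::new(next));

    assert_eq!(
        tiers.load().get("pds.example.com").map(String::as_str),
        Some("trusted")
    );
}
```

Note this is load-clone-store rather than a compare-and-swap loop, so two racing writers could lose an update; that seems acceptable here since writes are rare, operator-driven API calls.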
+12-6
src/control/repos.rs
···8686 std::ops::Bound::Unbounded
8787 };
88888989- let db = self.0.db.clone();
8989+ let state = self.0.clone();
9090 self.0
9191 .db
9292 .repos
···9696 let repo_state = crate::db::deser_repo_state(&v)?.into_static();
9797 let did = TrimmedDid::try_from(k.as_ref())?.to_did();
9898 let metadata_key = keys::repo_metadata_key(&did);
9999- let metadata = db
9999+ let metadata = state
100100+ .db
100101 .repo_metadata
101102 .get(&metadata_key)
102103 .into_diagnostic()?
···122123 };
123124124125 let repos = self.0.db.repos.clone();
125125- let db = self.0.db.clone();
126126+ let state = self.0.clone();
126127 self.0
127128 .db
128129 .pending
···144145 let repo_state = crate::db::deser_repo_state(bytes.as_ref())?;
145146 let did = TrimmedDid::try_from(did_key.as_ref())?.to_did();
146147 let metadata_key = keys::repo_metadata_key(&did);
147147- let metadata = db
148148+ let metadata = state
149149+ .db
148150 .repo_metadata
149151 .get(&metadata_key)
150152 .into_diagnostic()?
···169171 };
170172171173 let repos = self.0.db.repos.clone();
172172- let db = self.0.db.clone();
174174+ let state = self.0.clone();
173175 self.0
174176 .db
175177 .resync
···184186 let repo_state = crate::db::deser_repo_state(bytes.as_ref())?;
185187 let did = TrimmedDid::try_from(did_key.as_ref())?.to_did();
186188 let metadata_key = keys::repo_metadata_key(&did);
187187- let metadata = db
189189+ let metadata = state
190190+ .db
188191 .repo_metadata
189192 .get(&metadata_key)
190193 .into_diagnostic()?
···304307 }
305308306309 batch.commit().into_diagnostic()?;
310310+ state.db.persist()?;
307311 Ok::<_, miette::Report>((queued, transitions))
308312 })
309313 .await
···369373 }
370374371375 batch.commit().into_diagnostic()?;
376376+ state.db.persist()?;
372377 Ok::<_, miette::Report>((added, queued, transitions))
373378 })
374379 .await
···438443 }
439444440445 batch.commit().into_diagnostic()?;
446446+ state.db.persist()?;
441447 Ok::<_, miette::Report>((untracked, gauge_decrements))
442448 })
443449 .await
+3-3
src/crawler/list_repos.rs
···11-use crate::crawler::throttle::{OrFailure, ThrottleHandle, Throttler};
21use crate::db::keys::crawler_cursor_key;
32use crate::db::{Db, keys};
43use crate::state::AppState;
44+use crate::util::throttle::{OrFailure, ThrottleHandle, Throttler};
55use crate::util::{
66 ErrorForStatus, RetryOutcome, RetryWithBackoff, WatchEnabledExt, parse_retry_after,
77};
···653653 }
654654655655 async fn process_queue(&self) -> Result<Option<Duration>> {
656656- let db = self.checker.state.db.clone();
656656+ let state = self.checker.state.clone();
657657658658 struct ScanResult {
659659 ready: Vec<Did<'static>>,
···675675 let mut next_wake: Option<Duration> = None;
676676 let mut had_more = false;
677677678678- for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) {
678678+ for guard in state.db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) {
679679 let (key, val) = guard.into_inner().into_diagnostic()?;
680680 let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?;
681681 let did = keys::crawler_retry_parse_key(&key)?.to_did();
+1-2
src/crawler/mod.rs
···14141515mod by_collection;
1616mod list_repos;
1717-pub mod throttle;
1817mod worker;
19182020-use throttle::Throttler;
1919+use crate::util::throttle::Throttler;
21202221pub(crate) use by_collection::ByCollectionProducer;
2322pub(crate) use list_repos::{ListReposProducer, RetryProducer, SignalChecker};
+114-5
src/crawler/throttle.rs
src/util/throttle.rs
···11+use crate::config::RateTier;
22+use parking_lot::Mutex;
13use scc::HashMap;
24use std::future::Future;
35use std::sync::Arc;
46use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
55-use std::time::Duration;
77+use std::time::{Duration, Instant};
68use tokio::sync::{Notify, Semaphore, SemaphorePermit};
79use url::Url;
810···1012/// ref pds allows 10 requests per second... so 10 should be fine
1113const PER_PDS_CONCURRENCY: usize = 10;
12141515+// per second, hour and day
1616+const DURATIONS: [Duration; 3] = [
1717+ Duration::from_secs(1),
1818+ Duration::from_secs(3600),
1919+ Duration::from_secs(86400),
2020+];
2121+1322#[derive(Clone)]
1423pub struct Throttler {
1524 states: Arc<HashMap<Url, Arc<State>>>,
···5463 /// let tasks exit naturally, deferring to the background retry loop.
5564 failure_notify: Notify,
5665 semaphore: Semaphore,
6666+ rate_limiter: RateLimiter,
5767}
58685969impl State {
···6474 consecutive_timeouts: AtomicUsize::new(0),
6575 failure_notify: Notify::new(),
6676 semaphore: Semaphore::new(PER_PDS_CONCURRENCY),
7777+ rate_limiter: RateLimiter::new(),
6778 }
6879 }
6980}
···92103 /// called on a 429 response. `retry_after_secs` comes from the `Retry-After`
93104 /// header if present; falls back to 60s. uses `fetch_max` so concurrent callers
94105 /// don't race each other back to a shorter window.
9595- ///
9696- /// deliberately does NOT notify waiters — 429s are soft and tasks should exit
9797- /// naturally via the `Retry` result rather than being cancelled.
98106 pub fn record_ratelimit(&self, retry_after_secs: Option<u64>) {
99107 let secs = retry_after_secs.unwrap_or(60) as i64;
100108 let until = chrono::Utc::now().timestamp() + secs;
···155163 }
156164157165 /// resolves when this PDS gets a hard failure notification.
158158- /// used by `or_throttle` and the semaphore acquire select to cancel in-flight work.
159166 pub async fn wait_for_failure(&self) {
160167 loop {
161168 let notified = self.state.failure_notify.notified();
···163170 return;
164171 }
165172 notified.await;
173173+ }
174174+ }
175175+176176+ /// waits until the rate tier's limits allow more events for this PDS.
177177+ /// sleeps precisely until the most restrictive window opens rather than polling.
178178+ pub async fn wait_for_allow(&self, num_accounts: u64, tier: &RateTier) {
179179+ let limits = limits_for(num_accounts, tier);
180180+ while let Some(wait) = self.state.rate_limiter.try_acquire(limits) {
181181+ tokio::time::sleep(wait).await;
182182+ }
183183+ }
184184+}
185185+186186+fn limits_for(num_accounts: u64, tier: &RateTier) -> [u64; 3] {
187187+ let per_sec = tier
188188+ .per_second_base
189189+ .max((num_accounts as f64 * tier.per_second_account_mul) as u64);
190190+ [per_sec, tier.per_hour, tier.per_day]
191191+}
192192+193193+struct WindowState {
194194+ count: u64,
195195+ prev_count: u64,
196196+ window_start: Instant,
197197+}
198198+199199+impl WindowState {
200200+ fn new() -> Self {
201201+ Self {
202202+ count: 0,
203203+ prev_count: 0,
204204+ window_start: Instant::now(),
205205+ }
206206+ }
207207+208208+ fn rotate(&mut self, dur: Duration) {
209209+ let elapsed = self.window_start.elapsed();
210210+ if elapsed >= dur {
211211+ let n = (elapsed.as_nanos() / dur.as_nanos()).max(1) as u32;
212212+ self.prev_count = if n == 1 { self.count } else { 0 };
213213+ self.count = 0;
214214+ self.window_start += dur * n;
215215+ }
216216+ }
217217+218218+ /// returns how long to sleep before this window would allow one more event.
219219+ /// Duration::ZERO means allow now.
220220+ fn wait_needed(&self, dur: Duration, limit: u64) -> Duration {
221221+ let elapsed = self.window_start.elapsed();
222222+ let remaining = dur.saturating_sub(elapsed);
223223+ let weight = remaining.as_secs_f64() / dur.as_secs_f64();
224224+ let effective = self.count as f64 + self.prev_count as f64 * weight;
225225+226226+ if effective < limit as f64 {
227227+ return Duration::ZERO;
228228+ }
229229+230230+ if self.prev_count == 0 || self.count as f64 >= limit as f64 {
231231+ // must wait for a full window rotation
232232+ remaining + Duration::from_millis(1)
233233+ } else {
234234+ let secs = remaining.as_secs_f64()
235235+ - dur.as_secs_f64() * (limit as f64 - self.count as f64) / self.prev_count as f64;
236236+ Duration::from_secs_f64(secs.max(0.0)) + Duration::from_micros(500)
237237+ }
238238+ }
239239+}
240240+241241+struct RateLimiter {
242242+ // parking_lot::Mutex — uncontended path never touches the kernel
243243+ windows: Mutex<[WindowState; 3]>,
244244+}
245245+246246+impl RateLimiter {
247247+ fn new() -> Self {
248248+ Self {
249249+ windows: Mutex::new([WindowState::new(), WindowState::new(), WindowState::new()]),
250250+ }
251251+ }
252252+253253+ /// returns None if the slot was acquired, or Some(sleep_for) if limited.
254254+ fn try_acquire(&self, limits: [u64; 3]) -> Option<Duration> {
255255+ let mut windows = self.windows.lock();
256256+257257+ windows
258258+ .iter_mut()
259259+ .zip(DURATIONS)
260260+ .for_each(|(w, d)| w.rotate(d));
261261+262262+ let max_wait = windows
263263+ .iter()
264264+ .zip(DURATIONS)
265265+ .zip(limits)
266266+ .map(|((w, dur), limit)| w.wait_needed(dur, limit))
267267+ .max()
268268+ .unwrap_or(Duration::ZERO);
269269+270270+ if max_wait.is_zero() {
271271+ windows.iter_mut().for_each(|w| w.count += 1);
272272+ None
273273+ } else {
274274+ Some(max_wait)
166275 }
167276 }
168277}
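The limiter above is a weighted sliding window: the previous window's count decays linearly as the current window ages. A standalone re-derivation of `wait_needed` for a single window, with an explicit `elapsed` argument instead of `Instant` so the numbers are checkable:

```rust
use std::time::Duration;

// mirrors WindowState::wait_needed from the diff; only the elapsed time is
// passed in explicitly so the example runs deterministically.
fn wait_needed(count: u64, prev_count: u64, elapsed: Duration, dur: Duration, limit: u64) -> Duration {
    let remaining = dur.saturating_sub(elapsed);
    let weight = remaining.as_secs_f64() / dur.as_secs_f64();
    let effective = count as f64 + prev_count as f64 * weight;
    if effective < limit as f64 {
        return Duration::ZERO;
    }
    if prev_count == 0 || count >= limit {
        remaining + Duration::from_millis(1) // full window rotation needed
    } else {
        // sleep until enough of the previous window has aged out
        let secs = remaining.as_secs_f64()
            - dur.as_secs_f64() * (limit - count) as f64 / prev_count as f64;
        Duration::from_secs_f64(secs.max(0.0)) + Duration::from_micros(500)
    }
}

fn main() {
    let dur = Duration::from_secs(1);
    // halfway through: the previous window's 10 events count at weight 0.5,
    // effective = 0 + 10 * 0.5 = 5 < 10, so allow immediately.
    assert_eq!(wait_needed(0, 10, Duration::from_millis(500), dur, 10), Duration::ZERO);
    // 8 current + 5 weighted-previous = 13 >= 10: the caller must sleep
    // (0.3s plus the 500us safety margin) before the next event fits.
    assert!(wait_needed(8, 10, Duration::from_millis(500), dur, 10) > Duration::from_millis(299));
}
```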
+20-11
src/crawler/worker.rs
···137137138138 // filter already-known repos, build and commit the write batch, then return
139139 // the surviving guards so they are dropped on the async side after commit.
140140- let db = self.state.db.clone();
140140+ let app_state = self.state.clone();
141141 let surviving = tokio::time::timeout(
142142 BLOCKING_TASK_TIMEOUT,
143143 tokio::task::spawn_blocking(move || -> Result<Vec<InFlightGuard>> {
144144 let mut rng: SmallRng = rand::make_rng();
145145- let mut batch = db.inner.batch();
145145+ let mut batch = app_state.db.inner.batch();
146146 let mut surviving = Vec::new();
147147 for guard in guards {
148148 let did_key = keys::repo_key(&*guard);
149149 let metadata_key = keys::repo_metadata_key(&*guard);
150150- if db.repos.contains_key(&did_key).into_diagnostic()? {
150150+ if app_state
151151+ .db
152152+ .repos
153153+ .contains_key(&did_key)
154154+ .into_diagnostic()?
155155+ {
151156 continue;
152157 }
153158 let state = RepoState::backfilling();
154159 let metadata = RepoMetadata::backfilling(rng.next_u64());
155155- batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
160160+ batch.insert(&app_state.db.repos, &did_key, ser_repo_state(&state)?);
156161 batch.insert(
157157- &db.repo_metadata,
162162+ &app_state.db.repo_metadata,
158163 &metadata_key,
159164 crate::db::ser_repo_metadata(&metadata)?,
160165 );
161161- batch.insert(&db.pending, keys::pending_key(metadata.index_id), &did_key);
166166+ batch.insert(
167167+ &app_state.db.pending,
168168+ keys::pending_key(metadata.index_id),
169169+ &did_key,
170170+ );
162171 // clear any stale retry entry, this DID is confirmed and being enqueued
163163- batch.remove(&db.crawler, keys::crawler_retry_key(&*guard));
172172+ batch.remove(&app_state.db.crawler, keys::crawler_retry_key(&*guard));
164173 trace!(did = %*guard, "enqueuing repo");
165174 surviving.push(guard);
166175 }
167176 if let Some(cursor) = cursor_update {
168168- batch.insert(&db.cursors, cursor.key, cursor.value);
177177+ batch.insert(&app_state.db.cursors, cursor.key, cursor.value);
169178 }
170179 // todo: repo state overwrites here are acceptable?
171180 batch.commit().into_diagnostic()?;
···202211 }
203212204213 async fn commit_cursor(&self, cursor: CursorUpdate) -> Result<()> {
205205- let db = self.state.db.clone();
214214+ let state = self.state.clone();
206215 tokio::time::timeout(
207216 BLOCKING_TASK_TIMEOUT,
208217 tokio::task::spawn_blocking(move || {
209209- let mut batch = db.inner.batch();
210210- batch.insert(&db.cursors, cursor.key, cursor.value);
218218+ let mut batch = state.db.inner.batch();
219219+ batch.insert(&state.db.cursors, cursor.key, cursor.value);
211220 batch.commit().into_diagnostic()
212221 }),
213222 )
+8-13
src/db/filter.rs
···11use fjall::{Keyspace, OwnedWriteBatch};
22-use jacquard_common::IntoStatic;
33-use jacquard_common::types::nsid::Nsid;
42use jacquard_common::types::string::Did;
53use miette::{IntoDiagnostic, Result};
6475use crate::db::types::TrimmedDid;
88-use crate::filter::{FilterConfig, FilterMode, SetUpdate};
66+use crate::filter::{FilterConfig, FilterMode};
77+use crate::patch::SetUpdate;
98109pub const MODE_KEY: &[u8] = b"m";
1110pub const SIGNAL_PREFIX: u8 = b's';
···113112 for guard in ks.prefix(signal_prefix) {
114113 let (k, _) = guard.into_inner().into_diagnostic()?;
115114 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?;
116116- config.signals.push(Nsid::new(val)?.into_static());
115115+ config.signals.push(val.into());
117116 }
118117119118 let col_prefix = [COLLECTION_PREFIX, SEP];
120119 for guard in ks.prefix(col_prefix) {
121120 let (k, _) = guard.into_inner().into_diagnostic()?;
122121 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?;
123123- config.collections.push(Nsid::new(val)?.into_static());
122122+ config.collections.push(val.into());
124123 }
125124126125 Ok(config)
···144143145144#[cfg(test)]
146145mod tests {
146146+ use smol_str::SmolStr;
147147+147148 use super::*;
148149149150 #[test]
···198199199200 let config = load(&ks)?;
200201 assert_eq!(config.mode, FilterMode::Filter);
201201- assert_eq!(
202202- config.signals,
203203- vec![Nsid::new("a.b.c").unwrap().into_static()]
204204- );
205205- assert_eq!(
206206- config.collections,
207207- vec![Nsid::new("d.e.f").unwrap().into_static()]
208208- );
202202+ assert_eq!(config.signals, vec![SmolStr::new("a.b.c")]);
203203+ assert_eq!(config.collections, vec![SmolStr::new("d.e.f")]);
209204210205 let excludes = read_set(&ks, EXCLUDE_PREFIX)?;
211206 assert_eq!(excludes, vec!["did:plc:yk4q3id7id6p5z3bypvshc64"]);
···274274 }
275275 SubscribeReposMessage::Account(account) => {
276276 debug!("processing account");
277277- Self::handle_account(ctx, &mut repo_state, &msg.firehose, *account)
277277+ Self::handle_account(ctx, &mut repo_state, &msg.firehose, *account, msg.is_pds)
278278 }
279279 _ => Ok(()),
280280 }
···472472 fn handle_account(
473473 ctx: &mut WorkerContext,
474474 repo_state: &mut RepoState,
475475- #[allow(unused_variables)] firehose: &Url,
475475+ firehose: &Url,
476476 #[allow(unused_mut)] mut account: Account<'static>,
477477+ is_pds: bool,
477478 ) -> Result<()> {
478479 let event_ms = account.time.0.timestamp_millis();
479480 if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
···483484484485 repo_state.advance_message_time(event_ms);
485486487487+ // always capture was_active for count tracking, not just in indexer mode
488488+ let was_active = repo_state.active;
486489 #[cfg(feature = "indexer")]
487487- let (was_active, was_status) = (repo_state.active, repo_state.status.clone());
490490+ let was_status = repo_state.status.clone();
488491489492 repo_state.active = account.active;
490493 if !account.active {
···512515 };
513516 }
514517518518+ // update per-PDS active account count on transitions
519519+ if is_pds {
520520+ if let Some(host) = firehose.host_str() {
521521+ let count_key = keys::pds_account_count_key(host);
522522+ if !was_active && repo_state.active {
523523+ ctx.state.db.update_count(&count_key, 1);
524524+ } else if was_active && !repo_state.active {
525525+ ctx.state.db.update_count(&count_key, -1);
526526+ }
527527+ }
528528+ }
529529+515530 let repo_key = keys::repo_key(&account.did);
516531517532 #[cfg(feature = "indexer")]
···558573 repo_state: &mut RepoState,
559574 source_host: &str,
560575 ) -> Result<AuthorityOutcome> {
561561- let expected = pds_host(repo_state.pds.as_deref());
576576+ let pds_host = |pds: &str| {
577577+ Url::parse(pds)
578578+ .ok()
579579+ .and_then(|u| u.host_str().map(SmolStr::new))
580580+ };
581581+582582+ let expected = repo_state.pds.as_deref().and_then(pds_host);
562583 if expected.as_deref() == Some(source_host) {
563584 return Ok(AuthorityOutcome::Authorized);
564585 }
565586566587 // try again once
567588 self.refresh_doc(did, repo_state)?;
568568- let Some(expected) = pds_host(repo_state.pds.as_deref()) else {
589589+ let Some(expected) = repo_state.pds.as_deref().and_then(pds_host) else {
569590 miette::bail!("can't get pds host???");
570591 };
571592···837858838859 db.update_count("repos", 1);
839860861861+ // track initial active state for per-PDS rate limiting
862862+ if msg.is_pds && repo_state.active {
863863+ if let Some(host) = msg.firehose.host_str() {
864864+ db.update_count(&keys::pds_account_count_key(host), 1);
865865+ }
866866+ }
867867+840868 Ok(Some(repo_state))
841869 }
842870···861889 /// host did not match even after doc resolution.
862890 WrongHost { expected: SmolStr },
863891}
864864-865865-fn pds_host(pds: Option<&str>) -> Option<SmolStr> {
866866- // todo: add faster host parsing since we only need that
867867- pds.and_then(|pds| Url::parse(pds).ok()).map(|u| {
868868- u.host_str()
869869- .map(SmolStr::new)
870870- .expect("that there is host in pds url")
871871- })
872872-}
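The account-count bookkeeping above only fires on state transitions, which keeps the per-PDS gauge consistent when steady-state events are replayed. The transition table, as a runnable sketch (hypothetical helper, not crate code):

```rust
// delta applied to the per-PDS active-account count for one #account event.
fn count_delta(was_active: bool, now_active: bool) -> i64 {
    match (was_active, now_active) {
        (false, true) => 1,  // account became active on this PDS
        (true, false) => -1, // account deactivated or taken down
        _ => 0,              // steady state: no update
    }
}

fn main() {
    assert_eq!(count_delta(false, true), 1);
    assert_eq!(count_delta(true, true), 0); // repeated "active" event is a no-op
    assert_eq!(count_delta(true, false), -1);
}
```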
+1
src/lib.rs
···1818pub(crate) mod ingest;
1919#[cfg(feature = "indexer")]
2020pub(crate) mod ops;
2121+pub(crate) mod patch;
2122pub(crate) mod resolver;
2223pub(crate) mod state;
2324pub(crate) mod util;
+11
src/patch.rs
···11+use serde::{Deserialize, Serialize};
33+/// a set update: either a full replacement of the set or a bool patch of individual entries.
44+#[derive(Debug, Clone, Serialize, Deserialize)]
55+#[serde(untagged)]
66+pub(crate) enum SetUpdate {
77+ /// replace the entire set with this list
88+ Set(Vec<String>),
99+ /// patch: true = add, false = remove
1010+ Patch(std::collections::HashMap<String, bool>),
1111+}
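Since `SetUpdate` is `#[serde(untagged)]`, the wire format is shape-driven: a JSON array deserializes as a full replacement, a JSON object of bools as a patch. A runnable check, assuming `serde` (with derive) and `serde_json` as dependencies:

```rust
use std::collections::HashMap;

use serde::Deserialize;

// same shape as the crate's SetUpdate, re-declared so the example stands alone.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SetUpdate {
    Set(Vec<String>),
    Patch(HashMap<String, bool>),
}

fn main() {
    let set: SetUpdate = serde_json::from_str(r#"["app.bsky.feed.post"]"#).unwrap();
    let patch: SetUpdate = serde_json::from_str(r#"{"app.bsky.feed.like": false}"#).unwrap();
    assert!(matches!(set, SetUpdate::Set(_)));
    assert!(matches!(patch, SetUpdate::Patch(_))); // true = add, false = remove
}
```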
+45-1
src/state.rs
···11+use std::collections::HashMap;
22+use std::sync::Arc;
13use std::sync::atomic::AtomicI64;
24use std::time::Duration;
3566+use arc_swap::ArcSwap;
47use miette::Result;
88+use smol_str::SmolStr;
59use tokio::sync::{Notify, watch};
610use url::Url;
711812use crate::{
99- config::Config,
1313+ config::{Config, RateTier},
1014 db::Db,
1115 filter::{FilterHandle, new_handle},
1216 resolver::Resolver,
1717+ util::throttle::Throttler,
1318};
14192020+/// pds hostname -> tier name. updated atomically via ArcSwap.
2121+pub(crate) type PdsTierHandle = Arc<ArcSwap<HashMap<String, SmolStr>>>;
2222+1523pub struct AppState {
1624 pub db: Db,
1725 pub resolver: Resolver,
1826 pub(crate) filter: FilterHandle,
2727+ pub(crate) pds_tiers: PdsTierHandle,
2828+ pub(crate) rate_tiers: HashMap<String, RateTier>,
1929 pub firehose_cursors: scc::HashIndex<Url, AtomicI64>,
2030 pub backfill_notify: Notify,
2131 pub crawler_enabled: watch::Sender<bool>,
2232 pub firehose_enabled: watch::Sender<bool>,
2333 pub backfill_enabled: watch::Sender<bool>,
2434 pub ephemeral_ttl: Duration,
3535+ pub throttler: Throttler,
2536}
26372738impl AppState {
···41524253 let filter = new_handle(filter_config);
43545555+ // load persisted per-PDS tier assignments from the filter keyspace.
5656+        // trusted_hosts from config are merged in as defaults (not persisted here; they apply
5757+        // only when the host has no existing assignment in the DB).
5858+ let mut tier_map: HashMap<String, SmolStr> = crate::db::pds_tiers::load(&db.filter)
5959+ .unwrap_or_default()
6060+ .into_iter()
6161+ .map(|(host, tier)| (host.to_string(), tier))
6262+ .collect();
6363+ for host in &config.trusted_hosts {
6464+ tier_map
6565+ .entry(host.clone())
6666+ .or_insert_with(|| SmolStr::new("trusted"));
6767+ }
6868+ let pds_tiers = Arc::new(ArcSwap::new(Arc::new(tier_map)));
6969+4470 let relay_cursors = scc::HashIndex::new();
45714672 let (crawler_enabled, _) = watch::channel(crawler_default);
···5177 db,
5278 resolver,
5379 filter,
8080+ pds_tiers,
8181+ rate_tiers: config.rate_tiers.clone(),
5482 firehose_cursors: relay_cursors,
5583 backfill_notify: Notify::new(),
5684 crawler_enabled,
5785 firehose_enabled,
5886 backfill_enabled,
5987 ephemeral_ttl: config.ephemeral_ttl.clone(),
8888+ throttler: Throttler::new(),
6089 })
6190 }
62916392 pub fn notify_backfill(&self) {
6493 self.backfill_notify.notify_one();
9494+ }
9595+9696+ /// returns the rate tier for the given PDS hostname.
9797+ /// falls back to the "default" tier if no assignment exists or the assigned tier is unknown.
9898+ pub fn pds_tier_for(&self, host: &str) -> RateTier {
9999+ let default = self
100100+ .rate_tiers
101101+ .get("default")
102102+ .copied()
103103+ .unwrap_or_else(RateTier::default_tier);
104104+ let snapshot = self.pds_tiers.load();
105105+ snapshot
106106+ .get(host)
107107+ .and_then(|name| self.rate_tiers.get(name.as_str()).copied())
108108+ .unwrap_or(default)
65109 }
6611067111 /// pauses the crawler, firehose, and backfill worker, runs `f`, then restores their prior state.
+2
src/util.rs
src/util/mod.rs
···10101111use crate::{db::types::DidKey, types::RepoStatus};
12121313+pub mod throttle;
1414+1315/// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed.
1416pub enum RetryOutcome<E> {
1517 /// ratelimited after exhausting all retries
+294
tests/api.nu
···250250 print "firehose source tests passed!"
251251}
252252253253+def test-pds-tiers [url: string, pid: int] {
254254+ print "=== test: pds tier management ==="
255255+256256+ # initial state: no assignments, built-in rate tiers present
257257+ print " GET /pds/tiers (expect empty assignments, built-in rate_tiers)..."
258258+ let initial = (http get $"($url)/pds/tiers")
259259+ if ($initial.assignments | length) != 0 {
260260+ fail $"expected empty assignments, got ($initial.assignments | length)" $pid
261261+ }
262262+ if not ("default" in $initial.rate_tiers) {
263263+ fail "expected 'default' tier in rate_tiers" $pid
264264+ }
265265+ if not ("trusted" in $initial.rate_tiers) {
266266+ fail "expected 'trusted' tier in rate_tiers" $pid
267267+ }
268268+ print " ok: empty assignments and built-in rate tiers present"
269269+270270+ # GET /pds/rate-tiers returns the same definitions with the right fields
271271+ print " GET /pds/rate-tiers (check structure)..."
272272+ let rate_tiers = (http get $"($url)/pds/rate-tiers")
273273+ for tier_name in ["default", "trusted"] {
274274+ let tier = ($rate_tiers | get $tier_name)
275275+ for field in ["per_second_base", "per_second_account_mul", "per_hour", "per_day"] {
276276+ if not ($field in $tier) {
277277+ fail $"($tier_name) tier missing field ($field)" $pid
278278+ }
279279+ }
280280+ }
281281+ # trusted tier must have higher per-second limit than default
282282+ if ($rate_tiers.trusted.per_second_base) <= ($rate_tiers.default.per_second_base) {
283283+ fail $"expected trusted.per_second_base > default, got ($rate_tiers.trusted.per_second_base) vs ($rate_tiers.default.per_second_base)" $pid
284284+ }
285285+ print " ok: rate tier definitions have correct fields and expected ordering"
286286+287287+ # assign a host to the trusted tier
288288+ print " PUT /pds/tiers (assign to trusted)..."
289289+ http put -f -e -t application/json $"($url)/pds/tiers" {
290290+ host: "pds.example.com",
291291+ tier: "trusted"
292292+ } | assert-status 200 "PUT /pds/tiers" $pid
293293+ let after_assign = (http get $"($url)/pds/tiers")
294294+ if ($after_assign.assignments | length) != 1 {
295295+ fail $"expected 1 assignment, got ($after_assign.assignments | length)" $pid
296296+ }
297297+ let a = ($after_assign.assignments | first)
298298+ if $a.host != "pds.example.com" {
299299+ fail $"expected host=pds.example.com, got ($a.host)" $pid
300300+ }
301301+ if $a.tier != "trusted" {
302302+ fail $"expected tier=trusted, got ($a.tier)" $pid
303303+ }
304304+ print $" ok: assignment created host=($a.host), tier=($a.tier)"
305305+306306+ # re-assigning the same host to a different tier updates without creating a duplicate
307307+ print " PUT /pds/tiers (re-assign to default)..."
308308+ http put -f -e -t application/json $"($url)/pds/tiers" {
309309+ host: "pds.example.com",
310310+ tier: "default"
311311+ } | assert-status 200 "PUT /pds/tiers re-assign" $pid
312312+ let after_reassign = (http get $"($url)/pds/tiers")
313313+ if ($after_reassign.assignments | length) != 1 {
314314+ fail $"expected 1 assignment after re-assign, got ($after_reassign.assignments | length)" $pid
315315+ }
316316+ if ($after_reassign.assignments | first).tier != "default" {
317317+ fail $"expected tier=default after re-assign, got (($after_reassign.assignments | first).tier)" $pid
318318+ }
319319+ print " ok: re-assign updates tier without creating a duplicate"
320320+321321+ # assigning an unknown tier name is rejected with 400
322322+ print " PUT /pds/tiers (unknown tier, expect 400)..."
323323+ http put -f -e -t application/json $"($url)/pds/tiers" {
324324+ host: "pds.example.com",
325325+ tier: "nonexistent"
326326+ } | assert-status 400 "PUT /pds/tiers unknown tier" $pid
327327+ let after_bad = (http get $"($url)/pds/tiers")
328328+ if ($after_bad.assignments | length) != 1 {
329329+ fail "expected assignment count unchanged after rejected request" $pid
330330+ }
331331+ if ($after_bad.assignments | first).tier != "default" {
332332+ fail "expected tier unchanged after rejected request" $pid
333333+ }
334334+ print " ok: unknown tier name rejected with 400, existing assignment unchanged"
335335+336336+ # add a second host to verify multi-assignment listing works
337337+ print " PUT /pds/tiers (second host)..."
338338+ http put -f -e -t application/json $"($url)/pds/tiers" {
339339+ host: "other.example.com",
340340+ tier: "trusted"
341341+ } | assert-status 200 "PUT /pds/tiers second host" $pid
342342+ let after_second = (http get $"($url)/pds/tiers")
343343+ if ($after_second.assignments | length) != 2 {
344344+ fail $"expected 2 assignments, got ($after_second.assignments | length)" $pid
345345+ }
346346+ print " ok: two distinct hosts listed independently"
347347+348348+ # remove the first host
349349+ print " DELETE /pds/tiers (first host)..."
350350+ http delete -f -e -t application/json $"($url)/pds/tiers" --data {
351351+ host: "pds.example.com"
352352+ } | assert-status 200 "DELETE /pds/tiers" $pid
353353+ let after_del = (http get $"($url)/pds/tiers")
354354+ if ($after_del.assignments | length) != 1 {
355355+ fail $"expected 1 assignment after delete, got ($after_del.assignments | length)" $pid
356356+ }
357357+ if ($after_del.assignments | first).host != "other.example.com" {
358358+ fail "expected only other.example.com to remain after delete" $pid
359359+ }
360360+ print " ok: correct host removed, other assignment intact"
361361+362362+ # remove the second host
363363+ http delete -f -e -t application/json $"($url)/pds/tiers" --data {
364364+ host: "other.example.com"
365365+ } | assert-status 200 "DELETE /pds/tiers second" $pid
366366+367367+ # deleting a non-existent host is idempotent (returns 200, not an error)
368368+ print " DELETE /pds/tiers (non-existent, expect 200)..."
369369+ http delete -f -e -t application/json $"($url)/pds/tiers" --data {
370370+ host: "pds.example.com"
371371+ } | assert-status 200 "DELETE /pds/tiers non-existent" $pid
372372+ let after_idempotent = (http get $"($url)/pds/tiers")
373373+ if ($after_idempotent.assignments | length) != 0 {
374374+ fail "expected empty assignments after cleanup" $pid
375375+ }
376376+ print " ok: delete of non-existent host is idempotent"
377377+378378+ print "pds tier management tests passed!"
379379+}
380380+381381+# verify that tier assignments are written to the database and survive a restart.
382382+def test-pds-tier-persistence [binary: string, db_path: string, port: int] {
383383+ print "=== test: pds tier assignments persist across restart ==="
384384+385385+ let url = $"http://localhost:($port)"
386386+387387+ let instance = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } {
388388+ start-hydrant $binary $db_path $port
389389+ })
390390+ if not (wait-for-api $url) {
391391+ fail "hydrant did not start"
392392+ }
393393+394394+ print " assigning host to trusted tier..."
395395+ http put -t application/json $"($url)/pds/tiers" {
396396+ host: "persist.example.com",
397397+ tier: "trusted"
398398+ }
399399+400400+ let before = (http get $"($url)/pds/tiers")
401401+ if ($before.assignments | length) != 1 {
402402+ fail "assignment was not created" $instance.pid
403403+ }
404404+405405+ print " restarting hydrant..."
406406+ kill $instance.pid
407407+ sleep 2sec
408408+409409+ let instance2 = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } {
410410+ start-hydrant $binary $db_path $port
411411+ })
412412+ if not (wait-for-api $url) {
413413+ fail "hydrant did not restart" $instance2.pid
414414+ }
415415+416416+ print " checking assignment survived restart..."
417417+ let after = (http get $"($url)/pds/tiers")
418418+ if ($after.assignments | length) != 1 {
419419+ fail $"expected 1 assignment after restart, got ($after.assignments | length)" $instance2.pid
420420+ }
421421+ let a = ($after.assignments | first)
422422+ if $a.host != "persist.example.com" {
423423+ fail $"expected host=persist.example.com after restart, got ($a.host)" $instance2.pid
424424+ }
425425+ if $a.tier != "trusted" {
426426+ fail $"expected tier=trusted after restart, got ($a.tier)" $instance2.pid
427427+ }
428428+ print " ok: tier assignment persisted across restart"
429429+430430+ kill $instance2.pid
431431+ print "pds tier persistence test passed!"
432432+}
433433+434434+# verify that HYDRANT_TRUSTED_HOSTS pre-assigns hosts to the trusted tier at startup.
435435+def test-pds-trusted-hosts [binary: string, db_path: string, port: int] {
436436+ print "=== test: HYDRANT_TRUSTED_HOSTS pre-assigns tier at startup ==="
437437+438438+ let url = $"http://localhost:($port)"
439439+ let host_a = "alpha.example.com"
440440+ let host_b = "beta.example.com"
441441+442442+ let instance = (with-env {
443443+ HYDRANT_CRAWLER_URLS: "",
444444+ HYDRANT_RELAY_HOSTS: "",
445445+ HYDRANT_TRUSTED_HOSTS: $"($host_a),($host_b)"
446446+ } {
447447+ start-hydrant $binary $db_path $port
448448+ })
449449+ if not (wait-for-api $url) {
450450+ fail "hydrant did not start"
451451+ }
452452+453453+ print " checking pre-assigned trusted hosts..."
454454+ let tiers = (http get $"($url)/pds/tiers")
455455+ let assignments = $tiers.assignments
456456+457457+ for host in [$host_a, $host_b] {
458458+        let matched = ($assignments | where host == $host)
459459+        if ($matched | length) != 1 {
460460+            fail $"expected assignment for ($host) from HYDRANT_TRUSTED_HOSTS, got ($assignments)" $instance.pid
461461+        }
462462+        if ($matched | first).tier != "trusted" {
463463+            fail $"expected tier=trusted for ($host), got (($matched | first).tier)" $instance.pid
464464+ }
465465+ }
466466+ print $" ok: ($host_a) and ($host_b) pre-assigned to trusted tier"
467467+468468+ kill $instance.pid
469469+ print "trusted hosts startup test passed!"
470470+}
471471+472472+# verify that a custom tier defined via HYDRANT_RATE_TIERS is visible and assignable.
473473+def test-pds-custom-rate-tier [binary: string, db_path: string, port: int] {
474474+ print "=== test: custom rate tier via HYDRANT_RATE_TIERS ==="
475475+476476+ let url = $"http://localhost:($port)"
477477+478478+ # custom:100/1.0/360000/8640000 — base=100, mul=1.0, hourly=360000, daily=8640000
479479+ let instance = (with-env {
480480+ HYDRANT_CRAWLER_URLS: "",
481481+ HYDRANT_RELAY_HOSTS: "",
482482+ HYDRANT_RATE_TIERS: "custom:100/1.0/360000/8640000"
483483+ } {
484484+ start-hydrant $binary $db_path $port
485485+ })
486486+ if not (wait-for-api $url) {
487487+ fail "hydrant did not start"
488488+ }
489489+490490+ # custom tier should appear alongside the built-in tiers
491491+ print " checking custom tier is listed in /pds/rate-tiers..."
492492+ let rate_tiers = (http get $"($url)/pds/rate-tiers")
493493+ if not ("custom" in $rate_tiers) {
494494+ fail "expected 'custom' tier in rate_tiers" $instance.pid
495495+ }
496496+ if not ("default" in $rate_tiers) {
497497+ fail "built-in 'default' tier should still be present alongside custom tier" $instance.pid
498498+ }
499499+ let custom = ($rate_tiers | get custom)
500500+ if $custom.per_second_base != 100 {
501501+ fail $"expected custom.per_second_base=100, got ($custom.per_second_base)" $instance.pid
502502+ }
503503+ if $custom.per_hour != 360000 {
504504+ fail $"expected custom.per_hour=360000, got ($custom.per_hour)" $instance.pid
505505+ }
506506+ print $" ok: custom tier listed with correct parameters"
507507+508508+ # a host can be assigned to the custom tier
509509+ print " assigning host to custom tier..."
510510+ http put -f -e -t application/json $"($url)/pds/tiers" {
511511+ host: "custom.example.com",
512512+ tier: "custom"
513513+ } | assert-status 200 "PUT /pds/tiers custom tier" $instance.pid
514514+ let after = (http get $"($url)/pds/tiers")
515515+    let matched = ($after.assignments | where host == "custom.example.com")
516516+    if ($matched | length) != 1 {
517517+        fail "expected assignment for custom.example.com" $instance.pid
518518+    }
519519+    if ($matched | first).tier != "custom" {
520520+        fail $"expected tier=custom, got (($matched | first).tier)" $instance.pid
521521+ }
522522+ print " ok: host assigned to custom tier successfully"
523523+524524+ kill $instance.pid
525525+ print "custom rate tier test passed!"
526526+}
527527+253528def main [] {
254529 let port = resolve-test-port 3007
255530 let url = $"http://localhost:($port)"
···268543269544 test-crawler-sources $url $instance.pid
270545 test-firehose-sources $url $instance.pid
546546+ test-pds-tiers $url $instance.pid
271547272548 kill $instance.pid
273549 sleep 2sec
···281557 let db_config = (mktemp -d -t hydrant_api.XXXXXX)
282558 print $"db: ($db_config)"
283559 test-config-source-not-persisted $binary $db_config $port
560560+561561+ sleep 1sec
562562+563563+ let db_pds_persist = (mktemp -d -t hydrant_api.XXXXXX)
564564+ print $"db: ($db_pds_persist)"
565565+ test-pds-tier-persistence $binary $db_pds_persist $port
566566+567567+ sleep 1sec
568568+569569+ let db_pds_trusted = (mktemp -d -t hydrant_api.XXXXXX)
570570+ print $"db: ($db_pds_trusted)"
571571+ test-pds-trusted-hosts $binary $db_pds_trusted $port
572572+573573+ sleep 1sec
574574+575575+ let db_pds_custom = (mktemp -d -t hydrant_api.XXXXXX)
576576+ print $"db: ($db_pds_custom)"
577577+ test-pds-custom-rate-tier $binary $db_pds_custom $port
284578285579 print ""
286580 print "all api tests passed!"