lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

track currently-throttling hosts

phil 319ac2de 9f655176

+236 -46
+93 -38
src/http.rs
··· 9 9 //! resync pipeline resolves DIDs to PDS endpoints before fetching, workers 10 10 //! will naturally be rate-limited against the correct PDS host. 11 11 12 + use std::collections::HashMap; 12 13 use std::num::NonZeroU32; 13 - use std::sync::Arc; 14 + use std::sync::{ 15 + Arc, 16 + atomic::{AtomicU64, Ordering}, 17 + }; 14 18 use std::time::Duration; 15 19 16 - use dashmap::DashMap; 17 - use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; 20 + use dashmap::{DashMap, DashSet}; 21 + use governor::{ 22 + DefaultDirectRateLimiter, Quota, RateLimiter, 23 + clock::{Clock, QuantaClock}, 24 + }; 18 25 use jacquard_common::http_client::{HttpClient, HttpClientExt}; 19 26 use jacquard_common::stream::{ByteStream, StreamError}; 27 + use tracing::warn; 20 28 21 - const THROTTLE_JITTER_MS: f32 = 16.; 29 + const THROTTLE_JITTER_MS: f32 = 16.0; 30 + 31 + struct HostLimiting { 32 + limiter: DefaultDirectRateLimiter, 33 + limiting: DashSet<(u64, String)>, 34 + } 35 + 36 + impl HostLimiting { 37 + fn new(quota: Quota) -> Self { 38 + Self { 39 + limiter: RateLimiter::direct(quota), 40 + limiting: DashSet::new(), 41 + } 42 + } 43 + } 22 44 23 45 /// State shared across all clones of a [`ThrottledClient`]. 24 - struct Shared { 25 - /// Duration of one token at the configured rate (= 1s / rate). 26 - token_interval: Duration, 46 + struct Limiters { 47 + current_id: AtomicU64, 27 48 /// One GCRA limiter per hostname. Entries are never evicted; the number 28 49 /// of distinct hosts contacted is small enough to be unbounded in memory. 29 - limiters: DashMap<String, Arc<DefaultDirectRateLimiter>>, 50 + limiters: DashMap<String, Arc<HostLimiting>>, 30 51 /// Per-host quota, kept for creating new limiters on demand. 31 52 quota: Quota, 32 53 } 33 54 34 - impl Shared { 35 - fn get_or_create_limiter(&self, host: &str) -> Arc<DefaultDirectRateLimiter> { 36 - Arc::clone( 55 + impl Limiters { 56 + fn get_or_create_limiter(&self, host: &str) -> (u64, Arc<HostLimiting>) { 57 + let id = self.current_id.fetch_add(1, Ordering::Relaxed); // just needs to be different each time 58 + let limiting = Arc::clone( 37 59 self.limiters 38 60 .entry(host.to_string()) 39 - .or_insert_with(|| Arc::new(RateLimiter::direct(self.quota))) 61 + .or_insert_with(|| Arc::new(HostLimiting::new(self.quota))) 40 62 .value(), 41 - ) 63 + ); 64 + (id, limiting) 42 65 } 43 66 44 - fn jittered_interval(&self) -> Duration { 45 - let secs = fastrand::f32() * THROTTLE_JITTER_MS / 1000.0; 46 - self.token_interval + Duration::from_secs_f32(secs) 67 + async fn limit(&self, host: &str, path: &str) { 68 + let (id, limiting) = self.get_or_create_limiter(host); 69 + let mut throttled = false; 70 + while let Err(not_until) = limiting.limiter.check() { 71 + if !throttled { 72 + throttled = true; 73 + metrics::gauge!("lightrail_http_host_throttling").increment(1); 74 + limiting.limiting.insert((id, path.to_string())); 75 + } 76 + let min_wait = not_until.wait_time_from(QuantaClock::default().now()); 77 + let jitter = Duration::from_millis((fastrand::f32() * THROTTLE_JITTER_MS) as u64); 78 + let wait_total = min_wait + jitter; 79 + metrics::counter!("lightrail_http_throttle_time_total_ms") 80 + .increment(wait_total.as_millis() as u64); 81 + tokio::time::sleep(wait_total).await; 82 + } 83 + if throttled { 84 + metrics::gauge!("lightrail_http_host_throttling").decrement(1); 85 + limiting.limiting.remove(&(id, path.to_string())); 86 + } 47 87 } 48 88 } 49 89 ··· 56 96 #[derive(Clone)] 57 97 pub struct ThrottledClient { 58 98 inner: reqwest::Client, 59 - shared: Arc<Shared>, 99 + limiters: Arc<Limiters>, 60 100 } 61 101 62 102 impl ThrottledClient { ··· 72 112 let quota = Quota::per_second(rate_per_second); 73 113 Self { 74 114 inner, 75 - shared: Arc::new(Shared { 76 - token_interval: Duration::from_secs(1) / rate_per_second.get(), 115 + limiters: Arc::new(Limiters { 116 + current_id: AtomicU64::new(0), 77 117 limiters: DashMap::new(), 78 118 quota, 79 119 }), ··· 81 121 } 82 122 } 83 123 124 + impl ThrottledClient { 125 + /// Return the set of all PDS hostnames that have rate limiters. 126 + pub fn currently_limiting(&self) -> HashMap<String, HashMap<String, u64>> { 127 + self.limiters 128 + .limiters 129 + .iter() 130 + .map(|entry| { 131 + let (k, v) = entry.pair(); 132 + let path_counts = v.limiting.iter().map(|entry| entry.key().1.clone()).fold( 133 + HashMap::new(), 134 + |mut acc: std::collections::HashMap<_, u64>, path| { 135 + *acc.entry(path.to_string()).or_default() += 1; 136 + acc 137 + }, 138 + ); 139 + (k.to_string(), path_counts) 140 + }) 141 + .filter(|(_, v)| !v.is_empty()) 142 + .collect() 143 + } 144 + } 145 + 84 146 /// Build the shared HTTP client used for all outbound ATProto requests. 85 147 pub fn build_client(rate_per_sec: NonZeroU32) -> ThrottledClient { 86 148 ThrottledClient::new(rate_per_sec) ··· 96 158 let (parts, body) = request.into_parts(); 97 159 98 160 if let Some(host) = parts.uri.host() { 99 - let limiter = self.shared.get_or_create_limiter(host); 100 - while limiter.check().is_err() { 101 - metrics::gauge!("lightrail_http_host_throttling").increment(1); 102 - // i think we should be limiter.until_ready_with_jitter().await! 103 - tokio::time::sleep(self.shared.jittered_interval()).await; 104 - metrics::gauge!("lightrail_http_host_throttling").decrement(1); 105 - } 161 + self.limiters.limit(host, parts.uri.path()).await; 162 + } else { 163 + warn!(uri = %parts.uri, "failed to get host for rate limiting"); 106 164 } 107 165 108 166 let mut req = self ··· 133 191 request: http::Request<Vec<u8>>, 134 192 ) -> Result<http::Response<ByteStream>, Self::Error> { 135 193 let (parts, body) = request.into_parts(); 194 + 136 195 if let Some(host) = parts.uri.host() { 137 - let limiter = self.shared.get_or_create_limiter(host); 138 - while limiter.check().is_err() { 139 - metrics::gauge!("lightrail_http_host_throttling").increment(1); 140 - tokio::time::sleep(self.shared.jittered_interval()).await; 141 - metrics::gauge!("lightrail_http_host_throttling").decrement(1); 142 - } 196 + self.limiters.limit(host, parts.uri.path()).await; 197 + } else { 198 + warn!(uri = %parts.uri, "failed to get host for rate limiting"); 143 199 } 200 + 144 201 metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 145 202 // decremented in get_repo (sketttch) 146 203 self.inner ··· 158 215 S: n0_future::Stream<Item = Result<bytes::Bytes, StreamError>> + Send + 'static, 159 216 { 160 217 if let Some(host) = parts.uri.host() { 161 - let limiter = self.shared.get_or_create_limiter(host); 162 - while limiter.check().is_err() { 163 - metrics::gauge!("lightrail_http_host_throttling").increment(1); 164 - tokio::time::sleep(self.shared.jittered_interval()).await; 165 - metrics::gauge!("lightrail_http_host_throttling").decrement(1); 166 - } 218 + self.limiters.limit(host, parts.uri.path()).await; 219 + } else { 220 + warn!(uri = %parts.uri, "failed to get host for rate limiting"); 167 221 } 222 + 168 223 metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 169 224 // decremented in get_repo (sketttch) 170 225 self.inner.send_http_bidirectional(parts, body).await
+6
src/identity.rs
··· 205 205 self.cache.insert(did, pds, pubkey); 206 206 } 207 207 208 + /// Look up `did` in the cache without making network calls. 209 + /// Returns `None` on cache miss. 210 + pub fn resolve_cached(&self, did: &Did<'_>) -> Option<Arc<CachedIdentity>> { 211 + self.cache.get(did) 212 + } 213 + 208 214 /// Evict `did` from the identity cache. 209 215 /// 210 216 /// Called when a `#identity` firehose event is received, after all
+19 -4
src/main.rs
··· 146 146 let client = lightrail::http::build_client(args.crawl_qps); 147 147 let token = CancellationToken::new(); 148 148 149 + let dispatcher_state: resync::dispatcher::DispatcherState = std::sync::Arc::new( 150 + std::sync::Mutex::new(resync::dispatcher::DispatcherSnapshot::default()), 151 + ); 152 + 149 153 let mut tasks: JoinSet<Result<()>> = JoinSet::new(); 150 154 151 155 tasks.spawn({ ··· 202 206 let db = db.clone(); 203 207 let client = client.clone(); 204 208 let resolver = resolver.clone(); 209 + let dispatcher_state = dispatcher_state.clone(); 205 210 async move { 206 211 resync::dispatcher::run(resync::DispatcherConfig { 207 212 resolver, ··· 212 217 get_repo_timeout: Duration::from_secs(args.get_repo_fetch_timeout_secs), 213 218 token, 214 219 force_get_repo: args.heavy, 220 + state: dispatcher_state, 215 221 }) 216 222 .await 217 223 .inspect(|_| info!("resync done.")) ··· 223 229 let token = token.clone(); 224 230 let db = db.clone(); 225 231 let addr = args.listen; 232 + let dispatcher_state = dispatcher_state.clone(); 233 + let client = client.clone(); 226 234 let admin_config = args 227 235 .admin_password 228 236 .map(|pw| lightrail::server::AdminConfig { ··· 230 238 admin_password: pw, 231 239 }); 232 240 async move { 233 - lightrail::server::serve(addr, db, token, admin_config) 234 - .await 235 - .inspect(|_| info!("server done.")) 236 - .inspect_err(|e| warn!(error = %e, "server exited")) 241 + lightrail::server::serve( 242 + addr, 243 + db, 244 + token, 245 + admin_config, 246 + Some(dispatcher_state), 247 + Some(client), 248 + ) 249 + .await 250 + .inspect(|_| info!("server done.")) 251 + .inspect_err(|e| warn!(error = %e, "server exited")) 237 252 } 238 253 }); 239 254
+15
src/server/admin.rs
··· 9 9 use serde_json::json; 10 10 use subtle::ConstantTimeEq; 11 11 12 + use std::collections::HashMap; 12 13 use std::sync::atomic::Ordering; 13 14 15 + use crate::http::ThrottledClient; 14 16 use crate::storage::{DbRef, backfill_progress, error::StorageError}; 17 + use crate::sync::resync::dispatcher::{DispatcherSnapshot, DispatcherState}; 15 18 16 19 use super::AdminConfig; 17 20 ··· 37 40 // ── Backfill progress (live from storage) ───────────────────────────── 38 41 upstream_backfill_complete: bool, 39 42 upstream_backfill_completed_at: Option<String>, 43 + // ── Live dispatcher state ────────────────────────────────────────── 44 + #[serde(skip_serializing_if = "Option::is_none")] 45 + dispatcher: Option<DispatcherSnapshot>, 46 + #[serde(skip_serializing_if = "Option::is_none")] 47 + throttled_hosts: Option<HashMap<String, HashMap<String, u64>>>, 40 48 } 41 49 42 50 pub(super) enum AdminStatusError { ··· 76 84 pub async fn admin_status( 77 85 State(db): State<DbRef>, 78 86 Extension(config): Extension<AdminConfig>, 87 + dispatcher_ext: Option<Extension<DispatcherState>>, 88 + client_ext: Option<Extension<ThrottledClient>>, 79 89 headers: HeaderMap, 80 90 ) -> Result<Json<AdminStatus>, AdminStatusError> { 81 91 if !check_basic_auth(&headers, &config.admin_password) { ··· 87 97 let backfill = tokio::task::spawn_blocking(move || backfill_progress::get(&db2, &host)) 88 98 .await 89 99 .unwrap()?; 100 + 101 + let dispatcher = dispatcher_ext.map(|Extension(state)| state.lock().unwrap().clone()); 102 + let throttled_hosts = client_ext.map(|Extension(client)| client.currently_limiting()); 90 103 91 104 let s = &db.stats; 92 105 Ok(Json(AdminStatus { ··· 114 127 .and_then(|b| b.completed_at.as_ref()) 115 128 .is_some(), 116 129 upstream_backfill_completed_at: backfill.and_then(|b| b.completed_at), 130 + dispatcher, 131 + throttled_hosts, 117 132 })) 118 133 } 119 134
+14 -2
src/server/mod.rs
··· 25 25 use jacquard_axum::IntoRouter; 26 26 27 27 use crate::error::Result; 28 + use crate::http::ThrottledClient; 28 29 use crate::storage::DbRef; 30 + use crate::sync::resync::dispatcher::DispatcherState; 29 31 30 32 /// Config for the admin endpoints. Only constructed when `--admin-password` is 31 33 /// set; when absent the `/admin/*` routes are not registered at all. ··· 51 53 db: DbRef, 52 54 token: tokio_util::sync::CancellationToken, 53 55 admin_config: Option<AdminConfig>, 56 + dispatcher_state: Option<DispatcherState>, 57 + client: Option<ThrottledClient>, 54 58 ) -> Result<()> { 55 59 let base = GetRepoStatusRequest::into_router(get_repo_status) 56 60 .merge(ListReposRequest::into_router(list_repos)) ··· 59 63 )); 60 64 61 65 let app = if let Some(config) = admin_config { 62 - base.route("/admin/status", axum::routing::get(admin_status)) 66 + let mut app = base 67 + .route("/admin/status", axum::routing::get(admin_status)) 63 68 .route("/", axum::routing::get(hello::hello)) 64 69 .with_state(db) 65 - .layer(axum::Extension(config)) 70 + .layer(axum::Extension(config)); 71 + if let Some(ds) = dispatcher_state { 72 + app = app.layer(axum::Extension(ds)); 73 + } 74 + if let Some(c) = client { 75 + app = app.layer(axum::Extension(c)); 76 + } 77 + app 66 78 } else { 67 79 base.route("/", axum::routing::get(hello::hello)) 68 80 .with_state(db)
+89 -2
src/sync/resync/dispatcher.rs
··· 13 13 use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 14 14 use jacquard_common::{IntoStatic, types::string::Did, url::Url}; 15 15 use std::collections::{HashMap, HashSet}; 16 - use std::sync::Arc; 16 + use std::sync::{Arc, Mutex, atomic::Ordering}; 17 17 use std::time::{Duration, Instant, SystemTime}; 18 18 19 19 use super::ResyncError; ··· 21 21 use tokio::task::{Id as TaskId, JoinSet}; 22 22 use tracing::{debug, error, info, trace, warn}; 23 23 24 - use std::sync::atomic::Ordering; 24 + use serde::Serialize; 25 25 26 26 use crate::error::Result; 27 27 use crate::storage::{ ··· 31 31 }; 32 32 use crate::util::TokenExt; 33 33 34 + /// Point-in-time view of the dispatcher's operational state, published for 35 + /// the admin endpoint to read. 36 + #[derive(Clone, Default, Serialize)] 37 + pub struct DispatcherSnapshot { 38 + /// DIDs currently being resynced, with their PDS host. 39 + pub busy: Vec<BusyEntry>, 40 + /// PDS hosts in cooldown after a 429. 41 + pub cooling: Vec<CoolingEntry>, 42 + /// Per-host active worker counts. 43 + pub hosts: Vec<HostEntry>, 44 + /// Total number of worker tasks in the JoinSet. 45 + pub worker_count: usize, 46 + } 47 + 48 + #[derive(Clone, Serialize)] 49 + pub struct BusyEntry { 50 + pub did: String, 51 + pub host: String, 52 + } 53 + 54 + #[derive(Clone, Serialize)] 55 + pub struct CoolingEntry { 56 + pub host: String, 57 + pub remaining_secs: f64, 58 + } 59 + 60 + #[derive(Clone, Serialize)] 61 + pub struct HostEntry { 62 + pub host: String, 63 + pub workers: usize, 64 + } 65 + 66 + /// Shared handle to the latest dispatcher snapshot. 67 + pub type DispatcherState = Arc<Mutex<DispatcherSnapshot>>; 68 + 34 69 /// How long to wait between queue polls when no workers are running. 35 70 const IDLE_POLL: Duration = Duration::from_millis(500); 36 71 ··· 49 84 pub get_repo_timeout: std::time::Duration, 50 85 pub token: tokio_util::sync::CancellationToken, 51 86 pub force_get_repo: bool, 87 + pub state: DispatcherState, 52 88 } 53 89 54 90 /// Run the resync dispatcher until the future is cancelled. ··· 66 102 get_repo_timeout, 67 103 token, 68 104 force_get_repo, 105 + state, 69 106 }: DispatcherConfig, 70 107 ) -> Result<()> { 71 108 let mut busy: HashSet<Did<'static>> = HashSet::new(); ··· 253 290 re_enqueue_panic_recovery(did, db.clone()).await; 254 291 } 255 292 } 293 + } 294 + 295 + // Publish snapshot for the admin endpoint. 296 + { 297 + let now_inst = Instant::now(); 298 + let snap = DispatcherSnapshot { 299 + busy: task_dids 300 + .values() 301 + .map(|did| { 302 + let host = resolver 303 + .resolve_cached(did) 304 + .and_then(|r| r.pds.host_str().map(str::to_owned)) 305 + .unwrap_or_default(); 306 + BusyEntry { 307 + did: did.to_string(), 308 + host, 309 + } 310 + }) 311 + .collect(), 312 + cooling: cooling_hosts 313 + .iter() 314 + .filter_map(|(url, until)| { 315 + let remaining = until.checked_duration_since(now_inst)?; 316 + Some(CoolingEntry { 317 + host: url.host_str().unwrap_or("unknown").to_owned(), 318 + remaining_secs: remaining.as_secs_f64(), 319 + }) 320 + }) 321 + .collect(), 322 + hosts: { 323 + // Count workers per host from task_dids via resolver cache 324 + let mut counts: HashMap<String, usize> = HashMap::new(); 325 + for did in task_dids.values() { 326 + let host = resolver 327 + .resolve_cached(did) 328 + .and_then(|r| r.pds.host_str().map(str::to_owned)) 329 + .unwrap_or_default(); 330 + *counts.entry(host).or_default() += 1; 331 + } 332 + let mut entries: Vec<_> = counts 333 + .into_iter() 334 + .map(|(host, workers)| HostEntry { host, workers }) 335 + .collect(); 336 + entries.sort_by(|a, b| b.workers.cmp(&a.workers)); 337 + entries 338 + }, 339 + worker_count: workers.len(), 340 + }; 341 + // Unwrap is safe: we never poison the lock (no panics while holding it). 342 + *state.lock().unwrap() = snap; 256 343 } 257 344 } 258 345 }