lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix some metrics under cancellation with guards

phil 4f9f062e bb0e7e45

+108 -23
+108 -18
src/http.rs
··· 11 11 12 12 use std::collections::HashMap; 13 13 use std::num::NonZeroU32; 14 + use std::pin::Pin; 14 15 use std::sync::{ 15 16 Arc, 16 17 atomic::{AtomicU64, Ordering}, 17 18 }; 19 + use std::task::{Context, Poll}; 18 20 use std::time::Duration; 19 21 20 22 use dashmap::{DashMap, DashSet}; ··· 28 30 29 31 const THROTTLE_JITTER_MS: f32 = 16.0; 30 32 33 + // ── In-flight request monitoring ───────────────────────────────────────── 34 + 35 + /// RAII guard that tracks an in-flight HTTP request in the metrics gauge. 36 + /// 37 + /// Increments on creation, decrements on drop. For streaming responses, 38 + /// pass this guard into a [`MonitoredStream`] via 39 + /// [`into_byte_stream`](InFlightGuard::into_byte_stream) — the stream 40 + /// drops the guard when it ends or is cancelled. 41 + struct InFlightGuard { 42 + label: &'static str, 43 + } 44 + 45 + impl InFlightGuard { 46 + fn new(label: &'static str) -> Self { 47 + metrics::gauge!("lightrail_http_requests_in_flight", "type" => label).increment(1); 48 + Self { label } 49 + } 50 + 51 + /// Transfer gauge ownership into a [`ByteStream`] wrapper. The guard 52 + /// will be dropped (decrementing the gauge) when the stream ends 53 + /// naturally or is dropped mid-flight (cancellation). 54 + fn into_byte_stream(self, response: http::Response<ByteStream>) -> http::Response<ByteStream> { 55 + let (parts, body) = response.into_parts(); 56 + let monitored = MonitoredStream { 57 + inner: body.into_inner(), 58 + guard: Some(self), 59 + }; 60 + http::Response::from_parts(parts, ByteStream::new(monitored)) 61 + } 62 + } 63 + 64 + impl Drop for InFlightGuard { 65 + fn drop(&mut self) { 66 + metrics::gauge!("lightrail_http_requests_in_flight", "type" => self.label).decrement(1); 67 + } 68 + } 69 + 70 + /// Stream wrapper that holds an [`InFlightGuard`] alive for the duration 71 + /// of the stream. When the inner stream returns `None` (finished), the 72 + /// guard is dropped immediately. If the stream is dropped before finishing 73 + /// (cancellation), the guard drops with it. 74 + struct MonitoredStream { 75 + inner: Pin<Box<dyn futures::stream::Stream<Item = Result<bytes::Bytes, StreamError>> + Send>>, 76 + guard: Option<InFlightGuard>, 77 + } 78 + 79 + impl futures::stream::Stream for MonitoredStream { 80 + type Item = Result<bytes::Bytes, StreamError>; 81 + 82 + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 83 + let result = self.inner.as_mut().poll_next(cx); 84 + if let Poll::Ready(None) = &result { 85 + // Stream finished — drop the guard now rather than waiting 86 + // for the MonitoredStream itself to be dropped. 87 + self.guard.take(); 88 + } 89 + result 90 + } 91 + } 92 + 93 + impl Unpin for MonitoredStream {} 94 + 95 + // ── Throttle tracking ──────────────────────────────────────────────────── 96 + 97 + /// RAII guard that removes a request from the throttle tracking set on drop. 98 + /// 99 + /// Without this, a future cancelled mid-sleep (e.g. by a cancellation token) 100 + /// would leak an entry in the `HostLimiting::limiting` set, causing 101 + /// `currently_limiting()` to report phantom blocked requests. 102 + struct ThrottleGuard { 103 + limiting: Arc<HostLimiting>, 104 + req_id: (u64, String), 105 + } 106 + 107 + impl Drop for ThrottleGuard { 108 + fn drop(&mut self) { 109 + self.limiting.limiting.remove(&self.req_id); 110 + metrics::gauge!("lightrail_http_host_throttling").decrement(1); 111 + } 112 + } 113 + 31 114 struct HostLimiting { 32 115 limiter: DefaultDirectRateLimiter, 33 116 limiting: DashSet<(u64, String)>, ··· 67 150 async fn limit(&self, host: &str, path: &str) { 68 151 let (id, limiting) = self.get_or_create_limiter(host); 69 152 let req_id = (id, path.to_string()); 70 - let mut throttled = false; 153 + let mut guard: Option<ThrottleGuard> = None; 71 154 while let Err(not_until) = limiting.limiter.check() { 72 - if !throttled { 73 - throttled = true; 155 + if guard.is_none() { 74 156 metrics::gauge!("lightrail_http_host_throttling").increment(1); 75 157 limiting.limiting.insert(req_id.clone()); 158 + guard = Some(ThrottleGuard { 159 + limiting: Arc::clone(&limiting), 160 + req_id: req_id.clone(), 161 + }); 76 162 } 77 163 let min_wait = not_until.wait_time_from(QuantaClock::default().now()); 78 164 let jitter = Duration::from_millis((fastrand::f32() * THROTTLE_JITTER_MS) as u64); ··· 81 167 .increment(wait_total.as_millis() as u64); 82 168 tokio::time::sleep(wait_total).await; 83 169 } 84 - if throttled { 85 - metrics::gauge!("lightrail_http_host_throttling").decrement(1); 86 - limiting.limiting.remove(&req_id); 87 - } 170 + // Disarm: we got our token, clean up now rather than waiting for drop. 171 + drop(guard); 88 172 } 89 173 } 174 + 175 + // ── ThrottledClient ────────────────────────────────────────────────────── 90 176 91 177 /// HTTP client that applies per-host GCRA rate limiting to every request. 92 178 /// ··· 171 257 for (name, value) in &parts.headers { 172 258 req = req.header(name, value); 173 259 } 174 - metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 175 - let resp = req.send().await; 176 - metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 177 - let resp = resp?; 260 + let resp = { 261 + // Guard decrements the gauge when this scope exits (including on `?` early return). 262 + let _guard = InFlightGuard::new("send_http"); 263 + req.send().await? 264 + }; 178 265 179 266 let status = resp.status(); 180 267 let mut builder = http::Response::builder().status(status); ··· 199 286 warn!(uri = %parts.uri, "failed to get host for rate limiting"); 200 287 } 201 288 202 - metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 203 - // decremented in get_repo (sketttch) 204 - self.inner 289 + let guard = InFlightGuard::new("send_http_streaming"); 290 + let resp = self 291 + .inner 205 292 .send_http_streaming(http::Request::from_parts(parts, body)) 206 - .await 293 + .await?; 294 + // Transfer gauge ownership into the response body stream — decrements 295 + // when the stream ends or is dropped (cancellation). 296 + Ok(guard.into_byte_stream(resp)) 207 297 } 208 298 209 299 #[cfg(not(target_arch = "wasm32"))] ··· 221 311 warn!(uri = %parts.uri, "failed to get host for rate limiting"); 222 312 } 223 313 224 - metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 225 - // decremented in get_repo (sketttch) 226 - self.inner.send_http_bidirectional(parts, body).await 314 + let guard = InFlightGuard::new("send_http_bidirectional"); 315 + let resp = self.inner.send_http_bidirectional(parts, body).await?; 316 + Ok(guard.into_byte_stream(resp)) 227 317 } 228 318 }
-5
src/sync/resync/get_repo.rs
··· 35 35 .xrpc(base.clone()) 36 36 .download(&req) 37 37 .await 38 - .inspect_err(|_| metrics::gauge!("lightrail_http_requests_in_flight").decrement(1)) 39 38 .map_err(|e| GetCollectionsError::Request(e.to_string()))?; 40 39 41 40 let status = response.status(); 42 41 let (_, body) = response.into_parts(); 43 42 44 43 if status.as_u16() == 429 { 45 - metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 46 44 return Err(GetCollectionsError::RateLimited(host)); 47 45 } 48 46 if !status.is_success() { 49 - metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 50 47 return Err(classify_xrpc_error(body, status).await); 51 48 } 52 49 ··· 64 61 .with_mem_limit_mb(CAR_MEM_LIMIT_MB) 65 62 .load_car(reader) 66 63 .await 67 - .inspect_err(|_| metrics::gauge!("lightrail_http_requests_in_flight").decrement(1)) 68 64 .map_err(|e| match e { 69 65 LoadError::MemoryLimitReached(_) => GetCollectionsError::InvalidData(format!( 70 66 "repo CAR exceeds {CAR_MEM_LIMIT_MB} MiB memory limit" 71 67 )), 72 68 e => GetCollectionsError::InvalidData(format!("CAR load error: {e}")), 73 69 })?; 74 - metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 75 70 76 71 let rev = Tid::new(&mem_car.commit.rev) 77 72 .map_err(|e| GetCollectionsError::InvalidData(format!("bad rev in commit: {e}")))?;