lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

very sketchy requests-in-flight gauge

phil 31bf417f b27a8854

+13 -1
+8 -1
src/http.rs
··· 112 112 for (name, value) in &parts.headers { 113 113 req = req.header(name, value); 114 114 } 115 - let resp = req.send().await?; 115 + metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 116 + let resp = req.send().await; 117 + metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 118 + let resp = resp?; 116 119 117 120 let status = resp.status(); 118 121 let mut builder = http::Response::builder().status(status); ··· 138 141 metrics::gauge!("lightrail_http_host_throttling").decrement(1); 139 142 } 140 143 } 144 + metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 145 + // decremented in get_repo (sketttch) 141 146 self.inner 142 147 .send_http_streaming(http::Request::from_parts(parts, body)) 143 148 .await ··· 160 165 metrics::gauge!("lightrail_http_host_throttling").decrement(1); 161 166 } 162 167 } 168 + metrics::gauge!("lightrail_http_requests_in_flight").increment(1); 169 + // decremented in get_repo (sketttch) 163 170 self.inner.send_http_bidirectional(parts, body).await 164 171 } 165 172 }
+5
src/sync/resync/get_repo.rs
··· 35 35 .xrpc(base.clone()) 36 36 .download(&req) 37 37 .await 38 + .inspect_err(|_| metrics::gauge!("lightrail_http_requests_in_flight").decrement(1)) 38 39 .map_err(|e| GetCollectionsError::Request(e.to_string()))?; 39 40 40 41 let status = response.status(); 41 42 let (_, body) = response.into_parts(); 42 43 43 44 if status.as_u16() == 429 { 45 + metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 44 46 return Err(GetCollectionsError::RateLimited(host)); 45 47 } 46 48 if !status.is_success() { 49 + metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 47 50 return Err(classify_xrpc_error(body, status).await); 48 51 } 49 52 ··· 61 64 .with_mem_limit_mb(CAR_MEM_LIMIT_MB) 62 65 .load_car(reader) 63 66 .await 67 + .inspect_err(|_| metrics::gauge!("lightrail_http_requests_in_flight").decrement(1)) 64 68 .map_err(|e| match e { 65 69 LoadError::MemoryLimitReached(_) => GetCollectionsError::InvalidData(format!( 66 70 "repo CAR exceeds {CAR_MEM_LIMIT_MB} MiB memory limit" 67 71 )), 68 72 e => GetCollectionsError::InvalidData(format!("CAR load error: {e}")), 69 73 })?; 74 + metrics::gauge!("lightrail_http_requests_in_flight").decrement(1); 70 75 71 76 let rev = Tid::new(&mem_car.commit.rev) 72 77 .map_err(|e| GetCollectionsError::InvalidData(format!("bad rev in commit: {e}")))?;