lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

jitter the upstream throttle

phil 85320b40 78dc0b7b

+22 -12
+1
Cargo.lock
···
2342 2342 "cid",
2343 2343 "clap",
2344 2344 "dashmap 5.5.3",
2345 + "fastrand",
2345 2346 "fjall",
2346 2347 "futures",
2347 2348 "governor",
+1
Cargo.toml
···
11 11 cid = { version = "0.11", default-features = false, features = ["alloc"] }
12 12 clap = { version = "4.5.60", features = ["derive", "env"] }
13 13 dashmap = "5"
14 + fastrand = "2.3.0"
14 15 fjall = "3.0.3"
15 16 futures = "0.3"
16 17 governor = "0.6"
+4 -3
hacking.md
···
121 121 - [x] repo-stream: drop record block contents with processor fn
122 122 - [x] in getRecord before describeRepo
123 123 - [x] in commit handling
124 + - [x] meta/metrics keyspace for general stats
125 + - [x] total repos (hyperloglog estimate?)
126 + - [x] resync queue size
127 + - [x] jitter http throttle
124 128 - [x] commit CAR handling: generate a list of keys with gaps noted, to reliably detect missing adjacent keys
125 129 - [ ] watch logs for errors now that we're strict
126 - - [ ] meta/metrics keyspace for general stats
127 - - [ ] total repos (hyperloglog estimate?)
128 - - [ ] resync queue size
129 130
130 131 very much still todo but i'm getting tired
131 132 - [x] config: add a `--heavy` mode that always uses `getRepo` and never `describeRepo`
+16 -9
src/http.rs
···
18 18 use jacquard_common::http_client::{HttpClient, HttpClientExt};
19 19 use jacquard_common::stream::{ByteStream, StreamError};
20 20
21 + const THROTTLE_JITTER_MS: f32 = 16.;
22 +
21 23 /// State shared across all clones of a [`ThrottledClient`].
22 24 struct Shared {
23 25 /// Duration of one token at the configured rate (= 1s / rate).
···
37 39 .or_insert_with(|| Arc::new(RateLimiter::direct(self.quota)))
38 40 .value(),
39 41 )
42 + }
43 +
44 + fn jittered_interval(&self) -> Duration {
45 + let secs = fastrand::f32() * THROTTLE_JITTER_MS / 1000.0;
46 + self.token_interval + Duration::from_secs_f32(secs)
40 47 }
41 48 }
42 49
···
91 98 if let Some(host) = parts.uri.host() {
92 99 let limiter = self.shared.get_or_create_limiter(host);
93 100 while limiter.check().is_err() {
94 - metrics::gauge!("lightrail_http_host_thorottling").increment(1);
95 - tokio::time::sleep(self.shared.token_interval).await;
96 - metrics::gauge!("lightrail_http_host_thorottling").decrement(1);
101 + metrics::gauge!("lightrail_http_host_throttling").increment(1);
102 + tokio::time::sleep(self.shared.jittered_interval()).await;
103 + metrics::gauge!("lightrail_http_host_throttling").decrement(1);
97 104 }
98 105 }
99 106
···
125 132 if let Some(host) = parts.uri.host() {
126 133 let limiter = self.shared.get_or_create_limiter(host);
127 134 while limiter.check().is_err() {
128 - metrics::gauge!("lightrail_http_host_thorottling").increment(1);
129 - tokio::time::sleep(self.shared.token_interval).await;
130 - metrics::gauge!("lightrail_http_host_thorottling").decrement(1);
135 + metrics::gauge!("lightrail_http_host_throttling").increment(1);
136 + tokio::time::sleep(self.shared.jittered_interval()).await;
137 + metrics::gauge!("lightrail_http_host_throttling").decrement(1);
131 138 }
132 139 }
133 140 self.inner
···
147 154 if let Some(host) = parts.uri.host() {
148 155 let limiter = self.shared.get_or_create_limiter(host);
149 156 while limiter.check().is_err() {
150 - metrics::gauge!("lightrail_http_host_thorottling").increment(1);
151 - tokio::time::sleep(self.shared.token_interval).await;
152 - metrics::gauge!("lightrail_http_host_thorottling").decrement(1);
157 + metrics::gauge!("lightrail_http_host_throttling").increment(1);
158 + tokio::time::sleep(self.shared.jittered_interval()).await;
159 + metrics::gauge!("lightrail_http_host_throttling").decrement(1);
153 160 }
154 161 }
155 162 self.inner.send_http_bidirectional(parts, body).await