lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

per-did event cap and firehose backpressure, and:

- pds `connect` timeout (shorter than the full timeout)
- handle `unreachable` from pds with aggressive backoff

the per-did event cap is hardcoded at 100 for now, which even feels like a lot (it represents many seconds of firehose traffic). if it's exceeded, we drop the whole thing and transition the repo to desynced.

the global firehose queue cap is hardcoded at 2000 (and should probably be exposed as a cli arg). after that we backpressure against tungstenite, which has a limited read buffer and eventually will stop receiving events until we can process some. yay for bounded memory!

(the firehose cap is checked against the total number of outstanding events — everything queued or in-flight across all per-did queues)

additional metrics added for the queues

phil 0239ddbb d35325e0

+287 -37
+1
src/http.rs
··· 194 194 env!("CARGO_PKG_VERSION"), 195 195 ", https://tangled.org/microcosm.blue/lightrail" 196 196 )) 197 + .connect_timeout(Duration::from_secs(8)) 197 198 .build() 198 199 .expect("failed to build HTTP client"); 199 200 let quota = Quota::per_second(rate_per_second);
+149 -27
src/sync/firehose/event_dispatcher.rs
··· 22 22 //! cursor forever; the affected DID will accept a resync on restart. 23 23 24 24 use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; 25 - use std::sync::Arc; 26 - use std::time::Instant; 25 + use std::sync::{Arc, Mutex}; 26 + use std::time::{Instant, SystemTime}; 27 27 28 28 use jacquard_api::com_atproto::sync::subscribe_repos::{Account, Commit, Identity, Sync}; 29 29 use jacquard_common::types::string::Did; 30 30 use tokio::task::{Id as TaskId, JoinError, JoinSet}; 31 - use tracing::{error, trace, warn}; 31 + use tracing::{error, info, trace, warn}; 32 32 33 - use crate::storage::DbRef; 33 + use crate::storage::{ 34 + self, DbRef, 35 + repo::{AccountStatus, RepoInfo, RepoState}, 36 + resync_queue::ResyncItem, 37 + }; 34 38 35 39 /// Seqs outstanding longer than this are evicted from the watermark. 36 40 const STALL_EVICT_SECS: u64 = 20; 37 41 42 + /// Per-DID in-memory queue depth beyond which the DID is forcibly transitioned 43 + /// to `Desynchronized` and its queued events are dropped. 44 + /// 45 + /// Reaching this means a single DID's worker is stuck or its PDS is slow enough 46 + /// that commits are piling up in memory. Cheaper to resync the repo later than 47 + /// to hold many MB of CAR bytes per lagging DID. 48 + const PER_DID_QUEUE_CAP: usize = 100; 49 + 38 50 // --------------------------------------------------------------------------- 39 51 // Public types 40 52 // --------------------------------------------------------------------------- ··· 47 59 task_did_seq: HashMap<TaskId, (Did<'static>, i64)>, 48 60 /// Outstanding seqs → enqueue time; used for the watermark. 49 61 outstanding: BTreeMap<i64, Instant>, 62 + /// DIDs currently being transitioned to `Desynchronized` after hitting 63 + /// `PER_DID_QUEUE_CAP`. New events for these DIDs are dropped so they 64 + /// don't re-accumulate during the async storage write. 
65 + /// 66 + /// The spawned transition task removes the DID from the set on completion; 67 + /// subsequent firehose events for the same DID will be dropped at step 2 68 + /// of commit_event processing since the repo is now `Desynchronized`. 69 + desyncing: Arc<Mutex<HashSet<Did<'static>>>>, 50 70 max_concurrent: usize, 51 71 resolver: Arc<crate::identity::Resolver>, 52 72 db: DbRef, ··· 114 134 workers: JoinSet::new(), 115 135 task_did_seq: HashMap::new(), 116 136 outstanding: BTreeMap::new(), 137 + desyncing: Arc::new(Mutex::new(HashSet::new())), 117 138 max_concurrent, 118 139 resolver, 119 140 db, ··· 124 145 /// Enqueue a commit for async processing. 125 146 pub fn enqueue(&mut self, commit: Box<Commit<'static>>, seq: i64) { 126 147 metrics::counter!("lightrail_firehose_events_total", "kind" => "commit").increment(1); 127 - self.outstanding.insert(seq, Instant::now()); 128 148 let did = commit.repo.clone(); 129 - self.queues 130 - .entry(did) 131 - .or_default() 132 - .push_back(PendingWork::Commit(PendingCommit { commit, seq })); 133 - self.try_fill(); 149 + self.enqueue_work( 150 + did, 151 + PendingWork::Commit(PendingCommit { commit, seq }), 152 + "commit", 153 + ); 134 154 } 135 155 136 156 /// Enqueue a `#sync` event for async processing. ··· 139 159 /// is processed in order relative to any in-flight commits for this DID. 140 160 pub fn enqueue_sync(&mut self, sync: Box<Sync<'static>>, seq: i64) { 141 161 metrics::counter!("lightrail_firehose_events_total", "kind" => "sync").increment(1); 142 - self.outstanding.insert(seq, Instant::now()); 143 162 let did = sync.did.clone(); 144 - self.queues 145 - .entry(did) 146 - .or_default() 147 - .push_back(PendingWork::Sync(PendingSync { sync, seq })); 148 - self.try_fill(); 163 + self.enqueue_work(did, PendingWork::Sync(PendingSync { sync, seq }), "sync"); 149 164 } 150 165 151 166 /// Enqueue a `#identity` event for async processing. ··· 156 171 /// re-resolve after the invalidation runs. 
157 172 pub fn enqueue_identity(&mut self, identity: Box<Identity<'static>>, seq: i64) { 158 173 metrics::counter!("lightrail_firehose_events_total", "kind" => "identity").increment(1); 159 - self.outstanding.insert(seq, Instant::now()); 160 174 let did = identity.did.clone(); 161 - self.queues 162 - .entry(did) 163 - .or_default() 164 - .push_back(PendingWork::Identity(PendingIdentity { identity, seq })); 165 - self.try_fill(); 175 + self.enqueue_work( 176 + did, 177 + PendingWork::Identity(PendingIdentity { identity, seq }), 178 + "identity", 179 + ); 166 180 } 167 181 168 182 /// Enqueue a `#account` event for async processing. ··· 172 186 /// that could re-add index entries for the same DID. 173 187 pub fn enqueue_account(&mut self, account: Box<Account<'static>>, seq: i64) { 174 188 metrics::counter!("lightrail_firehose_events_total", "kind" => "account").increment(1); 175 - self.outstanding.insert(seq, Instant::now()); 176 189 let did = account.did.clone(); 177 - self.queues 178 - .entry(did) 179 - .or_default() 180 - .push_back(PendingWork::Account(PendingAccount { account, seq })); 190 + self.enqueue_work( 191 + did, 192 + PendingWork::Account(PendingAccount { account, seq }), 193 + "account", 194 + ); 195 + } 196 + 197 + /// Common enqueue path: applies the per-DID cap and the `desyncing` gate 198 + /// before pushing onto the queue. 199 + /// 200 + /// Drops the event if the DID is currently being transitioned to 201 + /// `Desynchronized`, or if pushing would exceed [`PER_DID_QUEUE_CAP`] (in 202 + /// which case the overflow path is triggered). 
203 + fn enqueue_work(&mut self, did: Did<'static>, work: PendingWork, kind: &'static str) { 204 + if self.desyncing.lock().unwrap().contains(&did) { 205 + metrics::counter!("lightrail_firehose_events_dropped_total", 206 + "reason" => "desyncing", "kind" => kind) 207 + .increment(1); 208 + return; 209 + } 210 + 211 + let depth = self.queues.get(&did).map_or(0, |q| q.len()); 212 + if depth >= PER_DID_QUEUE_CAP { 213 + self.overflow_did(did, kind); 214 + return; 215 + } 216 + 217 + let seq = work.seq(); 218 + self.outstanding.insert(seq, Instant::now()); 219 + self.queues.entry(did).or_default().push_back(work); 181 220 self.try_fill(); 221 + } 222 + 223 + /// Handle a per-DID queue overflow: drop queued events for this DID, 224 + /// remove their seqs from `outstanding` so the watermark can advance, and 225 + /// spawn a blocking task to transition the repo to `Desynchronized` and 226 + /// enqueue a resync. The DID is added to `desyncing` synchronously so 227 + /// concurrent enqueues drop immediately; the spawned task removes it on 228 + /// completion. 
229 + fn overflow_did(&mut self, did: Did<'static>, kind: &'static str) { 230 + warn!(did = %did, kind, 231 + "per-DID firehose queue exceeded cap; desyncing repo"); 232 + metrics::counter!("lightrail_firehose_did_overflow_total", "kind" => kind).increment(1); 233 + 234 + if let Some(queue) = self.queues.remove(&did) { 235 + for work in queue { 236 + self.outstanding.remove(&work.seq()); 237 + } 238 + } 239 + 240 + self.desyncing.lock().unwrap().insert(did.clone()); 241 + 242 + let db = self.db.clone(); 243 + let desyncing = self.desyncing.clone(); 244 + let did_for_task = did.clone(); 245 + tokio::task::spawn_blocking(move || { 246 + let outcome = desync_overflowed(&db, &did_for_task); 247 + desyncing.lock().unwrap().remove(&did_for_task); 248 + if let Err(e) = outcome { 249 + error!(did = %did_for_task, error = %e, 250 + "failed to write Desynchronized state after queue overflow"); 251 + } else { 252 + info!(did = %did_for_task, "overflowed DID marked Desynchronized and queued for resync"); 253 + } 254 + }); 255 + } 256 + 257 + /// Current number of outstanding seqs (queued + in-flight). 258 + /// 259 + /// Used by the firehose loop as a backpressure signal. 260 + pub fn outstanding_count(&self) -> usize { 261 + self.outstanding.len() 182 262 } 183 263 184 264 /// Drain available slots in the worker pool. ··· 396 476 .flatten() 397 477 .map_err(|e| e.to_string()); 398 478 CommitWorkerResult { did, seq, outcome } 479 + } 480 + 481 + // --------------------------------------------------------------------------- 482 + // Overflow desync 483 + // --------------------------------------------------------------------------- 484 + 485 + /// Write `RepoState::Desynchronized` for `did` and enqueue a resync. 486 + /// 487 + /// Preserves the existing `AccountStatus` if a record exists; creates one as 488 + /// Active if not. The batch is atomic so the resync queue entry can't be 489 + /// written without the corresponding state update. 
490 + fn desync_overflowed(db: &DbRef, did: &Did<'static>) -> crate::error::Result<()> { 491 + let existing_status = match storage::repo::get(db, did)? { 492 + Some((info, _)) => info.status, 493 + None => AccountStatus::Active, 494 + }; 495 + let mut batch = db.database.batch(); 496 + storage::repo::put_info_into( 497 + &mut batch, 498 + db, 499 + did, 500 + &RepoInfo { 501 + state: RepoState::Desynchronized, 502 + status: existing_status, 503 + error: Some("firehose queue overflow".to_string()), 504 + }, 505 + ); 506 + storage::resync_queue::enqueue_into( 507 + &mut batch, 508 + db, 509 + SystemTime::now(), 510 + &ResyncItem { 511 + did: did.clone(), 512 + retry_count: 0, 513 + retry_reason: "firehose queue overflow".to_string(), 514 + commit_cbor: vec![], 515 + }, 516 + ); 517 + batch 518 + .commit() 519 + .map_err(Into::<crate::storage::StorageError>::into)?; 520 + Ok(()) 399 521 } 400 522 401 523 // ---------------------------------------------------------------------------
+22 -1
src/sync/firehose/mod.rs
··· 48 48 /// Reconnect if no message is received for this long. 49 49 const IDLE_TIMEOUT: Duration = Duration::from_secs(180); 50 50 51 + /// Cap on outstanding firehose events (queued + in-flight) across all DIDs. 52 + /// 53 + /// When reached, the WebSocket read is suspended — tungstenite's read buffer 54 + /// fills, the TCP receive window closes, and the relay stops sending. Real 55 + /// backpressure, no dropped events. The cap resumes as soon as worker 56 + /// completions bring outstanding below the threshold. 57 + /// 58 + /// Sized to bound peak memory from pending `Box<Commit>` entries (each holding 59 + /// a full CAR slice) while leaving plenty of headroom for normal operation. 60 + /// 61 + /// Exported as `lightrail_firehose_queue_cap` so Prometheus can compute 62 + /// saturation as `lightrail_firehose_outstanding / lightrail_firehose_queue_cap`. 63 + const GLOBAL_QUEUE_CAP: usize = 2000; 64 + 51 65 /// Manages a single logical connection to a relay firehose, with reconnection. 52 66 pub struct Subscriber { 53 67 host: Host, ··· 82 96 /// Returns only when the cancellation token fires or an unrecoverable 83 97 /// storage error occurs. 84 98 pub async fn run(&mut self, token: tokio_util::sync::CancellationToken) -> Result<()> { 99 + metrics::gauge!("lightrail_firehose_queue_cap").set(GLOBAL_QUEUE_CAP as f64); 85 100 let client = TungsteniteClient::new(); 86 101 let base: jacquard_common::url::Url = format!("wss://{}", self.host).parse().map_err( 87 102 |e: jacquard_common::url::ParseError| crate::error::Error::Other(e.to_string()), ··· 173 188 Timeout, 174 189 } 175 190 191 + // Gate the stream branch on queue capacity (see GLOBAL_QUEUE_CAP). 192 + let can_accept = dispatcher.outstanding_count() < GLOBAL_QUEUE_CAP; 193 + if !can_accept { 194 + metrics::counter!("lightrail_firehose_backpressure_ticks_total").increment(1); 195 + } 196 + 176 197 let event = tokio::select! 
{ 177 198 _ = token.cancelled() => return Ok(()), 178 - r = messages.next() => Event::Stream(r), 199 + r = messages.next(), if can_accept => Event::Stream(r), 179 200 r = dispatcher.workers_mut().join_next_with_id(), 180 201 if dispatcher.has_workers() => Event::Worker(r), 181 202 _ = tokio::time::sleep(
+6 -4
src/sync/resync/describe_repo.rs
··· 179 179 } 180 180 181 181 fn classify_send_error(e: jacquard_common::error::ClientError, host: &str) -> GetCollectionsError { 182 - if matches!(e.kind(), ClientErrorKind::Http { status } if status.as_u16() == 429) { 183 - GetCollectionsError::RateLimited(host.to_string()) 184 - } else { 185 - GetCollectionsError::Request(e.to_string()) 182 + match e.kind() { 183 + ClientErrorKind::Http { status } if status.as_u16() == 429 => { 184 + GetCollectionsError::RateLimited(host.to_string()) 185 + } 186 + ClientErrorKind::Transport => GetCollectionsError::Unreachable(host.to_string()), 187 + _ => GetCollectionsError::Request(e.to_string()), 186 188 } 187 189 } 188 190
+94 -3
src/sync/resync/dispatcher.rs
··· 87 87 /// the dispatcher will not dispatch new work to that host. 88 88 const RATE_LIMIT_COOLDOWN: Duration = Duration::from_secs(20); 89 89 90 + /// Per-host cooldown after a network-layer failure reaching the PDS. 91 + /// 92 + /// Longer than the 429 cooldown because unreachable hosts rarely recover in 93 + /// seconds. Keeps other DIDs on the same dead host from burning worker slots. 94 + const UNREACHABLE_COOLDOWN: Duration = Duration::from_secs(60); 95 + 90 96 pub struct DispatcherConfig { 91 97 pub resolver: std::sync::Arc<crate::identity::Resolver>, 92 98 pub db: DbRef, ··· 358 364 "failed to re-enqueue after rate limit (panic)"), 359 365 } 360 366 } 367 + Ok(WorkerOutcome::Unreachable { pds, retry_count }) => { 368 + metrics::counter!("lightrail_resync_completed_total", 369 + "outcome" => "unreachable") 370 + .increment(1); 371 + drain_stale_buffer(&did, &db).await; 372 + cooling_hosts.insert(pds.clone(), Instant::now() + UNREACHABLE_COOLDOWN); 373 + let pds_host = pds.host_str().unwrap_or("unknown").to_string(); 374 + let new_retry = retry_count.saturating_add(1); 375 + warn!(did = %did, pds = %pds, retry = new_retry, 376 + cooldown_secs = UNREACHABLE_COOLDOWN.as_secs(), 377 + "PDS unreachable; cooling down host"); 378 + // Transition out of Resyncing so the firehose stops buffering 379 + // commits for this DID while its PDS is dead. 
380 + let error_reason = format!("unreachable: {pds_host}"); 381 + if let Err(e) = transition_state( 382 + db.clone(), 383 + did.clone(), 384 + RepoState::Error, 385 + Some(error_reason.clone()), 386 + ) 387 + .await 388 + { 389 + error!(error = %e, did = %did, 390 + "error transitioning state after unreachable"); 391 + } 392 + if let Some(delay) = unreachable_backoff(new_retry) { 393 + let item = ResyncItem { 394 + did: did.clone(), 395 + retry_count: new_retry, 396 + retry_reason: error_reason, 397 + commit_cbor: vec![], 398 + }; 399 + let db = db.clone(); 400 + let when = SystemTime::now() + delay; 401 + match tokio::task::spawn_blocking(move || { 402 + storage::resync_queue::enqueue(&db, when, &item) 403 + }) 404 + .await 405 + { 406 + Ok(Ok(())) => info!(did = %did, retry = new_retry, 407 + delay_secs = delay.as_secs(), "re-enqueued after unreachable"), 408 + Ok(Err(e)) => error!(error = %e, did = %did, 409 + "failed to re-enqueue after unreachable"), 410 + Err(e) => error!(error = %e, did = %did, 411 + "failed to re-enqueue after unreachable (panic)"), 412 + } 413 + } else { 414 + info!(did = %did, retries = retry_count, 415 + "giving up on unreachable PDS after max retries"); 416 + } 417 + } 361 418 Ok(outcome) => { 362 419 if let Err(e) = handle_completion( 363 420 did.clone(), ··· 452 509 pds: Arc<Url>, 453 510 retry_count: u16, 454 511 }, 512 + /// Network-layer failure reaching the PDS. The dispatcher cools the host 513 + /// down so other DIDs on the same (likely-dead) PDS don't burn worker slots. 514 + Unreachable { 515 + pds: Arc<Url>, 516 + retry_count: u16, 517 + }, 455 518 /// The PDS returned a definitive "repo not found" response. 456 519 /// The repo state has already been written; schedule a slow retry. 
457 520 NotFound { ··· 483 546 Err(ResyncError::RateLimited(pds)) => { 484 547 warn!(did = %did, pds = %pds, "PDS rate-limited; reporting to dispatcher"); 485 548 WorkerOutcome::RateLimited { 549 + pds, 550 + retry_count: item.retry_count, 551 + } 552 + } 553 + Err(ResyncError::Unreachable(pds)) => { 554 + info!(did = %did, pds = %pds, "PDS unreachable; reporting to dispatcher"); 555 + WorkerOutcome::Unreachable { 486 556 pds, 487 557 retry_count: item.retry_count, 488 558 } ··· 561 631 .increment(1); 562 632 drain_stale_buffer(&did, &db).await; 563 633 let new_retry = retry_count.saturating_add(1); 564 - if let Some(delay) = not_found_backoff(new_retry) { 634 + if let Some(delay) = repo_not_found_backoff(new_retry) { 565 635 let item = ResyncItem { 566 636 did: did.clone(), 567 637 retry_count: new_retry, ··· 580 650 "giving up on repo not found after max retries"); 581 651 } 582 652 } 583 - // RateLimited is handled inline in the dispatcher loop before calling here. 653 + // RateLimited and Unreachable are handled inline in the dispatcher loop 654 + // before calling here (both need access to `cooling_hosts`). 584 655 WorkerOutcome::RateLimited { .. } => { 585 656 unreachable!("RateLimited handled before handle_completion") 657 + } 658 + WorkerOutcome::Unreachable { .. } => { 659 + unreachable!("Unreachable handled before handle_completion") 586 660 } 587 661 } 588 662 Ok(()) ··· 759 833 /// Returns `None` when we've exhausted retries — the `RepoState::NotFound` 760 834 /// written by `index_repo` is the terminal state; future firehose activity 761 835 /// for the DID will re-trigger processing normally. 
762 - fn not_found_backoff(retry_count: u16) -> Option<Duration> { 836 + fn repo_not_found_backoff(retry_count: u16) -> Option<Duration> { 763 837 match retry_count { 764 838 1 => Some(Duration::from_secs(6 * 3600)), // 6h 765 839 2 => Some(Duration::from_secs(24 * 3600)), // 24h ··· 767 841 _ => None, 768 842 } 769 843 } 844 + 845 + /// Slow-retry schedule for repos whose PDS was unreachable. 846 + /// 847 + /// Longer than the general `backoff` ladder because connect failures burn a 848 + /// full describe_timeout+get_repo_timeout budget per attempt, so we want to 849 + /// back off aggressively. Returns `None` after exhausting retries; the DID's 850 + /// `RepoState::Error` record persists so future firehose activity can 851 + /// re-trigger processing. 852 + fn unreachable_backoff(retry_count: u16) -> Option<Duration> { 853 + match retry_count { 854 + 1 => Some(Duration::from_secs(15 * 60)), // 15m 855 + 2 => Some(Duration::from_secs(60 * 60)), // 1h 856 + 3 => Some(Duration::from_secs(6 * 3600)), // 6h 857 + 4..8 => Some(Duration::from_secs(24 * 3600)), // 24h, keep retrying daily for a while 858 + _ => None, 859 + } 860 + }
+5 -2
src/sync/resync/get_repo.rs
··· 7 7 use jacquard_api::com_atproto::sync::get_repo::GetRepo; 8 8 use jacquard_common::{ 9 9 http_client::{HttpClient, HttpClientExt}, 10 - stream::ByteStream, 10 + stream::{ByteStream, StreamErrorKind}, 11 11 types::string::{Did, Tid}, 12 12 xrpc::XrpcExt, 13 13 }; ··· 40 40 .xrpc(base.clone()) 41 41 .download(&req) 42 42 .await 43 - .map_err(|e| GetCollectionsError::Request(e.to_string()))?; 43 + .map_err(|e| match e.kind() { 44 + StreamErrorKind::Transport => GetCollectionsError::Unreachable(host.clone()), 45 + _ => GetCollectionsError::Request(e.to_string()), 46 + })?; 44 47 45 48 let status = response.status(); 46 49 let (_, body) = response.into_parts();
+10
src/sync/resync/mod.rs
··· 52 52 /// The PDS returned HTTP 429. Carries the PDS URL for host-level cooldown. 53 53 #[error("rate limited by {0}")] 54 54 RateLimited(Arc<jacquard_common::url::Url>), 55 + /// Network-layer failure reaching the PDS (connect refused, timeout, DNS). 56 + /// Carries the PDS URL for host-level cooldown. 57 + #[error("PDS unreachable: {0}")] 58 + Unreachable(Arc<jacquard_common::url::Url>), 55 59 #[error("invalid DID in queue: {0}")] 56 60 InvalidDid(String), 57 61 #[error("blocking storage task panicked: {0}")] ··· 89 93 /// The PDS rate-limited this request (HTTP 429). 90 94 #[error("rate limited by {0} (HTTP 429)")] 91 95 RateLimited(String), 96 + /// Network-layer failure (connect refused, timeout, DNS). Host name included. 97 + #[error("unreachable: {0}")] 98 + Unreachable(String), 92 99 /// Network or HTTP failure; may be transient. 93 100 #[error("request failed: {0}")] 94 101 Request(String), ··· 201 208 } 202 209 Err(GetCollectionsError::RateLimited(_)) => { 203 210 return Err(ResyncError::RateLimited(resolved.pds.clone())); 211 + } 212 + Err(GetCollectionsError::Unreachable(_)) => { 213 + return Err(ResyncError::Unreachable(resolved.pds.clone())); 204 214 } 205 215 Err(e) => return Err(ResyncError::Fetch(e)), 206 216 };