lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

rough round-robin resync (by host)

per-host rate limits are the limiting factor on completing a full-network backfill, so ensuring workers are talking to different hosts as much as possible is the easiest way to speed up backfill via increased concurrency

this is a very rough stab that i'd need a lot of time to be comfortable with

i think there are bigger architectural restructures that get us further

phil ed85038a 31bf417f

+928 -357
+1 -1
examples/enqueue_resync.rs
··· 42 42 retry_reason: args.reason.clone(), 43 43 commit_cbor: vec![], 44 44 }; 45 - storage::resync_queue::enqueue(&db, now, &item)?; 45 + storage::resync_queue::enqueue(&db, now, &item, None)?; 46 46 println!( 47 47 "{}{} queued", 48 48 did.as_str(),
-1
src/http.rs
··· 99 99 let limiter = self.shared.get_or_create_limiter(host); 100 100 while limiter.check().is_err() { 101 101 metrics::gauge!("lightrail_http_host_throttling").increment(1); 102 - // i think we should be limiter.until_ready_with_jitter().await! 103 102 tokio::time::sleep(self.shared.jittered_interval()).await; 104 103 metrics::gauge!("lightrail_http_host_throttling").decrement(1); 105 104 }
+7
src/main.rs
··· 57 57 #[arg(long, env = "LIGHTRAIL_MAX_RESYNC_WORKERS", default_value_t = 16)] 58 58 max_resync_workers: usize, 59 59 60 + /// Maximum resync workers per PDS host. Caps per-host concurrency so the 61 + /// dispatcher spreads work across hosts instead of piling onto the biggest 62 + /// PDS. Items for saturated hosts are re-enqueued with a short delay. 63 + #[arg(long, env = "LIGHTRAIL_MAX_RESYNC_PER_HOST", default_value_t = 20)] 64 + max_resync_per_host: usize, 65 + 60 66 /// How often to flush the firehose cursor watermark to storage, in seconds. 61 67 #[arg(long, env = "LIGHTRAIL_CURSOR_SAVE_INTERVAL", default_value_t = 1)] 62 68 cursor_save_interval_secs: u64, ··· 199 205 db, 200 206 client, 201 207 max_concurrent: args.max_resync_workers, 208 + max_per_host: args.max_resync_per_host, 202 209 describe_timeout: Duration::from_secs(args.describe_repo_fetch_timeout_secs), 203 210 get_repo_timeout: Duration::from_secs(args.get_repo_fetch_timeout_secs), 204 211 token,
+16 -3
src/storage/mod.rs
··· 36 36 pub(super) const PREFIX_SUBSCRIBE_REPOS: KeyPrefix = *b"sub"; 37 37 /// listRepos backfill walk progress (per relay host). See [`backfill_progress`]. 38 38 pub(super) const PREFIX_LIST_REPOS: KeyPrefix = *b"lsr"; 39 - /// Timestamp-ordered resync work queue. See [`resync_queue`]. 39 + /// Timestamp-ordered resync work queue (legacy format). See [`resync_queue`]. 40 40 pub(super) const PREFIX_RESYNC_QUEUE: KeyPrefix = *b"rsq"; 41 + /// Host-partitioned resync work queue. See [`resync_queue`]. 42 + pub(super) const PREFIX_RESYNC_HOST_QUEUE: KeyPrefix = *b"rsH"; 41 43 /// Per-repo buffered firehose events during resync. See [`resync_buffer`]. 42 44 pub(super) const PREFIX_RESYNC_BUFFER: KeyPrefix = *b"rsb"; 43 45 /// Per-PDS host state (sync1.1 mode, trust, listRepos cursor/done). See [`pds_host`]. ··· 66 68 pub(crate) index_ks: fjall::Keyspace, 67 69 /// Persistent system stats and cardinality sketches, loaded on open. 68 70 pub(crate) stats: StatsRef, 71 + /// In-memory index of hosts with queued resync items. 72 + pub(crate) host_index: std::sync::Mutex<resync_queue::HostIndex>, 69 73 } 70 74 71 75 /// Cheaply-cloneable reference to the shared database. ··· 110 114 )) 111 115 })?; 112 116 let stats = meta::load(&ks)?; 113 - Ok(Arc::new(Db { 117 + let host_index = std::sync::Mutex::new(resync_queue::HostIndex::new()); 118 + let db = Arc::new(Db { 114 119 database, 115 120 ks, 116 121 index_ks, 117 122 stats, 118 - })) 123 + host_index, 124 + }); 125 + 126 + // Migrate legacy "rsq" entries to host-partitioned "rsH" format, then 127 + // rebuild the in-memory host index from the on-disk queue state. 128 + resync_queue::migrate_legacy(&db)?; 129 + resync_queue::rebuild_host_index(&db)?; 130 + 131 + Ok(db) 119 132 }
+636 -223
src/storage/resync_queue.rs
··· 1 - //! Timestamp-ordered resync queue. 1 + //! Two-level host-partitioned resync queue. 2 2 //! 3 - //! Keys: `"rsq"<ts_be:u64>\0<did>` 3 + //! Keys: `"rsH" <host_len:u8> <host_bytes> \0 <ts_be:u64> \0 <did>` 4 4 //! Values: `[u16 BE retry_count][u16 BE reason_len][reason_bytes][commit_cbor_bytes]` 5 + //! 6 + //! Items are partitioned by PDS hostname so that the dispatcher can round-robin 7 + //! across hosts for fair scheduling. An in-memory [`HostIndex`] tracks the set 8 + //! of non-empty hosts and per-host counts so that `next_host` is O(hosts) rather 9 + //! than O(queue depth). 5 10 6 - use std::collections::HashSet; 11 + use std::collections::{BTreeSet, HashMap, HashSet}; 7 12 use std::sync::atomic::Ordering; 8 13 9 - use fjall::util::prefixed_range; 10 14 use jacquard_common::types::string::Did; 11 15 use tracing::{debug, trace, warn}; 12 16 13 17 use crate::storage::{ 14 - DbRef, PREFIX_RESYNC_QUEUE, 18 + DbRef, PREFIX_RESYNC_HOST_QUEUE, PREFIX_RESYNC_QUEUE, 15 19 error::{StorageError, StorageResult}, 16 20 repo, 17 21 }; ··· 19 23 const NUL: u8 = b'\0'; 20 24 21 25 // --------------------------------------------------------------------------- 22 - // Key encoding 26 + // In-memory host index 23 27 // --------------------------------------------------------------------------- 24 28 25 - /// `"rsq"<ts_be:u64>\0<did>` — timestamp-ordered resync queue. 26 - /// 27 - /// Big-endian timestamp gives natural chronological ordering. 28 - fn key(ts: u64, did: &Did<'_>) -> Vec<u8> { 29 + /// Tracks the set of hosts that have queued items for round-robin scheduling. 30 + pub struct HostIndex { 31 + hosts: BTreeSet<String>, 32 + host_counts: HashMap<String, u64>, 33 + rr_cursor: usize, 34 + } 35 + 36 + impl HostIndex { 37 + pub fn new() -> Self { 38 + Self { 39 + hosts: BTreeSet::new(), 40 + host_counts: HashMap::new(), 41 + rr_cursor: 0, 42 + } 43 + } 44 + 45 + fn increment(&mut self, host: &str) { 46 + let count = self.host_counts.entry(host.to_owned()).or_insert(0); 47 + *count += 1; 48 + self.hosts.insert(host.to_owned()); 49 + } 50 + 51 + fn decrement(&mut self, host: &str) { 52 + if let Some(count) = self.host_counts.get_mut(host) { 53 + *count = count.saturating_sub(1); 54 + if *count == 0 { 55 + self.host_counts.remove(host); 56 + self.hosts.remove(host); 57 + // If cursor was past the removed host, adjust 58 + if self.rr_cursor > self.hosts.len() { 59 + self.rr_cursor = 0; 60 + } 61 + } 62 + } 63 + } 64 + } 65 + 66 + // --------------------------------------------------------------------------- 67 + // Key encoding (new host-partitioned format) 68 + // --------------------------------------------------------------------------- 69 + 70 + /// `"rsH" <host_len:u8> <host_bytes> \0 <ts_be:u64> \0 <did>` 71 + fn host_key(host: &str, ts: u64, did: &Did<'_>) -> Vec<u8> { 29 72 let d = did.as_str(); 30 - let mut k = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 8 + 1 + d.len()); 31 - k.extend_from_slice(&PREFIX_RESYNC_QUEUE); 73 + let hb = host.as_bytes(); 74 + let mut k = 75 + Vec::with_capacity(PREFIX_RESYNC_HOST_QUEUE.len() + 1 + hb.len() + 1 + 8 + 1 + d.len()); 76 + k.extend_from_slice(&PREFIX_RESYNC_HOST_QUEUE); 77 + k.push(hb.len() as u8); 78 + k.extend_from_slice(hb); 79 + k.push(NUL); 32 80 k.extend_from_slice(&ts.to_be_bytes()); 33 81 k.push(NUL); 34 82 k.extend_from_slice(d.as_bytes()); 35 83 k 36 84 } 37 85 38 - /// `"rsq"` — prefix for scanning the entire queue. 86 + /// Prefix for scanning all items for one host: `"rsH" <host_len:u8> <host_bytes> \0` 87 + fn host_prefix(host: &str) -> Vec<u8> { 88 + let hb = host.as_bytes(); 89 + let mut k = Vec::with_capacity(PREFIX_RESYNC_HOST_QUEUE.len() + 1 + hb.len() + 1); 90 + k.extend_from_slice(&PREFIX_RESYNC_HOST_QUEUE); 91 + k.push(hb.len() as u8); 92 + k.extend_from_slice(hb); 93 + k.push(NUL); 94 + k 95 + } 96 + 97 + /// Prefix for scanning the entire host-partitioned queue. 39 98 fn key_prefix_all() -> Vec<u8> { 40 - PREFIX_RESYNC_QUEUE.to_vec() 99 + PREFIX_RESYNC_HOST_QUEUE.to_vec() 41 100 } 42 101 43 - /// `<ts_be:u64>\0` — timestamp middle component, for use as an upper bound 44 - /// after concatenating with [`key_prefix_all`]. 45 - fn key_ts_midfix(ts: u64) -> Vec<u8> { 46 - let mut k = Vec::with_capacity(9); 102 + /// Upper bound for timestamp within a host prefix (exclusive): 103 + /// `"rsH" <host_len:u8> <host_bytes> \0 <ts_be:u64> \0` 104 + /// 105 + /// Used as the exclusive end of a range scan so that we only see items with 106 + /// timestamps strictly less than `ts`. 107 + fn host_ts_upper(host: &str, ts: u64) -> Vec<u8> { 108 + let hb = host.as_bytes(); 109 + let mut k = Vec::with_capacity(PREFIX_RESYNC_HOST_QUEUE.len() + 1 + hb.len() + 1 + 8 + 1); 110 + k.extend_from_slice(&PREFIX_RESYNC_HOST_QUEUE); 111 + k.push(hb.len() as u8); 112 + k.extend_from_slice(hb); 113 + k.push(NUL); 47 114 k.extend_from_slice(&ts.to_be_bytes()); 48 115 k.push(NUL); 49 116 k 50 117 } 51 118 52 - /// Parse a timestamp and DID from a full resync queue key. 53 - fn key_parse(raw: &[u8]) -> StorageResult<(u64, Did<'static>)> { 119 + /// Parse host, timestamp, and DID from a full host-partitioned key. 120 + fn host_key_parse(raw: &[u8]) -> StorageResult<(String, u64, Did<'static>)> { 121 + let key_str = String::from_utf8_lossy(raw); 122 + let rest = raw 123 + .strip_prefix(&PREFIX_RESYNC_HOST_QUEUE) 124 + .ok_or(StorageError::Corrupt { 125 + key: key_str.to_string(), 126 + reason: "wrong prefix for host resync queue", 127 + })?; 128 + if rest.is_empty() { 129 + return Err(StorageError::Corrupt { 130 + key: key_str.to_string(), 131 + reason: "missing host_len byte", 132 + }); 133 + } 134 + let host_len = rest[0] as usize; 135 + let rest = &rest[1..]; 136 + if rest.len() < host_len { 137 + return Err(StorageError::Corrupt { 138 + key: key_str.to_string(), 139 + reason: "host bytes truncated", 140 + }); 141 + } 142 + let host = std::str::from_utf8(&rest[..host_len]) 143 + .map_err(|_| StorageError::Corrupt { 144 + key: key_str.to_string(), 145 + reason: "host not valid UTF-8", 146 + })? 147 + .to_owned(); 148 + let rest = rest[host_len..] 149 + .strip_prefix(&[NUL]) 150 + .ok_or(StorageError::Corrupt { 151 + key: key_str.to_string(), 152 + reason: "missing NUL after host", 153 + })?; 154 + if rest.len() < 9 { 155 + return Err(StorageError::Corrupt { 156 + key: key_str.to_string(), 157 + reason: "not enough bytes for timestamp + NUL", 158 + }); 159 + } 160 + let ts_bytes: [u8; 8] = rest[..8].try_into().map_err(|_| StorageError::Corrupt { 161 + key: key_str.to_string(), 162 + reason: "timestamp conversion failed", 163 + })?; 164 + let ts = u64::from_be_bytes(ts_bytes); 165 + let rest = rest[8..] 166 + .strip_prefix(&[NUL]) 167 + .ok_or(StorageError::Corrupt { 168 + key: key_str.to_string(), 169 + reason: "missing NUL after timestamp", 170 + })?; 171 + let did_str = std::str::from_utf8(rest).map_err(|_| StorageError::Corrupt { 172 + key: key_str.to_string(), 173 + reason: "invalid UTF-8 for DID", 174 + })?; 175 + let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt { 176 + key: key_str.to_string(), 177 + reason: "invalid DID", 178 + })?; 179 + Ok((host, ts, did)) 180 + } 181 + 182 + // --------------------------------------------------------------------------- 183 + // Legacy key helpers (for migration) 184 + // --------------------------------------------------------------------------- 185 + 186 + /// Parse timestamp and DID from old `"rsq"` format key. 187 + fn legacy_key_parse(raw: &[u8]) -> StorageResult<(u64, Did<'static>)> { 54 188 let key_str = String::from_utf8_lossy(raw); 55 189 let rest = raw 56 190 .strip_prefix(&PREFIX_RESYNC_QUEUE) 57 191 .ok_or(StorageError::Corrupt { 58 192 key: key_str.to_string(), 59 - reason: "wrong prefix for resync queue", 193 + reason: "wrong prefix for legacy resync queue", 60 194 })?; 61 195 if rest.len() < 9 { 62 196 return Err(StorageError::Corrupt { 63 197 key: key_str.to_string(), 64 - reason: "not enough suffix bytes for resync queue", 198 + reason: "not enough suffix bytes for legacy resync queue", 65 199 }); 66 200 } 67 201 let ts_bytes: [u8; 8] = rest[..8].try_into().map_err(|_| StorageError::Corrupt { 68 202 key: key_str.to_string(), 69 - reason: "not enough bytes for timestamp in resync queue", 203 + reason: "not enough bytes for timestamp", 70 204 })?; 71 205 let ts = u64::from_be_bytes(ts_bytes); 72 206 let rest = rest[8..] 73 207 .strip_prefix(&[NUL]) 74 208 .ok_or(StorageError::Corrupt { 75 209 key: key_str.to_string(), 76 - reason: "missing NUL separator in resync queue key", 210 + reason: "missing NUL separator in legacy key", 77 211 })?; 78 212 let did_str = std::str::from_utf8(rest).map_err(|_| StorageError::Corrupt { 79 213 key: key_str.to_string(), 80 - reason: "invalid UTF-8 for DID in resync queue", 214 + reason: "invalid UTF-8 for DID in legacy key", 81 215 })?; 82 216 let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt { 83 217 key: key_str.to_string(), 84 - reason: "invalid DID in resync queue", 218 + reason: "invalid DID in legacy key", 85 219 })?; 86 220 Ok((ts, did)) 87 221 } ··· 142 276 } 143 277 144 278 // --------------------------------------------------------------------------- 279 + // Public item type for claimed items 280 + // --------------------------------------------------------------------------- 281 + 282 + /// A claimed item with its host. 283 + pub struct ClaimedItem { 284 + pub item: ResyncItem, 285 + pub host: String, 286 + } 287 + 288 + // --------------------------------------------------------------------------- 145 289 // CRUD 146 290 // --------------------------------------------------------------------------- 147 291 148 - /// queue a repo into a batch 149 - pub fn enqueue_into(batch: &mut fjall::OwnedWriteBatch, db: &DbRef, ts: u64, item: &ResyncItem) { 292 + /// Queue a repo for resync into a batch. 293 + /// 294 + /// `host` is the PDS hostname; `None` maps to the empty-string (unknown) bucket. 295 + pub fn enqueue_into( 296 + batch: &mut fjall::OwnedWriteBatch, 297 + db: &DbRef, 298 + ts: u64, 299 + item: &ResyncItem, 300 + host: Option<&str>, 301 + ) { 302 + let h = host.unwrap_or(""); 150 303 if item.retry_reason == "backfill" { 151 304 trace!( 152 305 did = item.did.as_str(), 153 306 ts, 307 + host = h, 154 308 reason = %item.retry_reason, 155 309 retry = item.retry_count, 156 310 "enqueue resync to batch" ··· 159 313 debug!( 160 314 did = item.did.as_str(), 161 315 ts, 316 + host = h, 162 317 reason = %item.retry_reason, 163 318 retry = item.retry_count, 164 319 "enqueue resync to batch" 165 320 ); 166 321 } 167 - batch.insert(&db.ks, key(ts, &item.did), encode(item)); 322 + batch.insert(&db.ks, host_key(h, ts, &item.did), encode(item)); 168 323 db.stats.resync_queue_depth.fetch_add(1, Ordering::Relaxed); 169 - } 170 324 171 - /// Count the total number of entries currently in the resync queue. 172 - /// 173 - /// Performs a full prefix scan; use only for admin/diagnostic views. 174 - pub fn count_queued(db: &DbRef) -> usize { 175 - db.ks.prefix(key_prefix_all()).count() 325 + let mut idx = db.host_index.lock().expect("host_index poisoned"); 326 + idx.increment(h); 176 327 } 177 328 178 329 /// Enqueue a repo for resync at the given Unix timestamp (seconds). 179 - pub fn enqueue(db: &DbRef, ts: u64, item: &ResyncItem) -> StorageResult<()> { 330 + /// 331 + /// `host` is the PDS hostname; `None` maps to the empty-string (unknown) bucket. 332 + pub fn enqueue(db: &DbRef, ts: u64, item: &ResyncItem, host: Option<&str>) -> StorageResult<()> { 180 333 let mut batch = db.database.batch(); 181 - enqueue_into(&mut batch, db, ts, item); 334 + enqueue_into(&mut batch, db, ts, item, host); 182 335 batch.commit()?; 183 336 Ok(()) 184 337 } 185 338 186 - /// Dequeue and return the next item whose timestamp is ≤ `now`. 339 + /// Count the total number of entries currently in the resync queue. 187 340 /// 188 - /// Removes the entry from the queue atomically before returning it. 189 - /// 190 - /// TODO: no, this is not atomic currently 191 - /// 192 - /// note: deleted accounts aren't removed from the resync queue so we need to 193 - /// check that (or does the caller deal with it?) 194 - /// 195 - /// `since`: we actually want to pass in a cursor so we can efficiently skip 196 - /// over tombstones. we don't have to persist the cursor to disk, but the caller 197 - /// can hold it in memory over the app's lifetime so we only pay the tomb scan 198 - /// cost once on startup. 199 - pub fn dequeue_ready( 200 - db: &DbRef, 201 - now: u64, 202 - since: Option<Vec<u8>>, 203 - ) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 204 - let prefix = key_prefix_all(); 341 + /// Performs a full prefix scan; use only for admin/diagnostic views. 342 + pub fn count_queued(db: &DbRef) -> usize { 343 + db.ks.prefix(key_prefix_all()).count() 344 + } 205 345 206 - let lower_suffix = since.unwrap_or(vec![]); 207 - let upper_suffix = key_ts_midfix(now); 346 + /// Advance the round-robin cursor and return the next host that has queued items 347 + /// and is not in `skip_hosts`. Returns `None` if all hosts are skipped or the 348 + /// queue is empty. 349 + pub fn next_host(db: &DbRef, skip_hosts: &HashSet<String>) -> Option<String> { 350 + let idx = db.host_index.lock().expect("host_index poisoned"); 351 + let n = idx.hosts.len(); 352 + if n == 0 { 353 + return None; 354 + } 208 355 209 - let Some(guard) = db 210 - .ks 211 - .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) 212 - .next() 213 - else { 214 - return Ok(None); 215 - }; 356 + // Start from current cursor position and wrap around once. 357 + let start = idx.rr_cursor % n; 358 + let hosts_vec: Vec<&String> = idx.hosts.iter().collect(); 216 359 217 - let (key_slice, val_slice) = guard.into_inner()?; 218 - let key_bytes = key_slice.as_ref(); 219 - let (ts, did) = key_parse(key_bytes)?; 220 - assert!(ts < now); 221 - let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 222 - let item = decode(val_slice.as_ref(), &key_str, did)?; 223 - debug!( 224 - did = item.did.as_str(), 225 - ts, 226 - reason = %item.retry_reason, 227 - retry = item.retry_count, 228 - "dequeue resync" 229 - ); 230 - db.ks.remove(key_bytes)?; 231 - let next_since = key_bytes 232 - .get(prefix.len()..) 233 - .expect("a resync queue key must start with the resync queue prefix"); 234 - Ok(Some((item, next_since.to_vec()))) 360 + for i in 0..n { 361 + let pos = (start + i) % n; 362 + let host = hosts_vec[pos]; 363 + if !skip_hosts.contains(host) { 364 + // We need to drop the immutable borrow to take a mutable one. 365 + // Instead, capture the result and the new cursor position. 366 + let result = host.clone(); 367 + let new_cursor = (pos + 1) % n; 368 + drop(idx); 369 + let mut idx = db.host_index.lock().expect("host_index poisoned"); 370 + idx.rr_cursor = new_cursor; 371 + return Some(result); 372 + } 373 + } 374 + None 235 375 } 236 376 237 - /// Claim the next ready resync job, skipping DIDs that are currently in flight. 377 + /// Claim the oldest ready item from a specific host's sub-queue, skipping busy 378 + /// DIDs. Atomically removes from queue and transitions repo state to Resyncing. 238 379 /// 239 - /// Scans the queue for the oldest entry whose timestamp is `< now` and whose 240 - /// DID is not in `busy`. On finding one it atomically (fjall batch): 241 - /// - removes the entry from the resync queue, and 242 - /// - writes `state = Resyncing` for the repo (preserving its account status). 243 - /// 244 - /// Returns the claimed item and an updated `since` cursor (same semantics as 245 - /// [`dequeue_ready`]). Returns `None` if no claimable entry exists. 246 - pub fn claim_resync( 380 + /// Returns `None` if no claimable item exists for this host. 381 + pub fn claim_from_host( 247 382 db: &DbRef, 383 + host: &str, 248 384 now: u64, 249 - since: Option<Vec<u8>>, 250 385 busy: &HashSet<Did<'_>>, 251 - ) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 252 - let prefix = key_prefix_all(); 253 - let lower_suffix = since.unwrap_or_default(); 254 - let upper_suffix = key_ts_midfix(now); 386 + ) -> StorageResult<Option<ClaimedItem>> { 387 + let prefix = host_prefix(host); 388 + let upper = host_ts_upper(host, now); 255 389 256 - for guard in db 257 - .ks 258 - .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) 259 - { 390 + for guard in db.ks.range(prefix..upper) { 260 391 let (key_slice, val_slice) = guard.into_inner()?; 261 392 let key_bytes = key_slice.as_ref(); 262 - let (_, did) = key_parse(key_bytes)?; 393 + let (_, _, did) = host_key_parse(key_bytes)?; 263 394 264 395 if busy.contains(&did) { 265 - debug!(did = did.as_str(), "skip busy did in resync queue"); 396 + debug!(did = did.as_str(), host, "skip busy did in host queue"); 266 397 continue; 267 398 } 268 399 269 400 let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 270 401 let item = decode(val_slice.as_ref(), &key_str, did.clone())?; 271 - let next_since = key_bytes[prefix.len()..].to_vec(); 272 402 273 403 // Read current repo info to preserve account status across the transition. 274 404 let repo_key = repo::key(&did); ··· 300 430 batch.commit()?; 301 431 302 432 db.stats.resync_queue_depth.fetch_sub(1, Ordering::Relaxed); 433 + 434 + { 435 + let mut idx = db.host_index.lock().expect("host_index poisoned"); 436 + idx.decrement(host); 437 + } 438 + 303 439 trace!( 304 440 did = item.did.as_str(), 441 + host, 305 442 reason = %item.retry_reason, 306 443 retry = item.retry_count, 307 - "claimed resync job" 444 + "claimed resync job from host queue" 308 445 ); 309 - return Ok(Some((item, next_since))); 446 + return Ok(Some(ClaimedItem { 447 + item, 448 + host: host.to_owned(), 449 + })); 310 450 } 311 451 312 452 Ok(None) 313 453 } 314 454 315 455 // --------------------------------------------------------------------------- 456 + // Migration + rebuild 457 + // --------------------------------------------------------------------------- 458 + 459 + /// Migrate old-format `"rsq"` entries to new `"rsH"` format with empty host. 460 + /// 461 + /// Called once on startup. Returns the number of migrated entries. 462 + pub fn migrate_legacy(db: &DbRef) -> StorageResult<u64> { 463 + let prefix = PREFIX_RESYNC_QUEUE.to_vec(); 464 + let mut count = 0u64; 465 + 466 + for guard in db.ks.prefix(&prefix) { 467 + let (key_slice, val_slice) = guard.into_inner()?; 468 + let old_key = key_slice.as_ref(); 469 + let (ts, did) = legacy_key_parse(old_key)?; 470 + 471 + // Write new-format key with empty host, remove old key. 472 + let new_key = host_key("", ts, &did); 473 + let mut batch = db.database.batch(); 474 + batch.insert(&db.ks, &new_key, val_slice.as_ref()); 475 + batch.remove(&db.ks, old_key); 476 + batch.commit()?; 477 + 478 + count += 1; 479 + } 480 + 481 + if count > 0 { 482 + debug!(count, "migrated legacy rsq entries to rsH format"); 483 + } 484 + Ok(count) 485 + } 486 + 487 + /// Rebuild the in-memory [`HostIndex`] from a full scan of the queue. 488 + /// 489 + /// Called on startup after migration. 490 + pub fn rebuild_host_index(db: &DbRef) -> StorageResult<()> { 491 + let prefix = key_prefix_all(); 492 + let mut hosts = BTreeSet::new(); 493 + let mut host_counts: HashMap<String, u64> = HashMap::new(); 494 + 495 + for guard in db.ks.prefix(&prefix) { 496 + let (key_slice, _) = guard.into_inner()?; 497 + let (host, _, _) = host_key_parse(key_slice.as_ref())?; 498 + *host_counts.entry(host.clone()).or_insert(0) += 1; 499 + hosts.insert(host); 500 + } 501 + 502 + let mut idx = db.host_index.lock().expect("host_index poisoned"); 503 + idx.hosts = hosts; 504 + idx.host_counts = host_counts; 505 + idx.rr_cursor = 0; 506 + 507 + Ok(()) 508 + } 509 + 510 + // --------------------------------------------------------------------------- 316 511 // Tests 317 512 // --------------------------------------------------------------------------- 318 513 ··· 336 531 } 337 532 } 338 533 339 - // --- key encoding --- 534 + fn pending_repo(db: &DbRef, did_str: &str) { 535 + repo::put_info( 536 + db, 537 + &did(did_str), 538 + &repo::RepoInfo { 539 + state: repo::RepoState::Pending, 540 + status: repo::AccountStatus::Active, 541 + error: None, 542 + }, 543 + ) 544 + .unwrap(); 545 + } 546 + 547 + // --- host_key structure and sorting --- 340 548 341 549 #[test] 342 - fn key_structure() { 343 - let k = key(0x0102030405060708, &did("did:web:example.com")); 344 - let mut expected = b"rsq".to_vec(); 550 + fn host_key_structure() { 551 + let k = host_key("pds1.bsky.network", 0x0102030405060708, &did("did:web:example.com")); 552 + let mut expected = b"rsH".to_vec(); 553 + let host_bytes = b"pds1.bsky.network"; 554 + expected.push(host_bytes.len() as u8); 555 + expected.extend_from_slice(host_bytes); 556 + expected.push(b'\0'); 345 557 expected.extend_from_slice(&0x0102030405060708u64.to_be_bytes()); 346 558 expected.push(b'\0'); 347 559 expected.extend_from_slice(b"did:web:example.com"); ··· 349 561 } 350 562 351 563 #[test] 352 - fn key_sorts_by_timestamp() { 353 - let earlier = key(100, &did("did:web:example.com")); 354 - let later = key(200, &did("did:web:example.com")); 564 + fn same_host_sorts_by_timestamp() { 565 + let earlier = host_key("pds.example.com", 100, &did("did:web:a.com")); 566 + let later = host_key("pds.example.com", 200, &did("did:web:a.com")); 355 567 assert!(earlier < later); 356 568 } 357 569 358 570 #[test] 359 - fn key_same_timestamp_sorts_by_did() { 360 - let a = key(100, &did("did:web:a.com")); 361 - let b = key(100, &did("did:web:b.com")); 571 + fn same_host_same_ts_sorts_by_did() { 572 + let a = host_key("pds.example.com", 100, &did("did:web:a.com")); 573 + let b = host_key("pds.example.com", 100, &did("did:web:b.com")); 362 574 assert!(a < b); 363 575 } 576 + 577 + // --- host_key_parse roundtrips --- 364 578 365 579 #[test] 366 - fn key_parse_roundtrips() { 580 + fn host_key_parse_roundtrips() { 367 581 let ts = 0xdeadbeefcafe1234u64; 368 582 let d = did("did:web:example.com"); 369 - let k = key(ts, &d); 370 - let (parsed_ts, parsed_did) = key_parse(&k).unwrap(); 583 + let host = "pds1.bsky.network"; 584 + let k = host_key(host, ts, &d); 585 + let (parsed_host, parsed_ts, parsed_did) = host_key_parse(&k).unwrap(); 586 + assert_eq!(parsed_host, host); 371 587 assert_eq!(parsed_ts, ts); 372 588 assert_eq!(parsed_did, d); 373 589 } 374 590 375 591 #[test] 376 - fn key_prefix_all_is_prefix_of_any_entry() { 377 - let prefix_all = key_prefix_all(); 378 - let k = key(100, &did("did:web:example.com")); 379 - assert!(k.starts_with(&prefix_all)); 380 - } 381 - 382 - #[test] 383 - fn key_ts_midfix_upper_bound_excludes_entries_at_that_ts() { 384 - let prefix_all = key_prefix_all(); 385 - let entry = key(100, &did("did:web:example.com")); 386 - let mut upper = prefix_all.clone(); 387 - upper.extend_from_slice(&key_ts_midfix(100)); 388 - assert!(entry >= upper); 592 + fn host_key_parse_empty_host() { 593 + let ts = 42u64; 594 + let d = did("did:web:unknown.com"); 595 + let k = host_key("", ts, &d); 596 + let (parsed_host, parsed_ts, parsed_did) = host_key_parse(&k).unwrap(); 597 + assert_eq!(parsed_host, ""); 598 + assert_eq!(parsed_ts, ts); 599 + assert_eq!(parsed_did, d); 389 600 } 390 601 391 602 #[test] 392 - fn key_ts_midfix_upper_bound_includes_earlier_ts() { 393 - let prefix_all = key_prefix_all(); 394 - let entry = key(99, &did("did:web:example.com")); 395 - let mut upper = prefix_all.clone(); 396 - upper.extend_from_slice(&key_ts_midfix(100)); 397 - assert!(entry < upper); 398 - } 399 - 400 - #[test] 401 - fn key_parse_returns_error_for_truncated_key() { 402 - assert!(key_parse(b"rsq").is_err()); 603 + fn host_key_parse_rejects_truncated() { 604 + assert!(host_key_parse(b"rsH").is_err()); 403 605 } 404 606 405 607 // --- encode / decode --- ··· 429 631 assert!(decode(&[0, 1, 2], "k", did("did:web:example.com")).is_err()); 430 632 } 431 633 432 - // --- enqueue / dequeue_ready --- 634 + // --- enqueue and claim_from_host basic flow --- 433 635 434 636 #[test] 435 - fn dequeue_returns_none_on_empty_queue() { 637 + fn enqueue_and_claim_basic() { 436 638 let db = open_temporary().unwrap(); 437 - assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 639 + pending_repo(&db, "did:web:a.com"); 640 + enqueue( 641 + &db, 642 + 100, 643 + &item("did:web:a.com", 0, "backfill", &[1, 2, 3]), 644 + Some("pds.example.com"), 645 + ) 646 + .unwrap(); 647 + 648 + let claimed = claim_from_host(&db, "pds.example.com", 101, &HashSet::new()) 649 + .unwrap() 650 + .unwrap(); 651 + assert_eq!(claimed.item.did.as_str(), "did:web:a.com"); 652 + assert_eq!(claimed.item.retry_reason, "backfill"); 653 + assert_eq!(claimed.item.commit_cbor, vec![1, 2, 3]); 654 + assert_eq!(claimed.host, "pds.example.com"); 655 + 656 + // Queue should now be empty for this host. 657 + assert!( 658 + claim_from_host(&db, "pds.example.com", 9999, &HashSet::new()) 659 + .unwrap() 660 + .is_none() 661 + ); 438 662 } 439 663 440 664 #[test] 441 - fn enqueue_and_dequeue_basic() { 665 + fn enqueue_with_none_host_uses_empty_string() { 666 + let db = open_temporary().unwrap(); 667 + pending_repo(&db, "did:web:a.com"); 668 + enqueue(&db, 100, &item("did:web:a.com", 0, "test", &[]), None).unwrap(); 669 + 670 + let claimed = claim_from_host(&db, "", 101, &HashSet::new()) 671 + .unwrap() 672 + .unwrap(); 673 + assert_eq!(claimed.item.did.as_str(), "did:web:a.com"); 674 + assert_eq!(claimed.host, ""); 675 + } 676 + 677 + // --- next_host round-robins across hosts --- 678 + 679 + #[test] 680 + fn next_host_round_robins() { 442 681 let db = open_temporary().unwrap(); 443 - enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[1, 2, 3])).unwrap(); 682 + enqueue( 683 + &db, 684 + 100, 685 + &item("did:web:a.com", 0, "r1", &[]), 686 + Some("alpha.example.com"), 687 + ) 688 + .unwrap(); 689 + enqueue( 690 + &db, 691 + 100, 692 + &item("did:web:b.com", 0, "r2", &[]), 693 + Some("beta.example.com"), 694 + ) 695 + .unwrap(); 696 + enqueue( 697 + &db, 698 + 100, 699 + &item("did:web:c.com", 0, "r3", &[]), 700 + Some("gamma.example.com"), 701 + ) 702 + .unwrap(); 444 703 445 - let (got, _cursor) = dequeue_ready(&db, 101, None).unwrap().unwrap(); 446 - assert_eq!(got.did.as_str(), "did:web:a.com"); 447 - assert_eq!(got.retry_reason, "backfill"); 448 - assert_eq!(got.commit_cbor, vec![1, 2, 3]); 704 + let empty_skip: HashSet<String> = HashSet::new(); 705 + let h1 = next_host(&db, &empty_skip).unwrap(); 706 + let h2 = next_host(&db, &empty_skip).unwrap(); 707 + let h3 = next_host(&db, &empty_skip).unwrap(); 708 + let h4 = next_host(&db, &empty_skip).unwrap(); 449 709 450 - assert!(dequeue_ready(&db, 101, None).unwrap().is_none()); 710 + // BTreeSet order is alphabetical: alpha, beta, gamma 711 + assert_eq!(h1, "alpha.example.com"); 712 + assert_eq!(h2, "beta.example.com"); 713 + assert_eq!(h3, "gamma.example.com"); 714 + // Wraps around 715 + assert_eq!(h4, "alpha.example.com"); 451 716 } 452 717 718 + // --- next_host skips hosts in skip set --- 719 + 453 720 #[test] 454 - fn dequeue_excludes_items_at_or_after_now() { 721 + fn next_host_skips_hosts_in_skip_set() { 455 722 let db = open_temporary().unwrap(); 456 - enqueue(&db, 100, &item("did:web:a.com", 0, "test", &[])).unwrap(); 723 + enqueue( 724 + &db, 725 + 100, 726 + &item("did:web:a.com", 0, "r1", &[]), 727 + Some("alpha.example.com"), 728 + ) 729 + .unwrap(); 730 + enqueue( 731 + &db, 732 + 100, 733 + &item("did:web:b.com", 0, "r2", &[]), 734 + Some("beta.example.com"), 735 + ) 736 + .unwrap(); 737 + enqueue( 738 + &db, 739 + 100, 740 + &item("did:web:c.com", 0, "r3", &[]), 741 + Some("gamma.example.com"), 742 + ) 743 + .unwrap(); 457 744 458 - assert!(dequeue_ready(&db, 100, None).unwrap().is_none()); 459 - assert!(dequeue_ready(&db, 99, None).unwrap().is_none()); 460 - assert!(dequeue_ready(&db, 101, None).unwrap().is_some()); 745 + let mut skip = HashSet::new(); 746 + skip.insert("alpha.example.com".to_owned()); 747 + skip.insert("gamma.example.com".to_owned()); 748 + 749 + let h = next_host(&db, &skip).unwrap(); 750 + assert_eq!(h, "beta.example.com"); 461 751 } 462 752 463 753 #[test] 464 - fn dequeue_returns_oldest_entry_first() { 754 + fn next_host_returns_none_when_all_skipped() { 465 755 let db = open_temporary().unwrap(); 466 - enqueue(&db, 200, &item("did:web:b.com", 0, "later", &[])).unwrap(); 467 - enqueue(&db, 100, &item("did:web:a.com", 0, "earlier", &[])).unwrap(); 756 + enqueue( 757 + &db, 758 + 100, 759 + &item("did:web:a.com", 0, "r1", &[]), 760 + Some("alpha.example.com"), 761 + ) 762 + .unwrap(); 468 763 469 - let (first, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 470 - assert_eq!(first.retry_reason, "earlier"); 764 + let mut skip = HashSet::new(); 765 + skip.insert("alpha.example.com".to_owned()); 766 + assert!(next_host(&db, &skip).is_none()); 767 + } 471 768 472 - let (second, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 473 - assert_eq!(second.retry_reason, "later"); 769 + #[test] 770 + fn next_host_returns_none_on_empty_queue() { 771 + let db = open_temporary().unwrap(); 772 + let empty_skip: HashSet<String> = HashSet::new(); 773 + assert!(next_host(&db, &empty_skip).is_none()); 474 774 } 475 775 776 + // --- claim_from_host skips busy DIDs --- 777 + 476 778 #[test] 477 - fn since_cursor_skips_over_tombstone_region() { 779 + fn claim_from_host_skips_busy_dids() { 478 780 let db = open_temporary().unwrap(); 479 - enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 480 - enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 781 + pending_repo(&db, "did:web:a.com"); 782 + pending_repo(&db, "did:web:b.com"); 783 + enqueue( 784 + &db, 785 + 100, 786 + &item("did:web:a.com", 0, "first", &[]), 787 + Some("pds.example.com"), 788 + ) 789 + .unwrap(); 790 + enqueue( 791 + &db, 792 + 101, 793 + &item("did:web:b.com", 0, "second", &[]), 794 + Some("pds.example.com"), 795 + ) 796 + .unwrap(); 481 797 482 - let (first, cursor) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 483 - assert_eq!(first.retry_reason, "first"); 798 + let mut busy: HashSet<Did<'static>> = HashSet::new(); 799 + busy.insert(did("did:web:a.com")); 484 800 485 - enqueue(&db, 5, &item("did:web:late.com", 0, "late", &[])).unwrap(); 486 - 487 - let (second, _) = dequeue_ready(&db, 9999, Some(cursor)).unwrap().unwrap(); 488 - assert_eq!(second.retry_reason, "second"); 801 + let claimed = claim_from_host(&db, "pds.example.com", 9999, &busy) 802 + .unwrap() 803 + .unwrap(); 804 + assert_eq!(claimed.item.did.as_str(), "did:web:b.com"); 489 805 } 490 806 491 - // --- claim_resync --- 492 - 493 - fn pending_repo(db: &DbRef, did_str: &str) { 494 - repo::put_info( 495 - db, 496 - &did(did_str), 497 - &repo::RepoInfo { 498 - state: repo::RepoState::Pending, 499 - status: repo::AccountStatus::Active, 500 - error: None, 501 - }, 807 + #[test] 808 + fn claim_from_host_returns_none_when_all_busy() { 809 + let db = open_temporary().unwrap(); 810 + pending_repo(&db, "did:web:a.com"); 811 + enqueue( 812 + &db, 813 + 100, 814 + &item("did:web:a.com", 0, "only", &[]), 815 + Some("pds.example.com"), 502 816 ) 503 817 .unwrap(); 818 + 819 + let mut busy: HashSet<Did<'static>> = HashSet::new(); 820 + busy.insert(did("did:web:a.com")); 821 + 822 + assert!( 823 + claim_from_host(&db, "pds.example.com", 9999, &busy) 824 + .unwrap() 825 + .is_none() 826 + ); 504 827 } 828 + 829 + // --- claim_from_host respects timestamps --- 505 830 506 831 #[test] 507 - fn claim_resync_transitions_state_and_dequeues() { 832 + fn claim_from_host_respects_timestamps() { 508 833 let db = open_temporary().unwrap(); 509 834 pending_repo(&db, "did:web:a.com"); 510 - enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap(); 835 + enqueue( 836 + &db, 837 + 200, 838 + &item("did:web:a.com", 0, "future", &[]), 839 + Some("pds.example.com"), 840 + ) 841 + .unwrap(); 842 + 843 + // now=200 means ts must be < 200 to be claimable. 844 + assert!( 845 + claim_from_host(&db, "pds.example.com", 200, &HashSet::new()) 846 + .unwrap() 847 + .is_none() 848 + ); 849 + assert!( 850 + claim_from_host(&db, "pds.example.com", 199, &HashSet::new()) 851 + .unwrap() 852 + .is_none() 853 + ); 511 854 512 - let (claimed, _cursor) = claim_resync(&db, 101, None, &HashSet::new()) 855 + // now=201 should claim it. 856 + let claimed = claim_from_host(&db, "pds.example.com", 201, &HashSet::new()) 513 857 .unwrap() 514 858 .unwrap(); 515 - assert_eq!(claimed.did.as_str(), "did:web:a.com"); 516 - 517 - assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 518 - 519 - let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); 520 - assert_eq!(info.state, repo::RepoState::Resyncing); 859 + assert_eq!(claimed.item.retry_reason, "future"); 521 860 } 522 861 523 862 #[test] 524 - fn claim_resync_preserves_account_status() { 863 + fn claim_from_host_preserves_account_status() { 525 864 let db = open_temporary().unwrap(); 526 865 repo::put_info( 527 866 &db, ··· 533 872 }, 534 873 ) 535 874 .unwrap(); 536 - enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap(); 875 + enqueue( 876 + &db, 877 + 100, 878 + &item("did:web:a.com", 0, "backfill", &[]), 879 + Some("pds.example.com"), 880 + ) 881 + .unwrap(); 537 882 538 - claim_resync(&db, 101, None, &HashSet::new()) 883 + claim_from_host(&db, "pds.example.com", 101, &HashSet::new()) 539 884 .unwrap() 540 885 .unwrap(); 541 886 542 887 let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); 543 888 assert_eq!(info.status, repo::AccountStatus::Suspended); 889 + assert_eq!(info.state, repo::RepoState::Resyncing); 544 890 } 545 891 892 + // --- migrate_legacy --- 893 + 546 894 #[test] 547 - fn claim_resync_skips_busy_dids() { 895 + fn migrate_legacy_converts_old_entries() { 548 896 let db = open_temporary().unwrap(); 549 - pending_repo(&db, "did:web:a.com"); 550 - pending_repo(&db, "did:web:b.com"); 551 - enqueue(&db, 100, &item("did:web:a.com", 0, "first", &[])).unwrap(); 552 - enqueue(&db, 101, &item("did:web:b.com", 0, "second", &[])).unwrap(); 553 897 554 - let mut busy: HashSet<Did<'static>> = HashSet::new(); 555 - busy.insert(did("did:web:a.com")); 898 + // Manually insert an old-format "rsq" entry. 899 + let old_did = did("did:web:old.com"); 900 + let ts = 42u64; 901 + let old_item = item("did:web:old.com", 1, "gap", &[0xFF]); 902 + let mut old_key = PREFIX_RESYNC_QUEUE.to_vec(); 903 + old_key.extend_from_slice(&ts.to_be_bytes()); 904 + old_key.push(b'\0'); 905 + old_key.extend_from_slice(old_did.as_str().as_bytes()); 906 + db.ks.insert(&old_key, encode(&old_item)).unwrap(); 907 + 908 + let count = migrate_legacy(&db).unwrap(); 909 + assert_eq!(count, 1); 910 + 911 + // Old key should be gone. 912 + assert!(db.ks.get(&old_key).unwrap().is_none()); 913 + 914 + // Rebuild host index so next_host works. 915 + rebuild_host_index(&db).unwrap(); 556 916 557 - let (claimed, _) = claim_resync(&db, 9999, None, &busy).unwrap().unwrap(); 558 - assert_eq!(claimed.did.as_str(), "did:web:b.com"); 917 + // Should be claimable from empty-string host. 918 + pending_repo(&db, "did:web:old.com"); 919 + let claimed = claim_from_host(&db, "", 9999, &HashSet::new()) 920 + .unwrap() 921 + .unwrap(); 922 + assert_eq!(claimed.item.did.as_str(), "did:web:old.com"); 923 + assert_eq!(claimed.item.retry_count, 1); 924 + assert_eq!(claimed.item.retry_reason, "gap"); 925 + assert_eq!(claimed.item.commit_cbor, vec![0xFF]); 926 + assert_eq!(claimed.host, ""); 559 927 } 560 928 561 929 #[test] 562 - fn claim_resync_returns_none_when_all_ready_are_busy() { 930 + fn migrate_legacy_noop_when_no_old_entries() { 563 931 let db = open_temporary().unwrap(); 564 - pending_repo(&db, "did:web:a.com"); 565 - enqueue(&db, 100, &item("did:web:a.com", 0, "only", &[])).unwrap(); 932 + let count = migrate_legacy(&db).unwrap(); 933 + assert_eq!(count, 0); 934 + } 935 + 936 + // --- rebuild_host_index --- 937 + 938 + #[test] 939 + fn rebuild_host_index_from_disk() { 940 + let db = open_temporary().unwrap(); 941 + enqueue( 942 + &db, 943 + 100, 944 + &item("did:web:a.com", 0, "r1", &[]), 945 + Some("alpha.example.com"), 946 + ) 947 + .unwrap(); 948 + enqueue( 949 + &db, 950 + 101, 951 + &item("did:web:b.com", 0, "r2", &[]), 952 + Some("alpha.example.com"), 953 + ) 954 + .unwrap(); 955 + enqueue( 956 + &db, 957 + 102, 958 + &item("did:web:c.com", 0, "r3", &[]), 959 + Some("beta.example.com"), 960 + ) 961 + .unwrap(); 566 962 567 - let mut busy: HashSet<Did<'static>> = HashSet::new(); 568 - busy.insert(did("did:web:a.com")); 963 + // Clear the in-memory index. 964 + { 965 + let mut idx = db.host_index.lock().unwrap(); 966 + *idx = HostIndex::new(); 967 + } 968 + 969 + rebuild_host_index(&db).unwrap(); 569 970 570 - assert!(claim_resync(&db, 9999, None, &busy).unwrap().is_none()); 971 + let idx = db.host_index.lock().unwrap(); 972 + assert_eq!(idx.hosts.len(), 2); 973 + assert!(idx.hosts.contains("alpha.example.com")); 974 + assert!(idx.hosts.contains("beta.example.com")); 975 + assert_eq!(idx.host_counts["alpha.example.com"], 2); 976 + assert_eq!(idx.host_counts["beta.example.com"], 1); 571 977 } 978 + 979 + // --- count_queued --- 572 980 573 981 #[test] 574 - fn consecutive_dequeues_drain_in_order() { 982 + fn count_queued_counts_all_hosts() { 575 983 let db = open_temporary().unwrap(); 576 - enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 577 - enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 578 - enqueue(&db, 30, &item("did:web:c.com", 0, "third", &[])).unwrap(); 579 - 580 - let (a, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 581 - let (b, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 582 - let (c, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 583 - assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 984 + enqueue( 985 + &db, 986 + 100, 987 + &item("did:web:a.com", 0, "r1", &[]), 988 + Some("alpha.example.com"), 989 + ) 990 + .unwrap(); 991 + enqueue( 992 + &db, 993 + 101, 994 + &item("did:web:b.com", 0, "r2", &[]), 995 + Some("beta.example.com"), 996 + ) 997 + .unwrap(); 998 + enqueue(&db, 102, &item("did:web:c.com", 0, "r3", &[]), None).unwrap(); 584 999 585 - assert_eq!(a.retry_reason, "first"); 586 - assert_eq!(b.retry_reason, "second"); 587 - assert_eq!(c.retry_reason, "third"); 1000 + assert_eq!(count_queued(&db), 3); 588 1001 } 589 1002 }
+1 -1
src/sync/backfill.rs
··· 280 280 retry_reason: "backfill".to_string(), 281 281 commit_cbor: vec![], 282 282 }; 283 - storage::resync_queue::enqueue(db, now, &item)?; 283 + storage::resync_queue::enqueue(db, now, &item, Some(&host.to_string()))?; 284 284 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); 285 285 count += 1; 286 286 }
+8 -2
src/sync/firehose/commit_event.rs
··· 296 296 error: None, 297 297 }, 298 298 ); 299 + let host_str = pds_host.as_ref().map(|h| h.to_string()); 299 300 storage::resync_queue::enqueue_into( 300 301 &mut batch, 301 302 db, ··· 306 307 retry_reason: "first firehose event for unknown repo".to_string(), 307 308 commit_cbor: vec![], 308 309 }, 310 + host_str.as_deref(), 309 311 ); 310 312 batch 311 313 .commit() ··· 398 400 warn!(did = %did.as_str(), 399 401 commit_since = since.as_str(), prev_rev = prev.rev.as_str(), 400 402 "commit dropped: since/rev mismatch; queueing resync"); 401 - return enqueue_desync(db, did, info.status, "since/rev mismatch"); 403 + return enqueue_desync(db, did, info.status, "since/rev mismatch", pds_host.as_ref().map(ToString::to_string).as_deref()); 402 404 } 403 405 404 406 // Step 9: `prevData` must match the MST root stored from the last commit. ··· 410 412 .increment(1); 411 413 warn!(did = %did.as_str(), 412 414 "commit dropped: prevData mismatch; queueing resync"); 413 - return enqueue_desync(db, did, info.status, "prevData mismatch"); 415 + return enqueue_desync(db, did, info.status, "prevData mismatch", pds_host.as_ref().map(ToString::to_string).as_deref()); 414 416 } 415 417 } 416 418 ··· 501 503 .map(|(k, _)| k) 502 504 .collect(); 503 505 error!(did = %did, maybe_deaths = ?maybes, "queuing resync due to unprovable death"); 506 + let host_str = pds_host.as_ref().map(ToString::to_string); 504 507 storage::resync_queue::enqueue_into( 505 508 &mut batch, 506 509 db, ··· 511 514 retry_reason: "unprovable_death".to_string(), 512 515 commit_cbor: vec![], 513 516 }, 517 + host_str.as_deref(), 514 518 ); 515 519 } 516 520 ··· 566 570 did: Did<'static>, 567 571 existing_status: AccountStatus, 568 572 reason: &str, 573 + pds_host: Option<&str>, 569 574 ) -> crate::error::Result<()> { 570 575 let mut batch = db.database.batch(); 571 576 storage::repo::put_info_into( ··· 588 593 retry_reason: reason.to_string(), 589 594 commit_cbor: vec![], 590 595 }, 596 + pds_host, 591 597 ); 592 598 batch 593 599 .commit()
+2
src/sync/firehose/sync_event.rs
··· 238 238 error: None, 239 239 }, 240 240 ); 241 + let host_str = pds_host.as_ref().map(|h| h.to_string()); 241 242 storage::resync_queue::enqueue_into( 242 243 &mut batch, 243 244 db, ··· 248 249 retry_reason: "#sync".to_string(), 249 250 commit_cbor: blocks, 250 251 }, 252 + host_str.as_deref(), 251 253 ); 252 254 batch 253 255 .commit()
+240 -123
src/sync/resync/dispatcher.rs
··· 18 18 use super::{GetCollectionsError, ResyncError}; 19 19 20 20 use tokio::task::{Id as TaskId, JoinSet}; 21 - use tracing::{debug, error, info, trace, warn}; 21 + use tracing::{error, info, trace, warn}; 22 22 23 23 use std::sync::atomic::Ordering; 24 24 ··· 44 44 pub db: DbRef, 45 45 pub client: crate::http::ThrottledClient, 46 46 pub max_concurrent: usize, 47 + /// Maximum resync workers targeting a single PDS host. Caps per-host 48 + /// concurrency so the dispatcher spreads work across hosts rather than 49 + /// piling workers onto the biggest PDS. Items for saturated hosts are 50 + /// re-enqueued with a short delay. 51 + pub max_per_host: usize, 47 52 pub describe_timeout: std::time::Duration, 48 53 pub get_repo_timeout: std::time::Duration, 49 54 pub token: tokio_util::sync::CancellationToken, ··· 61 66 db, 62 67 client, 63 68 max_concurrent, 69 + max_per_host, 64 70 describe_timeout, 65 71 get_repo_timeout, 66 72 token, ··· 69 75 ) -> Result<()> { 70 76 let mut busy: HashSet<Did<'static>> = HashSet::new(); 71 77 let mut task_dids: HashMap<TaskId, Did<'static>> = HashMap::new(); 72 - let mut since: Option<Vec<u8>> = None; 78 + // Track which host each task is targeting so we can decrement host_workers on completion. 79 + let mut task_hosts: HashMap<TaskId, String> = HashMap::new(); 73 80 let mut workers: JoinSet<WorkerOutcome> = JoinSet::new(); 74 81 // Maps PDS hostname → Instant when the 429 cooldown expires. 75 82 let mut cooling_hosts: HashMap<String, Instant> = HashMap::new(); 83 + // Active worker count per PDS host, for spreading work across hosts. 84 + let mut host_workers: HashMap<String, usize> = HashMap::new(); 76 85 77 - info!(max_concurrent, "resync dispatcher started"); 86 + info!(max_concurrent, max_per_host, "resync dispatcher started"); 78 87 79 88 loop { 80 - // Fill up to max_concurrent slots from the queue. 89 + // ── Step 1: drain all already-completed workers (non-blocking). ── 90 + while let Some(join_result) = workers.try_join_next_with_id() { 91 + drain_one( 92 + join_result, 93 + &mut task_dids, 94 + &mut task_hosts, 95 + &mut busy, 96 + &mut cooling_hosts, 97 + &mut host_workers, 98 + &db, 99 + &resolver, 100 + &client, 101 + ); 102 + } 103 + 104 + // ── Step 2: fill empty slots via round-robin host selection. ── 105 + let mut filled = false; 106 + // Build the skip set: cooling hosts + hosts at their per-host cap. 107 + let mut skip_hosts: HashSet<String> = cooling_hosts 108 + .iter() 109 + .filter(|&(_, &until)| Instant::now() < until) 110 + .map(|(h, _)| h.clone()) 111 + .collect(); 112 + for (h, &count) in &host_workers { 113 + if count >= max_per_host { 114 + skip_hosts.insert(h.clone()); 115 + } 116 + } 117 + // Clean up expired cooling entries while we're at it. 118 + cooling_hosts.retain(|_, until| Instant::now() < *until); 119 + 81 120 while workers.len() < max_concurrent { 121 + // Interleave draining so completed tasks are recycled immediately. 122 + while let Some(join_result) = workers.try_join_next_with_id() { 123 + drain_one( 124 + join_result, 125 + &mut task_dids, 126 + &mut task_hosts, 127 + &mut busy, 128 + &mut cooling_hosts, 129 + &mut host_workers, 130 + &db, 131 + &resolver, 132 + &client, 133 + ); 134 + } 135 + 136 + // Pick the next host in round-robin order, skipping 137 + // cooling/saturated hosts. 138 + let Some(host) = storage::resync_queue::next_host(&db, &skip_hosts) else { 139 + break; // all hosts exhausted or empty 140 + }; 141 + 82 142 let now = unix_now(); 83 143 let claim = { 84 144 let db = db.clone(); 85 - let since = since.clone(); 145 + let host = host.clone(); 86 146 let busy_snapshot = busy.clone(); 87 147 tokio::task::spawn_blocking(move || { 88 - storage::resync_queue::claim_resync(&db, now, since, &busy_snapshot) 148 + storage::resync_queue::claim_from_host(&db, &host, now, &busy_snapshot) 89 149 }) 90 150 .await 91 151 }; 92 152 match claim { 93 153 Err(panic) => { 94 - error!(error = %panic, "claim_resync task panicked; pausing"); 154 + error!(error = %panic, "claim_from_host task panicked; pausing"); 95 155 break; 96 156 } 97 - Ok(Ok(Some((item, cursor)))) => { 98 - since = Some(cursor); 99 - let did = item.did.clone(); 157 + Ok(Ok(Some(claimed))) => { 158 + let did = claimed.item.did.clone(); 100 159 busy.insert(did.clone()); 160 + *host_workers.entry(host.clone()).or_insert(0) += 1; 101 161 102 - // Check whether this item's PDS host is still cooling down 103 - // after a 429. The resolver cache makes this a local lookup 104 - // for any DID we've seen recently. 105 - if let Ok(resolved) = resolver.resolve(&did).await { 106 - let host = resolved.pds.host_str().unwrap_or("").to_string(); 107 - match cooling_hosts.get(&host) { 108 - Some(&until) if Instant::now() < until => { 109 - let remaining = 110 - until.duration_since(Instant::now()).as_secs().max(1); 111 - let db = db.clone(); 112 - let ts = unix_now() + remaining; 113 - match tokio::task::spawn_blocking(move || { 114 - storage::resync_queue::enqueue(&db, ts, &item) 115 - }) 116 - .await 117 - { 118 - Ok(Ok(())) => {} 119 - Ok(Err(e)) => error!(error = %e, did = %did, 120 - "failed to defer during host cooldown"), 121 - Err(e) => error!(error = %e, did = %did, 122 - "failed to defer during host cooldown (panic)"), 123 - } 124 - busy.remove(&did); 125 - debug!(did = %did, host = %host, remaining_secs = remaining, 126 - "deferring; PDS host cooling down after 429"); 127 - break; 128 - } 129 - Some(_) => { 130 - // Cooldown expired; clean up the entry. 131 - cooling_hosts.remove(&host); 132 - } 133 - None => {} 134 - } 162 + // Update skip set if this host is now at cap. 163 + if host_workers.get(&host).copied().unwrap_or(0) >= max_per_host { 164 + skip_hosts.insert(host.clone()); 135 165 } 166 + 167 + metrics::gauge!("lightrail_resync_hosts_active") 168 + .set(host_workers.values().filter(|&&v| v > 0).count() as f64); 136 169 137 170 let client = client.clone(); 138 171 let resolver = resolver.clone(); 139 172 let db = db.clone(); 140 173 let handle = workers.spawn(async move { 141 174 run_worker( 142 - item, 175 + claimed.item, 143 176 &resolver, 144 177 &client, 145 178 &db, ··· 150 183 .await 151 184 }); 152 185 task_dids.insert(handle.id(), did.clone()); 153 - metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 186 + task_hosts.insert(handle.id(), host); 187 + filled = true; 154 188 trace!(did = %did, running = workers.len(), "spawned resync worker"); 155 189 } 156 - Ok(Ok(None)) => break, // queue empty or all ready items busy 190 + Ok(Ok(None)) => { 191 + // This host's sub-queue is empty or all items are 192 + // busy/future-dated. Skip it for the rest of this round. 193 + skip_hosts.insert(host); 194 + continue; 195 + } 157 196 Ok(Err(e)) => { 158 197 error!(error = %e, "error claiming resync job; pausing"); 159 198 break; ··· 161 200 } 162 201 } 163 202 203 + metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 204 + 205 + // ── Step 3: wait for progress. ── 164 206 if workers.is_empty() { 165 - // Nothing running, nothing claimable right now. Yield before polling. 207 + // Nothing running, nothing claimable. Yield before retrying. 166 208 if !token.sleep(Duration::from_millis(IDLE_POLL_MS)).await { 167 209 return Ok(()); 168 - }; 210 + } 169 211 continue; 170 212 } 171 213 172 - // Wait for the next worker to finish, then loop to try claiming more. 173 - let Some(next) = token.run(workers.join_next_with_id()).await else { 174 - return Ok(()); 175 - }; 176 - if let Some(join_result) = next { 177 - // JoinError also carries the task ID, so we can recover the DID 178 - // regardless of whether the task completed normally or panicked. 179 - let (task_id, worker_result) = match join_result { 180 - Ok((id, outcome)) => (id, Ok(outcome)), 181 - Err(e) => (e.id(), Err(e)), 214 + if workers.len() >= max_concurrent { 215 + // At capacity — must wait for a completion before we can fill. 216 + let Some(join_result) = token.run(workers.join_next_with_id()).await else { 217 + return Ok(()); 182 218 }; 183 - let did = task_dids 184 - .remove(&task_id) 185 - .expect("BUG: resync task completed for DID not in task_dids"); 186 - assert!( 187 - busy.remove(&did), 188 - "BUG: resync task completed for DID not in busy set" 189 - ); 190 - 191 - metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 192 - 193 - match worker_result { 194 - Ok(WorkerOutcome::RateLimited { host, retry_count }) => { 195 - metrics::counter!("lightrail_resync_completed_total", 196 - "outcome" => "rate_limited") 197 - .increment(1); 198 - cooling_hosts.insert( 199 - host.clone(), 200 - Instant::now() + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS), 201 - ); 202 - warn!(did = %did, host = %host, cooldown_secs = RATE_LIMIT_COOLDOWN_SECS, 203 - "PDS rate-limited; cooling down"); 204 - let item = ResyncItem { 205 - did: did.clone(), 206 - retry_count, 207 - retry_reason: format!("rate limited by {host}"), 208 - commit_cbor: vec![], 209 - }; 210 - let db = db.clone(); 211 - let ts = unix_now() + RATE_LIMIT_COOLDOWN_SECS; 212 - match tokio::task::spawn_blocking(move || { 213 - storage::resync_queue::enqueue(&db, ts, &item) 214 - }) 215 - .await 216 - { 217 - Ok(Ok(())) => {} 218 - Ok(Err(e)) => error!(error = %e, did = %did, 219 - "failed to re-enqueue after rate limit"), 220 - Err(e) => error!(error = %e, did = %did, 221 - "failed to re-enqueue after rate limit (panic)"), 219 + if let Some(join_result) = join_result { 220 + drain_one( 221 + join_result, 222 + &mut task_dids, 223 + &mut task_hosts, 224 + &mut busy, 225 + &mut cooling_hosts, 226 + &mut host_workers, 227 + &db, 228 + &resolver, 229 + &client, 230 + ); 231 + } 232 + } else if filled { 233 + // Below capacity but we just spawned workers — loop immediately 234 + // to drain completions and fill more slots without blocking. 235 + continue; 236 + } else { 237 + // Below capacity, queue exhausted for now. Wait for either a 238 + // completion (which frees busy-set entries, unblocking queue 239 + // items) or a short timeout (for future-dated items to ripen). 240 + tokio::select! { 241 + biased; 242 + result = workers.join_next_with_id() => { 243 + if let Some(join_result) = result { 244 + drain_one( 245 + join_result, 246 + &mut task_dids, 247 + &mut task_hosts, 248 + &mut busy, 249 + &mut cooling_hosts, 250 + &mut host_workers, 251 + &db, 252 + &resolver, 253 + &client, 254 + ); 222 255 } 223 256 } 224 - Ok(outcome) => { 225 - if let Err(e) = handle_completion( 226 - did.clone(), 227 - outcome, 228 - db.clone(), 229 - resolver.clone(), 230 - client.clone(), 231 - ) 232 - .await 233 - { 234 - error!(error = %e, did = %did, "error handling worker completion"); 235 - } 236 - } 237 - Err(e) => { 238 - // Worker panicked. The queue entry was already removed when 239 - // claimed, so we re-enqueue to avoid silently dropping the job. 240 - metrics::counter!("lightrail_resync_completed_total", 241 - "outcome" => "panic") 242 - .increment(1); 243 - error!(error = %e, did = %did, "resync worker panicked; re-enqueueing"); 244 - re_enqueue_panic_recovery(did, db.clone()).await; 257 + cont = token.sleep(Duration::from_millis(IDLE_POLL_MS)) => { 258 + if !cont { return Ok(()); } 245 259 } 246 260 } 247 261 } ··· 271 285 }, 272 286 } 273 287 288 + /// Process a single joined worker result: update bookkeeping and spawn 289 + /// background completion handling. 290 + fn drain_one( 291 + join_result: std::result::Result<(TaskId, WorkerOutcome), tokio::task::JoinError>, 292 + task_dids: &mut HashMap<TaskId, Did<'static>>, 293 + task_hosts: &mut HashMap<TaskId, String>, 294 + busy: &mut HashSet<Did<'static>>, 295 + cooling_hosts: &mut HashMap<String, Instant>, 296 + host_workers: &mut HashMap<String, usize>, 297 + db: &DbRef, 298 + resolver: &std::sync::Arc<crate::identity::Resolver>, 299 + client: &crate::http::ThrottledClient, 300 + ) { 301 + let (task_id, worker_result) = match join_result { 302 + Ok((id, outcome)) => (id, Ok(outcome)), 303 + Err(e) => (e.id(), Err(e)), 304 + }; 305 + let did = task_dids 306 + .remove(&task_id) 307 + .expect("BUG: resync task completed for DID not in task_dids"); 308 + assert!( 309 + busy.remove(&did), 310 + "BUG: resync task completed for DID not in busy set" 311 + ); 312 + let task_host = task_hosts.remove(&task_id).unwrap_or_default(); 313 + if !task_host.is_empty() { 314 + if let Some(count) = host_workers.get_mut(&task_host) { 315 + *count = count.saturating_sub(1); 316 + } 317 + } 318 + 319 + match worker_result { 320 + Ok(WorkerOutcome::RateLimited { host, retry_count }) => { 321 + metrics::counter!("lightrail_resync_completed_total", 322 + "outcome" => "rate_limited") 323 + .increment(1); 324 + cooling_hosts.insert( 325 + host.clone(), 326 + Instant::now() + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS), 327 + ); 328 + warn!(did = %did, host = %host, cooldown_secs = RATE_LIMIT_COOLDOWN_SECS, 329 + "PDS rate-limited; cooling down"); 330 + let item = ResyncItem { 331 + did: did.clone(), 332 + retry_count, 333 + retry_reason: format!("rate limited by {host}"), 334 + commit_cbor: vec![], 335 + }; 336 + let db = db.clone(); 337 + let ts = unix_now() + RATE_LIMIT_COOLDOWN_SECS; 338 + let host_c = host.clone(); 339 + tokio::spawn(async move { 340 + match tokio::task::spawn_blocking(move || { 341 + storage::resync_queue::enqueue(&db, ts, &item, Some(host_c.as_str())) 342 + }) 343 + .await 344 + { 345 + Ok(Ok(())) => {} 346 + Ok(Err(e)) => error!(error = %e, did = %did, 347 + "failed to re-enqueue after rate limit"), 348 + Err(e) => error!(error = %e, did = %did, 349 + "failed to re-enqueue after rate limit (panic)"), 350 + } 351 + }); 352 + } 353 + Ok(outcome) => { 354 + let db = db.clone(); 355 + let resolver = resolver.clone(); 356 + let client = client.clone(); 357 + tokio::spawn(async move { 358 + if let Err(e) = handle_completion( 359 + did.clone(), 360 + outcome, 361 + db, 362 + resolver, 363 + client, 364 + task_host, 365 + ) 366 + .await 367 + { 368 + error!(error = %e, did = %did, "error handling worker completion"); 369 + } 370 + }); 371 + } 372 + Err(e) => { 373 + metrics::counter!("lightrail_resync_completed_total", 374 + "outcome" => "panic") 375 + .increment(1); 376 + error!(error = %e, did = %did, "resync worker panicked; re-enqueueing"); 377 + let db = db.clone(); 378 + tokio::spawn(async move { 379 + re_enqueue_panic_recovery(did, db).await; 380 + }); 381 + } 382 + } 383 + } 384 + 274 385 async fn run_worker( 275 386 item: ResyncItem, 276 387 resolver: &crate::identity::Resolver, ··· 280 391 get_repo_timeout: std::time::Duration, 281 392 force_get_repo: bool, 282 393 ) -> WorkerOutcome { 394 + metrics::gauge!("lightrail_resync_workers_active").increment(1); 283 395 let did = item.did.clone(); 284 - match super::index_repo( 396 + let outcome = match super::index_repo( 285 397 client, 286 398 resolver, 287 399 item.did, ··· 319 431 retry_count: item.retry_count, 320 432 } 321 433 } 322 - } 434 + }; 435 + metrics::gauge!("lightrail_resync_workers_active").decrement(1); 436 + outcome 323 437 } 324 438 325 439 // --------------------------------------------------------------------------- ··· 332 446 db: DbRef, 333 447 resolver: std::sync::Arc<crate::identity::Resolver>, 334 448 client: crate::http::ThrottledClient, 449 + host: String, 335 450 ) -> Result<()> { 336 451 match outcome { 337 452 WorkerOutcome::Success => { ··· 367 482 commit_cbor: vec![], 368 483 }; 369 484 let ts = unix_now() + delay; 370 - tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 485 + let host_c = host.clone(); 486 + tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item, if host_c.is_empty() { None } else { Some(host_c.as_str()) })) 371 487 .await??; 372 488 info!(did = %did, retry = new_retry, delay_secs = delay, "re-enqueued for retry"); 373 489 } ··· 383 499 commit_cbor: vec![], 384 500 }; 385 501 let ts = unix_now() + delay; 386 - tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 502 + let host_c = host; 503 + tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item, if host_c.is_empty() { None } else { Some(host_c.as_str()) })) 387 504 .await??; 388 505 info!(did = %did, retry = new_retry, delay_secs = delay, 389 506 "re-enqueued after repo not found"); ··· 484 601 // Short delay so we don't immediately re-panic on the same broken input. 485 602 let db2 = db.clone(); 486 603 match tokio::task::spawn_blocking(move || { 487 - storage::resync_queue::enqueue(&db2, unix_now() + 60, &item) 604 + storage::resync_queue::enqueue(&db2, unix_now() + 60, &item, None) 488 605 }) 489 606 .await 490 607 {
+17 -3
src/sync/resync/mod.rs
··· 129 129 // spawn_blocking closures (which require 'static captures). 130 130 let did: Did<'static> = did.into_static(); 131 131 132 - let resolved = resolver.resolve(&did).await?; 132 + metrics::gauge!("lightrail_resync_phase", "phase" => "resolve").increment(1); 133 + let resolved = resolver.resolve(&did).await; 134 + metrics::gauge!("lightrail_resync_phase", "phase" => "resolve").decrement(1); 135 + let resolved = resolved?; 133 136 let base = &*resolved.pds; 134 137 138 + metrics::gauge!("lightrail_resync_phase", "phase" => "fetch").increment(1); 135 139 let repo_snapshot = match fetch_collections( 136 140 client, 137 141 base, ··· 142 146 ) 143 147 .await 144 148 { 145 - Ok(s) => s, 149 + Ok(s) => { 150 + metrics::gauge!("lightrail_resync_phase", "phase" => "fetch").decrement(1); 151 + s 152 + } 146 153 Err(GetCollectionsError::RepoNotFound) => { 154 + metrics::gauge!("lightrail_resync_phase", "phase" => "fetch").decrement(1); 147 155 info!(did = %did.as_str(), "repo not found on PDS; marking state"); 148 156 let db = db.clone(); 149 157 tokio::task::spawn_blocking(move || mark_not_found(&db, &did)) ··· 152 160 return Err(ResyncError::RepoNotFound); 153 161 } 154 162 Err(GetCollectionsError::RepoGone(reason)) => { 163 + metrics::gauge!("lightrail_resync_phase", "phase" => "fetch").decrement(1); 155 164 let (state, status) = match reason { 156 165 RepoGoneReason::Takendown => (RepoState::Takendown, AccountStatus::Takendown), 157 166 RepoGoneReason::Suspended => (RepoState::Suspended, AccountStatus::Suspended), ··· 164 173 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 165 174 return Ok(()); 166 175 } 167 - Err(e) => return Err(ResyncError::Fetch(e)), 176 + Err(e) => { 177 + metrics::gauge!("lightrail_resync_phase", "phase" => "fetch").decrement(1); 178 + return Err(ResyncError::Fetch(e)); 179 + } 168 180 }; 169 181 182 + metrics::gauge!("lightrail_resync_phase", "phase" => "store").increment(1); 170 183 let db = db.clone(); 171 184 let collections = repo_snapshot.collections; 172 185 let rev = repo_snapshot.rev; ··· 177 190 }) 178 191 .await 179 192 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 193 + metrics::gauge!("lightrail_resync_phase", "phase" => "store").decrement(1); 180 194 if n_inserted > 0 { 181 195 metrics::counter!("lightrail_collection_births_total", "source" => "resync") 182 196 .increment(n_inserted as u64);