lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

don't let the db retry queue completely block the discovery queue

The db queue gets clogged with failing retries that originated from discovery; because the db queue had priority, those retries took over every worker slot and fresh discovery work waited forever.

phil 76bde2b4 35731cdb

+125 -102
+125 -102
src/sync/resync/dispatcher.rs
··· 69 69 /// Shared handle to the latest dispatcher snapshot. 70 70 pub type DispatcherState = Arc<Mutex<DispatcherSnapshot>>; 71 71 72 + /// Tracks whether a worker was sourced from the in-memory discovery queue 73 + /// or from the on-disk retry queue, so the dispatcher can enforce per-source 74 + /// concurrency budgets. 75 + #[derive(Clone, Copy, PartialEq, Eq)] 76 + enum WorkSource { 77 + Discovery, 78 + DbQueue, 79 + } 80 + 72 81 /// How long to wait between queue polls when no workers are running. 73 82 const IDLE_POLL: Duration = Duration::from_millis(500); 74 83 ··· 111 120 }: DispatcherConfig, 112 121 ) -> Result<()> { 113 122 let mut busy: HashSet<Did<'static>> = HashSet::new(); 114 - let mut task_dids: HashMap<TaskId, Did<'static>> = HashMap::new(); 123 + let mut task_dids: HashMap<TaskId, (Did<'static>, WorkSource)> = HashMap::new(); 115 124 let mut since: Option<Vec<u8>> = None; 116 125 let mut workers: JoinSet<WorkerOutcome> = JoinSet::new(); 117 126 // Maps PDS URL → Instant when the 429 cooldown expires. ··· 119 128 120 129 info!(max_concurrent, "resync dispatcher started"); 121 130 122 - loop { 123 - // Fill up to max_concurrent slots from the queue. 124 - while workers.len() < max_concurrent { 125 - let now = SystemTime::now(); 126 - let claim = { 127 - let db = db.clone(); 128 - let since = since.clone(); 129 - let busy_snapshot = busy.clone(); 130 - tokio::task::spawn_blocking(move || { 131 - storage::resync_queue::claim_resync(&db, now, since, &busy_snapshot) 132 - }) 133 - .await 134 - }; 135 - match claim { 136 - Err(panic) => { 137 - error!(error = %panic, "claim_resync task panicked; pausing"); 138 - break; 139 - } 140 - Ok(Ok(Some((item, cursor)))) => { 141 - since = Some(cursor); 142 - let did = item.did.clone(); 143 - busy.insert(did.clone()); 144 - 145 - // Check whether this item's PDS host is still cooling down 146 - // after a 429. The resolver cache makes this a local lookup 147 - // for any DID we've seen recently. 
148 - match resolver.resolve(&did).await { 149 - Ok(resolved) => { 150 - if let Some(&until) = cooling_hosts.get(&resolved.pds) { 151 - if Instant::now() < until { 152 - let remaining = until.duration_since(Instant::now()); 153 - let remaining = if remaining < Duration::from_secs(1) { 154 - Duration::from_secs(1) 155 - } else { 156 - remaining 157 - }; 158 - let db = db.clone(); 159 - let when = SystemTime::now() + remaining; 160 - let pds_host = 161 - resolved.pds.host_str().unwrap_or("unknown").to_string(); 162 - let remaining_ms = remaining.as_millis() as u64; 163 - match tokio::task::spawn_blocking(move || { 164 - storage::resync_queue::enqueue(&db, when, &item) 165 - }) 166 - .await 167 - { 168 - Ok(Ok(())) => {} 169 - Ok(Err(e)) => error!(error = %e, did = %did, 170 - "failed to defer during host cooldown"), 171 - Err(e) => error!(error = %e, did = %did, 172 - "failed to defer during host cooldown (panic)"), 173 - } 174 - busy.remove(&did); 175 - debug!(did = %did, host = %pds_host, remaining_ms, 176 - "deferring; PDS host cooling down after 429"); 177 - break; 178 - } else { 179 - cooling_hosts.remove(&resolved.pds); 180 - } 181 - } 182 - } 183 - Err(e) => { 184 - warn!(did = %did, error = %e, "DID resolution failed during claim; skipping"); 185 - busy.remove(&did); 186 - continue; 187 - } 188 - } 131 + // DB queue workers are capped at 1/3 of total slots so retries don't 132 + // starve fresh discovery work. 
133 + let db_budget = (max_concurrent / 3).max(1); 189 134 190 - let worker_stuff = AppStuff { 191 - resolver: resolver.clone(), 192 - client: client.clone(), 193 - db: db.clone(), 194 - token: token.clone(), 195 - }; 196 - let handle = workers.spawn(async move { 197 - run_worker( 198 - item, 199 - worker_stuff, 200 - describe_timeout, 201 - get_repo_timeout, 202 - force_get_repo, 203 - ) 204 - .await 205 - }); 206 - task_dids.insert(handle.id(), did.clone()); 207 - metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 208 - trace!(did = %did, running = workers.len(), "spawned resync worker"); 209 - } 210 - Ok(Ok(None)) => break, // queue empty or all ready items busy 211 - Ok(Err(e)) => { 212 - error!(error = %e, "error claiming resync job; pausing"); 213 - break; 214 - } 215 - } 216 - } 217 - 218 - // Disk queue exhausted — fill remaining slots from the memory 219 - // discovery queue (backfill / deep-crawl items). 135 + loop { 136 + // Phase 1: fill from the in-memory discovery queue first (priority). 220 137 if workers.len() < max_concurrent { 221 138 let now_inst = Instant::now(); 222 139 let cooling_set: HashSet<Arc<Url>> = cooling_hosts ··· 266 183 ) 267 184 .await 268 185 }); 269 - task_dids.insert(handle.id(), did.clone()); 186 + task_dids.insert(handle.id(), (did.clone(), WorkSource::Discovery)); 270 187 metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 271 188 trace!(did = %did, running = workers.len(), "spawned resync worker (discovery)"); 272 189 } 273 190 } 274 191 192 + // Phase 2: fill remaining slots from the on-disk retry queue, 193 + // capped at db_budget to prevent retries from starving discovery. 
194 + { 195 + let mut db_running = task_dids 196 + .values() 197 + .filter(|(_, s)| *s == WorkSource::DbQueue) 198 + .count(); 199 + while workers.len() < max_concurrent && db_running < db_budget { 200 + let now = SystemTime::now(); 201 + let claim = { 202 + let db = db.clone(); 203 + let since = since.clone(); 204 + let busy_snapshot = busy.clone(); 205 + tokio::task::spawn_blocking(move || { 206 + storage::resync_queue::claim_resync(&db, now, since, &busy_snapshot) 207 + }) 208 + .await 209 + }; 210 + match claim { 211 + Err(panic) => { 212 + error!(error = %panic, "claim_resync task panicked; pausing"); 213 + break; 214 + } 215 + Ok(Ok(Some((item, cursor)))) => { 216 + since = Some(cursor); 217 + let did = item.did.clone(); 218 + busy.insert(did.clone()); 219 + 220 + // Check whether this item's PDS host is still cooling down 221 + // after a 429. The resolver cache makes this a local lookup 222 + // for any DID we've seen recently. 223 + match resolver.resolve(&did).await { 224 + Ok(resolved) => { 225 + if let Some(&until) = cooling_hosts.get(&resolved.pds) { 226 + if Instant::now() < until { 227 + let remaining = until.duration_since(Instant::now()); 228 + let remaining = if remaining < Duration::from_secs(1) { 229 + Duration::from_secs(1) 230 + } else { 231 + remaining 232 + }; 233 + let db = db.clone(); 234 + let when = SystemTime::now() + remaining; 235 + let pds_host = resolved 236 + .pds 237 + .host_str() 238 + .unwrap_or("unknown") 239 + .to_string(); 240 + let remaining_ms = remaining.as_millis() as u64; 241 + match tokio::task::spawn_blocking(move || { 242 + storage::resync_queue::enqueue(&db, when, &item) 243 + }) 244 + .await 245 + { 246 + Ok(Ok(())) => {} 247 + Ok(Err(e)) => error!(error = %e, did = %did, 248 + "failed to defer during host cooldown"), 249 + Err(e) => error!(error = %e, did = %did, 250 + "failed to defer during host cooldown (panic)"), 251 + } 252 + busy.remove(&did); 253 + debug!(did = %did, host = %pds_host, remaining_ms, 254 + 
"deferring; PDS host cooling down after 429"); 255 + break; 256 + } else { 257 + cooling_hosts.remove(&resolved.pds); 258 + } 259 + } 260 + } 261 + Err(e) => { 262 + warn!(did = %did, error = %e, "DID resolution failed during claim; skipping"); 263 + busy.remove(&did); 264 + continue; 265 + } 266 + } 267 + 268 + let worker_stuff = AppStuff { 269 + resolver: resolver.clone(), 270 + client: client.clone(), 271 + db: db.clone(), 272 + token: token.clone(), 273 + }; 274 + let handle = workers.spawn(async move { 275 + run_worker( 276 + item, 277 + worker_stuff, 278 + describe_timeout, 279 + get_repo_timeout, 280 + force_get_repo, 281 + ) 282 + .await 283 + }); 284 + task_dids.insert(handle.id(), (did.clone(), WorkSource::DbQueue)); 285 + db_running += 1; 286 + metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 287 + trace!(did = %did, running = workers.len(), "spawned resync worker"); 288 + } 289 + Ok(Ok(None)) => break, // queue empty or all ready items busy 290 + Ok(Err(e)) => { 291 + error!(error = %e, "error claiming resync job; pausing"); 292 + break; 293 + } 294 + } 295 + } 296 + } 297 + 275 298 if workers.is_empty() { 276 299 // Nothing running, nothing claimable right now. Wait for new 277 300 // items in the discovery queue or the next poll interval. 
··· 295 318 Ok((id, outcome)) => (id, Ok(outcome)), 296 319 Err(e) => (e.id(), Err(e)), 297 320 }; 298 - let did = task_dids 321 + let (did, _source) = task_dids 299 322 .remove(&task_id) 300 323 .expect("BUG: resync task completed for DID not in task_dids"); 301 324 assert!( ··· 366 389 let snap = DispatcherSnapshot { 367 390 busy: task_dids 368 391 .values() 369 - .map(|did| { 392 + .map(|(did, _)| { 370 393 let host = resolver 371 394 .resolve_cached(did) 372 395 .and_then(|r| r.pds.host_str().map(str::to_owned)) ··· 390 413 hosts: { 391 414 // Count workers per host from task_dids via resolver cache 392 415 let mut counts: HashMap<String, usize> = HashMap::new(); 393 - for did in task_dids.values() { 416 + for (did, _) in task_dids.values() { 394 417 let host = resolver 395 418 .resolve_cached(did) 396 419 .and_then(|r| r.pds.host_str().map(str::to_owned))