lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

resync queue timing metrics

phil 35731cdb e96b7ed3

+21
+21
src/storage/resync_queue.rs
··· 208 208 now: SystemTime, 209 209 since: Option<Vec<u8>>, 210 210 ) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 211 + let t0 = std::time::Instant::now(); 211 212 let now_ms = crate::util::to_millis(now); 212 213 let prefix = key_prefix_all(); 213 214 ··· 219 220 .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) 220 221 .next() 221 222 else { 223 + metrics::histogram!("lightrail_resync_queue_op_seconds", "op" => "dequeue", "outcome" => "empty") 224 + .record(t0.elapsed().as_secs_f64()); 222 225 return Ok(None); 223 226 }; 224 227 ··· 239 242 let next_since = key_bytes 240 243 .get(prefix.len()..) 241 244 .expect("a resync queue key must start with the resync queue prefix"); 245 + metrics::histogram!("lightrail_resync_queue_op_seconds", "op" => "dequeue", "outcome" => "found") 246 + .record(t0.elapsed().as_secs_f64()); 242 247 Ok(Some((item, next_since.to_vec()))) 243 248 } 244 249 ··· 257 262 since: Option<Vec<u8>>, 258 263 busy: &HashSet<Did<'_>>, 259 264 ) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 265 + let t0 = std::time::Instant::now(); 260 266 let now_ms = crate::util::to_millis(now); 261 267 let prefix = key_prefix_all(); 262 268 let lower_suffix = since.unwrap_or_default(); 263 269 let upper_suffix = key_ts_midfix(now_ms); 264 270 271 + let mut scanned: u64 = 0; 272 + 265 273 for guard in db 266 274 .ks 267 275 .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) 268 276 { 277 + scanned += 1; 269 278 let (key_slice, val_slice) = guard.into_inner()?; 270 279 let key_bytes = key_slice.as_ref(); 271 280 let (_, did) = key_parse(key_bytes)?; ··· 274 283 debug!(did = did.as_str(), "skip busy did in resync queue"); 275 284 continue; 276 285 } 286 + 287 + let scan_elapsed = t0.elapsed(); 277 288 278 289 let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 279 290 let item = decode(val_slice.as_ref(), &key_str, did.clone())?; ··· 303 314 }; 304 315 305 316 // Atomically: remove from queue + write state=Resyncing. 317 + let t_commit = std::time::Instant::now(); 306 318 let mut batch = db.database.batch(); 307 319 batch.remove_weak(&db.ks, key_bytes); 308 320 batch.insert(&db.ks, &repo_key, repo::encode_repo_info(&new_info)); ··· 315 327 retry = item.retry_count, 316 328 "claimed resync job" 317 329 ); 330 + 331 + metrics::histogram!("lightrail_resync_queue_op_seconds", "op" => "claim", "phase" => "scan") 332 + .record(scan_elapsed.as_secs_f64()); 333 + metrics::histogram!("lightrail_resync_queue_op_seconds", "op" => "claim", "phase" => "commit") 334 + .record(t_commit.elapsed().as_secs_f64()); 335 + metrics::histogram!("lightrail_resync_queue_claim_scanned").record(scanned as f64); 318 336 return Ok(Some((item, next_since))); 319 337 } 320 338 339 + metrics::histogram!("lightrail_resync_queue_op_seconds", "op" => "claim", "phase" => "scan") 340 + .record(t0.elapsed().as_secs_f64()); 341 + metrics::histogram!("lightrail_resync_queue_claim_scanned").record(scanned as f64); 321 342 Ok(None) 322 343 } 323 344