lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

use time types everywhere

phil 9f655176 ea66d22c

+327 -171
+2 -2
examples/enqueue_resync.rs
··· 4 4 use jacquard_common::types::string::Did; 5 5 6 6 use lightrail::storage::{self, resync_queue::ResyncItem}; 7 - use lightrail::util::unix_now_ms; 7 + use std::time::SystemTime; 8 8 9 9 #[derive(Parser, Debug)] 10 10 #[command( ··· 28 28 fn main() -> Result<(), Box<dyn std::error::Error>> { 29 29 let args = Args::parse(); 30 30 let db = storage::open(&args.db_path, 64)?; 31 - let now = unix_now_ms(); 31 + let now = SystemTime::now(); 32 32 33 33 for raw in &args.dids { 34 34 let did = Did::new_owned(raw.clone())?;
+8 -2
src/storage/meta.rs
··· 161 161 } 162 162 163 163 fn write_sketch(ks: &fjall::Keyspace, suffix: &[u8], sketch: &Mutex<Sk>) -> StorageResult<()> { 164 - let bytes = postcard::to_stdvec(&*sketch.lock().unwrap()).unwrap_or_default(); 164 + let bytes = match postcard::to_stdvec(&*sketch.lock().unwrap()) { 165 + Ok(b) => b, 166 + Err(e) => { 167 + tracing::error!(key = ?suffix, error = %e, "failed to serialize sketch; skipping write"); 168 + return Ok(()); 169 + } 170 + }; 165 171 ks.insert(full_key(suffix), bytes)?; 166 172 Ok(()) 167 173 } ··· 196 202 197 203 // Set first_startup_ms if this is the very first startup. 198 204 if stats.first_startup_ms.load(Ordering::Relaxed) == 0 { 199 - let now = crate::util::unix_now_ms(); 205 + let now = crate::util::to_millis(std::time::SystemTime::now()); 200 206 stats.first_startup_ms.store(now, Ordering::Relaxed); 201 207 write_u64(ks, K_FIRST_STARTUP, now)?; 202 208 }
+172 -43
src/storage/resync_queue.rs
··· 5 5 6 6 use std::collections::HashSet; 7 7 use std::sync::atomic::Ordering; 8 + use std::time::SystemTime; 8 9 9 10 use fjall::util::prefixed_range; 10 11 use jacquard_common::types::string::Did; ··· 146 147 // --------------------------------------------------------------------------- 147 148 148 149 /// queue a repo into a batch 149 - pub fn enqueue_into(batch: &mut fjall::OwnedWriteBatch, db: &DbRef, ts: u64, item: &ResyncItem) { 150 + pub fn enqueue_into( 151 + batch: &mut fjall::OwnedWriteBatch, 152 + db: &DbRef, 153 + when: SystemTime, 154 + item: &ResyncItem, 155 + ) { 156 + let ts = crate::util::to_millis(when); 150 157 if item.retry_reason == "backfill" { 151 158 trace!( 152 159 did = item.did.as_str(), ··· 175 182 db.ks.prefix(key_prefix_all()).count() 176 183 } 177 184 178 - /// Enqueue a repo for resync at the given Unix timestamp (seconds). 179 - pub fn enqueue(db: &DbRef, ts: u64, item: &ResyncItem) -> StorageResult<()> { 185 + /// Enqueue a repo for resync at the given time. 186 + pub fn enqueue(db: &DbRef, when: SystemTime, item: &ResyncItem) -> StorageResult<()> { 180 187 let mut batch = db.database.batch(); 181 - enqueue_into(&mut batch, db, ts, item); 188 + enqueue_into(&mut batch, db, when, item); 182 189 batch.commit()?; 183 190 Ok(()) 184 191 } ··· 198 205 /// cost once on startup. 199 206 pub fn dequeue_ready( 200 207 db: &DbRef, 201 - now: u64, 208 + now: SystemTime, 202 209 since: Option<Vec<u8>>, 203 210 ) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 211 + let now_ms = crate::util::to_millis(now); 204 212 let prefix = key_prefix_all(); 205 213 206 214 let lower_suffix = since.unwrap_or(vec![]); 207 - let upper_suffix = key_ts_midfix(now); 215 + let upper_suffix = key_ts_midfix(now_ms); 208 216 209 217 let Some(guard) = db 210 218 .ks ··· 217 225 let (key_slice, val_slice) = guard.into_inner()?; 218 226 let key_bytes = key_slice.as_ref(); 219 227 let (ts, did) = key_parse(key_bytes)?; 220 - assert!(ts < now); 228 + assert!(ts < now_ms); 221 229 let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 222 230 let item = decode(val_slice.as_ref(), &key_str, did)?; 223 231 debug!( ··· 245 253 /// [`dequeue_ready`]). Returns `None` if no claimable entry exists. 246 254 pub fn claim_resync( 247 255 db: &DbRef, 248 - now: u64, 256 + now: SystemTime, 249 257 since: Option<Vec<u8>>, 250 258 busy: &HashSet<Did<'_>>, 251 259 ) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 260 + let now_ms = crate::util::to_millis(now); 252 261 let prefix = key_prefix_all(); 253 262 let lower_suffix = since.unwrap_or_default(); 254 - let upper_suffix = key_ts_midfix(now); 263 + let upper_suffix = key_ts_midfix(now_ms); 255 264 256 265 for guard in db 257 266 .ks ··· 322 331 323 332 use super::*; 324 333 use crate::storage::{open_temporary, repo}; 334 + use crate::util::from_millis; 325 335 326 336 fn did(s: &str) -> Did<'static> { 327 337 Did::new_owned(s).unwrap() ··· 434 444 #[test] 435 445 fn dequeue_returns_none_on_empty_queue() { 436 446 let db = open_temporary().unwrap(); 437 - assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 447 + assert!( 448 + dequeue_ready(&db, from_millis(9999), None) 449 + .unwrap() 450 + .is_none() 451 + ); 438 452 } 439 453 440 454 #[test] 441 455 fn enqueue_and_dequeue_basic() { 442 456 let db = open_temporary().unwrap(); 443 - enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[1, 2, 3])).unwrap(); 457 + enqueue( 458 + &db, 459 + from_millis(100), 460 + &item("did:web:a.com", 0, "backfill", &[1, 2, 3]), 461 + ) 462 + .unwrap(); 444 463 445 - let (got, _cursor) = dequeue_ready(&db, 101, None).unwrap().unwrap(); 464 + let (got, _cursor) = dequeue_ready(&db, from_millis(101), None).unwrap().unwrap(); 446 465 assert_eq!(got.did.as_str(), "did:web:a.com"); 447 466 assert_eq!(got.retry_reason, "backfill"); 448 467 assert_eq!(got.commit_cbor, vec![1, 2, 3]); 449 468 450 - assert!(dequeue_ready(&db, 101, None).unwrap().is_none()); 469 + assert!( 470 + dequeue_ready(&db, from_millis(101), None) 471 + .unwrap() 472 + .is_none() 473 + ); 451 474 } 452 475 453 476 #[test] 454 477 fn dequeue_excludes_items_at_or_after_now() { 455 478 let db = open_temporary().unwrap(); 456 - enqueue(&db, 100, &item("did:web:a.com", 0, "test", &[])).unwrap(); 479 + enqueue( 480 + &db, 481 + from_millis(100), 482 + &item("did:web:a.com", 0, "test", &[]), 483 + ) 484 + .unwrap(); 457 485 458 - assert!(dequeue_ready(&db, 100, None).unwrap().is_none()); 459 - assert!(dequeue_ready(&db, 99, None).unwrap().is_none()); 460 - assert!(dequeue_ready(&db, 101, None).unwrap().is_some()); 486 + assert!( 487 + dequeue_ready(&db, from_millis(100), None) 488 + .unwrap() 489 + .is_none() 490 + ); 491 + assert!(dequeue_ready(&db, from_millis(99), None).unwrap().is_none()); 492 + assert!( 493 + dequeue_ready(&db, from_millis(101), None) 494 + .unwrap() 495 + .is_some() 496 + ); 461 497 } 462 498 463 499 #[test] 464 500 fn dequeue_returns_oldest_entry_first() { 465 501 let db = open_temporary().unwrap(); 466 - enqueue(&db, 200, &item("did:web:b.com", 0, "later", &[])).unwrap(); 467 - enqueue(&db, 100, &item("did:web:a.com", 0, "earlier", &[])).unwrap(); 502 + enqueue( 503 + &db, 504 + from_millis(200), 505 + &item("did:web:b.com", 0, "later", &[]), 506 + ) 507 + .unwrap(); 508 + enqueue( 509 + &db, 510 + from_millis(100), 511 + &item("did:web:a.com", 0, "earlier", &[]), 512 + ) 513 + .unwrap(); 468 514 469 - let (first, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 515 + let (first, _) = dequeue_ready(&db, from_millis(9999), None) 516 + .unwrap() 517 + .unwrap(); 470 518 assert_eq!(first.retry_reason, "earlier"); 471 519 472 - let (second, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 520 + let (second, _) = dequeue_ready(&db, from_millis(9999), None) 521 + .unwrap() 522 + .unwrap(); 473 523 assert_eq!(second.retry_reason, "later"); 474 524 } 475 525 476 526 #[test] 477 527 fn since_cursor_skips_over_tombstone_region() { 478 528 let db = open_temporary().unwrap(); 479 - enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 480 - enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 529 + enqueue( 530 + &db, 531 + from_millis(10), 532 + &item("did:web:a.com", 0, "first", &[]), 533 + ) 534 + .unwrap(); 535 + enqueue( 536 + &db, 537 + from_millis(20), 538 + &item("did:web:b.com", 0, "second", &[]), 539 + ) 540 + .unwrap(); 481 541 482 - let (first, cursor) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 542 + let (first, cursor) = dequeue_ready(&db, from_millis(9999), None) 543 + .unwrap() 544 + .unwrap(); 483 545 assert_eq!(first.retry_reason, "first"); 484 546 485 - enqueue(&db, 5, &item("did:web:late.com", 0, "late", &[])).unwrap(); 547 + enqueue( 548 + &db, 549 + from_millis(5), 550 + &item("did:web:late.com", 0, "late", &[]), 551 + ) 552 + .unwrap(); 486 553 487 - let (second, _) = dequeue_ready(&db, 9999, Some(cursor)).unwrap().unwrap(); 554 + let (second, _) = dequeue_ready(&db, from_millis(9999), Some(cursor)) 555 + .unwrap() 556 + .unwrap(); 488 557 assert_eq!(second.retry_reason, "second"); 489 558 } 490 559 ··· 507 576 fn claim_resync_transitions_state_and_dequeues() { 508 577 let db = open_temporary().unwrap(); 509 578 pending_repo(&db, "did:web:a.com"); 510 - enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap(); 579 + enqueue( 580 + &db, 581 + from_millis(100), 582 + &item("did:web:a.com", 0, "backfill", &[]), 583 + ) 584 + .unwrap(); 511 585 512 - let (claimed, _cursor) = claim_resync(&db, 101, None, &HashSet::new()) 586 + let (claimed, _cursor) = claim_resync(&db, from_millis(101), None, &HashSet::new()) 513 587 .unwrap() 514 588 .unwrap(); 515 589 assert_eq!(claimed.did.as_str(), "did:web:a.com"); 516 590 517 - assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 591 + assert!( 592 + dequeue_ready(&db, from_millis(9999), None) 593 + .unwrap() 594 + .is_none() 595 + ); 518 596 519 597 let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); 520 598 assert_eq!(info.state, repo::RepoState::Resyncing); ··· 533 611 }, 534 612 ) 535 613 .unwrap(); 536 - enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap(); 614 + enqueue( 615 + &db, 616 + from_millis(100), 617 + &item("did:web:a.com", 0, "backfill", &[]), 618 + ) 619 + .unwrap(); 537 620 538 - claim_resync(&db, 101, None, &HashSet::new()) 621 + claim_resync(&db, from_millis(101), None, &HashSet::new()) 539 622 .unwrap() 540 623 .unwrap(); 541 624 ··· 548 631 let db = open_temporary().unwrap(); 549 632 pending_repo(&db, "did:web:a.com"); 550 633 pending_repo(&db, "did:web:b.com"); 551 - enqueue(&db, 100, &item("did:web:a.com", 0, "first", &[])).unwrap(); 552 - enqueue(&db, 101, &item("did:web:b.com", 0, "second", &[])).unwrap(); 634 + enqueue( 635 + &db, 636 + from_millis(100), 637 + &item("did:web:a.com", 0, "first", &[]), 638 + ) 639 + .unwrap(); 640 + enqueue( 641 + &db, 642 + from_millis(101), 643 + &item("did:web:b.com", 0, "second", &[]), 644 + ) 645 + .unwrap(); 553 646 554 647 let mut busy: HashSet<Did<'static>> = HashSet::new(); 555 648 busy.insert(did("did:web:a.com")); 556 649 557 - let (claimed, _) = claim_resync(&db, 9999, None, &busy).unwrap().unwrap(); 650 + let (claimed, _) = claim_resync(&db, from_millis(9999), None, &busy) 651 + .unwrap() 652 + .unwrap(); 558 653 assert_eq!(claimed.did.as_str(), "did:web:b.com"); 559 654 } 560 655 ··· 562 657 fn claim_resync_returns_none_when_all_ready_are_busy() { 563 658 let db = open_temporary().unwrap(); 564 659 pending_repo(&db, "did:web:a.com"); 565 - enqueue(&db, 100, &item("did:web:a.com", 0, "only", &[])).unwrap(); 660 + enqueue( 661 + &db, 662 + from_millis(100), 663 + &item("did:web:a.com", 0, "only", &[]), 664 + ) 665 + .unwrap(); 566 666 567 667 let mut busy: HashSet<Did<'static>> = HashSet::new(); 568 668 busy.insert(did("did:web:a.com")); 569 669 570 - assert!(claim_resync(&db, 9999, None, &busy).unwrap().is_none()); 670 + assert!( 671 + claim_resync(&db, from_millis(9999), None, &busy) 672 + .unwrap() 673 + .is_none() 674 + ); 571 675 } 572 676 573 677 #[test] 574 678 fn consecutive_dequeues_drain_in_order() { 575 679 let db = open_temporary().unwrap(); 576 - enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 577 - enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 578 - enqueue(&db, 30, &item("did:web:c.com", 0, "third", &[])).unwrap(); 680 + enqueue( 681 + &db, 682 + from_millis(10), 683 + &item("did:web:a.com", 0, "first", &[]), 684 + ) 685 + .unwrap(); 686 + enqueue( 687 + &db, 688 + from_millis(20), 689 + &item("did:web:b.com", 0, "second", &[]), 690 + ) 691 + .unwrap(); 692 + enqueue( 693 + &db, 694 + from_millis(30), 695 + &item("did:web:c.com", 0, "third", &[]), 696 + ) 697 + .unwrap(); 579 698 580 - let (a, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 581 - let (b, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 582 - let (c, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 583 - assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 699 + let (a, _) = dequeue_ready(&db, from_millis(9999), None) 700 + .unwrap() 701 + .unwrap(); 702 + let (b, _) = dequeue_ready(&db, from_millis(9999), None) 703 + .unwrap() 704 + .unwrap(); 705 + let (c, _) = dequeue_ready(&db, from_millis(9999), None) 706 + .unwrap() 707 + .unwrap(); 708 + assert!( 709 + dequeue_ready(&db, from_millis(9999), None) 710 + .unwrap() 711 + .is_none() 712 + ); 584 713 585 714 assert_eq!(a.retry_reason, "first"); 586 715 assert_eq!(b.retry_reason, "second");
+15 -21
src/sync/backfill.rs
··· 31 31 backfill_progress::{BackfillProgress, get, set}, 32 32 resync_queue::ResyncItem, 33 33 }, 34 - util::{TokenExt, unix_now_ms}, 34 + util::TokenExt, 35 35 }; 36 36 use std::sync::atomic::Ordering; 37 + use std::time::SystemTime; 37 38 38 39 const PAGE_LIMIT: i64 = 500; 39 40 /// Delay between retry attempts after a transient page failure. ··· 43 44 /// Typical number of requests required to complete one repo resync 44 45 const REQUESTS_PER_RESYNC: u64 = 2; 45 46 46 - /// Walk the full `listRepos` feed for `host`, enqueuing newly discovered repos. 47 - /// 48 - /// Resumes from the last persisted cursor on restart. 4xx errors are treated 49 - /// as fatal and abort the walk immediately. Transient errors are retried up to 50 - /// `MAX_PAGE_FAILURES` times before giving up. Returns `Ok(true)` when the 51 - /// full walk completes, `Ok(false)` if cancelled or the host gives up. 52 - /// 53 47 /// Walk `listRepos` on `host` and enqueue new repos for resync. 54 48 /// 55 49 /// When `validate` is true (deep crawl / untrusted PDS), DIDs are verified to ··· 88 82 // same host are spread across time at 1/crawl_qps intervals. 89 83 // This prevents the timestamp-ordered queue from bunching all items for 90 84 // a popular host together. 91 - let host_interval_ms = 1000 / crawl_qps.get() as u64; 92 - let mut host_schedule: HashMap<Arc<Url>, u64> = HashMap::new(); // host → last scheduled ts (seconds) 85 + let host_interval = Duration::from_secs(1) / crawl_qps.get(); 86 + let mut host_schedule: HashMap<Arc<Url>, SystemTime> = HashMap::new(); 93 87 94 88 loop { 95 89 if token.is_cancelled() { ··· 103 97 }; 104 98 105 99 let page_len = dids.len(); 106 - let now = unix_now_ms(); 100 + let now = SystemTime::now(); 107 101 108 102 // For untrusted hosts (deep crawl), filter DIDs to those whose 109 103 // resolved PDS actually matches this host. ··· 146 140 dids_with_hosts, 147 141 progress_cursor, 148 142 now, 149 - host_interval_ms, 143 + host_interval, 150 144 schedule, 151 145 ) 152 146 }) ··· 184 178 &host_owned, 185 179 &BackfillProgress { 186 180 cursor: "".to_string(), 187 - completed_at: Some(now.to_string()), 181 + completed_at: Some(crate::util::to_millis(now).to_string()), 188 182 }, 189 183 ) 190 184 }) ··· 323 317 host: &Host, 324 318 items: Vec<(Did<'static>, Arc<Url>)>, 325 319 progress_cursor: String, 326 - now: u64, 327 - interval_ms: u64, 328 - mut host_schedule: HashMap<Arc<Url>, u64>, 329 - ) -> Result<(u64, HashMap<Arc<Url>, u64>)> { 320 + now: SystemTime, 321 + interval: Duration, 322 + mut host_schedule: HashMap<Arc<Url>, SystemTime>, 323 + ) -> Result<(u64, HashMap<Arc<Url>, SystemTime>)> { 330 324 let mut count: u64 = 0; 331 - let meta_interval = interval_ms * REQUESTS_PER_RESYNC; 325 + let meta_interval = interval * REQUESTS_PER_RESYNC as u32; 332 326 for (did, pds) in items { 333 327 let newly_inserted = storage::repo::ensure_repo(db, &did)?; 334 328 if newly_inserted { 335 329 let last = host_schedule.get(&pds).copied().unwrap_or(now); 336 - let ts = if last >= now { 330 + let when = if last >= now { 337 331 last + meta_interval 338 332 } else { 339 333 now 340 334 }; 341 - host_schedule.insert(pds, ts); 335 + host_schedule.insert(pds, when); 342 336 let item = ResyncItem { 343 337 did, 344 338 retry_count: 0, 345 339 retry_reason: "backfill".to_string(), 346 340 commit_cbor: vec![], 347 341 }; 348 - storage::resync_queue::enqueue(db, ts, &item)?; 342 + storage::resync_queue::enqueue(db, when, &item)?; 349 343 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); 350 344 count += 1; 351 345 }
+7 -7
src/sync/firehose/commit_event.rs
··· 60 60 let db2 = db.clone(); 61 61 let did2 = did.clone(); 62 62 let pds_host2 = pds_host.clone(); 63 - let now = crate::util::unix_now_ms(); 63 + let now = std::time::SystemTime::now(); 64 64 let step2 = 65 65 tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev, pds_host2, now)) 66 66 .await??; ··· 284 284 did: Did<'static>, 285 285 rev: &Tid, 286 286 pds_host: Option<Host>, 287 - now_ms: u64, 287 + now: std::time::SystemTime, 288 288 ) -> crate::error::Result<Step2Result> { 289 289 let Some((info, prev)) = storage::repo::get(db, &did)? else { 290 290 // Unknown repo — create an entry and enqueue for initial fetch so that ··· 304 304 storage::resync_queue::enqueue_into( 305 305 &mut batch, 306 306 db, 307 - now_ms, 307 + now, 308 308 &crate::storage::resync_queue::ResyncItem { 309 309 did, 310 310 retry_count: 0, ··· 327 327 if !info.status.is_active() { 328 328 return Ok(Step2Result::InactiveAccount(info, prev)); 329 329 } 330 - if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did, now_ms) { 330 + if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did, now) { 331 331 return Ok(Step2Result::Drop); 332 332 } 333 333 let mode = match pds_host { ··· 344 344 did: &Did<'static>, 345 345 rev: &Tid, 346 346 pds_host: Option<Host>, 347 - now: u64, 347 + now: std::time::SystemTime, 348 348 ) -> crate::error::Result<Step2Result> { 349 349 let Some((mut info, _)) = storage::repo::get(db, did)? else { 350 350 return Ok(Step2Result::Drop); ··· 510 510 storage::resync_queue::enqueue_into( 511 511 &mut batch, 512 512 db, 513 - crate::util::unix_now_ms(), 513 + std::time::SystemTime::now(), 514 514 &crate::storage::resync_queue::ResyncItem { 515 515 did: did.clone(), 516 516 retry_count: 0, ··· 587 587 storage::resync_queue::enqueue_into( 588 588 &mut batch, 589 589 db, 590 - crate::util::unix_now_ms(), 590 + std::time::SystemTime::now(), 591 591 &crate::storage::resync_queue::ResyncItem { 592 592 did: did.clone(), 593 593 retry_count: 0,
+4 -4
src/sync/firehose/sync_event.rs
··· 82 82 83 83 let pds_host: Option<Host> = resolved.pds.host().map(|h| h.to_owned()); 84 84 let blocks = sync.blocks.to_vec(); 85 - let now = crate::util::unix_now_ms(); 85 + let now = std::time::SystemTime::now(); 86 86 87 87 // ── Steps 2, 6, 7: Blocking storage checks + mark desync + enqueue ─────── 88 88 let outcome = { ··· 184 184 mst_root: Vec<u8>, 185 185 blocks: Vec<u8>, 186 186 pds_host: Option<Host>, 187 - now_ms: u64, 187 + now: std::time::SystemTime, 188 188 ) -> crate::error::Result<SyncCheckOutcome> { 189 189 // A #sync event is unambiguously sync1.1 — upgrade the PDS mode eagerly, 190 190 // before any drop checks, since the presence of a valid signed event is ··· 224 224 } 225 225 226 226 // Steps 6, 7: shared drop checks (active status already verified above). 227 - if validate::should_drop(&info, prev.as_ref(), &rev, "sync", &did, now_ms) { 227 + if validate::should_drop(&info, prev.as_ref(), &rev, "sync", &did, now) { 228 228 return Ok(SyncCheckOutcome::Done); 229 229 } 230 230 ··· 243 243 storage::resync_queue::enqueue_into( 244 244 &mut batch, 245 245 db, 246 - now_ms, 246 + now, 247 247 &ResyncItem { 248 248 did, 249 249 retry_count: 0,
+2 -1
src/sync/firehose/validate.rs
··· 226 226 rev: &Tid, 227 227 label: &'static str, 228 228 did: &Did<'_>, 229 - now_ms: u64, 229 + now: std::time::SystemTime, 230 230 ) -> bool { 231 231 if !info.status.is_active() { 232 232 metrics::counter!("lightrail_event_dropped_total", ··· 256 256 return true; 257 257 } 258 258 259 + let now_ms = crate::util::to_millis(now); 259 260 let rev_ms = rev.timestamp() / 1_000; // micros -> millis 260 261 if rev_ms > now_ms + REV_FUTURE_TOLERANCE_MS { 261 262 metrics::counter!("lightrail_event_dropped_total",
+99 -85
src/sync/resync/dispatcher.rs
··· 11 11 //! claiming a new entry for a DID whose previous entry is still being processed. 12 12 13 13 use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 14 - use jacquard_common::{IntoStatic, types::string::Did}; 14 + use jacquard_common::{IntoStatic, types::string::Did, url::Url}; 15 15 use std::collections::{HashMap, HashSet}; 16 - use std::time::{Duration, Instant}; 16 + use std::sync::Arc; 17 + use std::time::{Duration, Instant, SystemTime}; 17 18 18 - use super::{GetCollectionsError, ResyncError}; 19 + use super::ResyncError; 19 20 20 21 use tokio::task::{Id as TaskId, JoinSet}; 21 22 use tracing::{debug, error, info, trace, warn}; ··· 28 29 repo::{AccountStatus, RepoInfo, RepoState}, 29 30 resync_queue::ResyncItem, 30 31 }; 31 - use crate::util::{TokenExt, unix_now_ms}; 32 + use crate::util::TokenExt; 32 33 33 34 /// How long to wait between queue polls when no workers are running. 34 - const IDLE_POLL_MS: u64 = 500; 35 + const IDLE_POLL: Duration = Duration::from_millis(500); 35 36 36 37 /// Default per-host cooldown after receiving an HTTP 429 from a PDS. 37 38 /// 38 39 /// Used when the PDS does not send a `Retry-After` header. During this window 39 40 /// the dispatcher will not dispatch new work to that host. 40 - const RATE_LIMIT_COOLDOWN_MS: u64 = 20_000; 41 + const RATE_LIMIT_COOLDOWN: Duration = Duration::from_secs(20); 41 42 42 43 pub struct DispatcherConfig { 43 44 pub resolver: std::sync::Arc<crate::identity::Resolver>, ··· 71 72 let mut task_dids: HashMap<TaskId, Did<'static>> = HashMap::new(); 72 73 let mut since: Option<Vec<u8>> = None; 73 74 let mut workers: JoinSet<WorkerOutcome> = JoinSet::new(); 74 - // Maps PDS hostname → Instant when the 429 cooldown expires. 75 - let mut cooling_hosts: HashMap<String, Instant> = HashMap::new(); 75 + // Maps PDS URL → Instant when the 429 cooldown expires. 76 + let mut cooling_hosts: HashMap<Arc<Url>, Instant> = HashMap::new(); 76 77 77 78 info!(max_concurrent, "resync dispatcher started"); 78 79 79 80 loop { 80 81 // Fill up to max_concurrent slots from the queue. 81 82 while workers.len() < max_concurrent { 82 - let now = unix_now_ms(); 83 + let now = SystemTime::now(); 83 84 let claim = { 84 85 let db = db.clone(); 85 86 let since = since.clone(); ··· 102 103 // Check whether this item's PDS host is still cooling down 103 104 // after a 429. The resolver cache makes this a local lookup 104 105 // for any DID we've seen recently. 105 - if let Ok(resolved) = resolver.resolve(&did).await { 106 - let host = resolved.pds.host_str().unwrap_or("").to_string(); 107 - match cooling_hosts.get(&host) { 108 - Some(&until) if Instant::now() < until => { 109 - let remaining_ms = 110 - until.duration_since(Instant::now()).as_millis().max(1000) 111 - as u64; 112 - let db = db.clone(); 113 - let ts = unix_now_ms() + remaining_ms; 114 - match tokio::task::spawn_blocking(move || { 115 - storage::resync_queue::enqueue(&db, ts, &item) 116 - }) 117 - .await 118 - { 119 - Ok(Ok(())) => {} 120 - Ok(Err(e)) => error!(error = %e, did = %did, 121 - "failed to defer during host cooldown"), 122 - Err(e) => error!(error = %e, did = %did, 123 - "failed to defer during host cooldown (panic)"), 106 + match resolver.resolve(&did).await { 107 + Ok(resolved) => { 108 + if let Some(&until) = cooling_hosts.get(&resolved.pds) { 109 + if Instant::now() < until { 110 + let remaining = until.duration_since(Instant::now()); 111 + let remaining = if remaining < Duration::from_secs(1) { 112 + Duration::from_secs(1) 113 + } else { 114 + remaining 115 + }; 116 + let db = db.clone(); 117 + let when = SystemTime::now() + remaining; 118 + let pds_host = 119 + resolved.pds.host_str().unwrap_or("unknown").to_string(); 120 + let remaining_ms = remaining.as_millis() as u64; 121 + match tokio::task::spawn_blocking(move || { 122 + storage::resync_queue::enqueue(&db, when, &item) 123 + }) 124 + .await 125 + { 126 + Ok(Ok(())) => {} 127 + Ok(Err(e)) => error!(error = %e, did = %did, 128 + "failed to defer during host cooldown"), 129 + Err(e) => error!(error = %e, did = %did, 130 + "failed to defer during host cooldown (panic)"), 131 + } 132 + busy.remove(&did); 133 + debug!(did = %did, host = %pds_host, remaining_ms, 134 + "deferring; PDS host cooling down after 429"); 135 + break; 136 + } else { 137 + cooling_hosts.remove(&resolved.pds); 124 138 } 125 - busy.remove(&did); 126 - debug!(did = %did, host = %host, remaining_ms, 127 - "deferring; PDS host cooling down after 429"); 128 - break; 129 139 } 130 - Some(_) => { 131 - // Cooldown expired; clean up the entry. 132 - cooling_hosts.remove(&host); 133 - } 134 - None => {} 140 + } 141 + Err(e) => { 142 + warn!(did = %did, error = %e, "DID resolution failed during claim; skipping"); 143 + busy.remove(&did); 144 + continue; 135 145 } 136 146 } 137 147 ··· 164 174 165 175 if workers.is_empty() { 166 176 // Nothing running, nothing claimable right now. Yield before polling. 167 - if !token.sleep(Duration::from_millis(IDLE_POLL_MS)).await { 177 + if !token.sleep(IDLE_POLL).await { 168 178 return Ok(()); 169 179 }; 170 180 continue; ··· 192 202 metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 193 203 194 204 match worker_result { 195 - Ok(WorkerOutcome::RateLimited { host, retry_count }) => { 205 + Ok(WorkerOutcome::RateLimited { pds, retry_count }) => { 196 206 metrics::counter!("lightrail_resync_completed_total", 197 207 "outcome" => "rate_limited") 198 208 .increment(1); 199 - cooling_hosts.insert( 200 - host.clone(), 201 - Instant::now() + Duration::from_millis(RATE_LIMIT_COOLDOWN_MS), 202 - ); 203 - warn!(did = %did, host = %host, cooldown_ms = RATE_LIMIT_COOLDOWN_MS, 209 + cooling_hosts.insert(pds.clone(), Instant::now() + RATE_LIMIT_COOLDOWN); 210 + warn!(did = %did, pds = %pds, cooldown_secs = RATE_LIMIT_COOLDOWN.as_secs(), 204 211 "PDS rate-limited; cooling down"); 212 + let pds_host = pds.host_str().unwrap_or("unknown"); 205 213 let item = ResyncItem { 206 214 did: did.clone(), 207 215 retry_count, 208 - retry_reason: format!("rate limited by {host}"), 216 + retry_reason: format!("rate limited by {pds_host}"), 209 217 commit_cbor: vec![], 210 218 }; 211 219 let db = db.clone(); 212 - let ts = unix_now_ms() + RATE_LIMIT_COOLDOWN_MS; 220 + let when = SystemTime::now() + RATE_LIMIT_COOLDOWN; 213 221 match tokio::task::spawn_blocking(move || { 214 - storage::resync_queue::enqueue(&db, ts, &item) 222 + storage::resync_queue::enqueue(&db, when, &item) 215 223 }) 216 224 .await 217 225 { ··· 262 270 /// The PDS returned HTTP 429. The dispatcher will cool down this host 263 271 /// before sending it more requests. 264 272 RateLimited { 265 - host: String, 273 + pds: Arc<Url>, 266 274 retry_count: u16, 267 275 }, 268 276 /// The PDS returned a definitive "repo not found" response. ··· 297 305 trace!(did = %did, "resync completed"); 298 306 WorkerOutcome::Success 299 307 } 300 - Err(ResyncError::Fetch(GetCollectionsError::RateLimited(host))) => { 301 - warn!(did = %did, host = %host, "PDS rate-limited; reporting to dispatcher"); 308 + Err(ResyncError::RateLimited(pds)) => { 309 + warn!(did = %did, pds = %pds, "PDS rate-limited; reporting to dispatcher"); 302 310 WorkerOutcome::RateLimited { 303 - host, 311 + pds, 304 312 retry_count: item.retry_count, 305 313 } 306 314 } ··· 359 367 ) 360 368 .await?; 361 369 let new_retry = retry_count.saturating_add(1); 362 - let delay_ms = backoff_secs(new_retry) * 1000; 363 - metrics::histogram!("lightrail_resync_retry_delay_ms").record(delay_ms as f64); 370 + let delay = backoff(new_retry); 371 + metrics::histogram!("lightrail_resync_retry_delay_seconds").record(delay.as_secs_f64()); 364 372 let item = ResyncItem { 365 373 did: did.clone(), 366 374 retry_count: new_retry, 367 375 retry_reason: error, 368 376 commit_cbor: vec![], 369 377 }; 370 - let ts = unix_now_ms() + delay_ms; 371 - tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 378 + let when = SystemTime::now() + delay; 379 + tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, when, &item)) 372 380 .await??; 373 - info!(did = %did, retry = new_retry, delay_ms, "re-enqueued for retry"); 381 + info!(did = %did, retry = new_retry, delay_secs = delay.as_secs(), "re-enqueued for retry"); 374 382 } 375 383 WorkerOutcome::NotFound { retry_count } => { 376 384 metrics::counter!("lightrail_resync_completed_total", "outcome" => "not_found") 377 385 .increment(1); 378 386 let new_retry = retry_count.saturating_add(1); 379 - if let Some(delay) = not_found_backoff_secs(new_retry) { 387 + if let Some(delay) = not_found_backoff(new_retry) { 380 388 let item = ResyncItem { 381 389 did: did.clone(), 382 390 retry_count: new_retry, 383 391 retry_reason: "repo not found".to_string(), 384 392 commit_cbor: vec![], 385 393 }; 386 - let delay_ms = delay * 1000; 387 - let ts = unix_now_ms() + delay_ms; 388 - tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 389 - .await??; 390 - info!(did = %did, retry = new_retry, delay_secs = delay, 394 + let when = SystemTime::now() + delay; 395 + tokio::task::spawn_blocking(move || { 396 + storage::resync_queue::enqueue(&db, when, &item) 397 + }) 398 + .await??; 399 + info!(did = %did, retry = new_retry, delay_secs = delay.as_secs(), 391 400 "re-enqueued after repo not found"); 392 401 } else { 393 402 info!(did = %did, retries = retry_count, ··· 435 444 436 445 // Deserialize borrowing from `event.cbor`, then convert to owned via 437 446 // IntoStatic so the commit lifetime is independent of the buffer bytes. 438 - let commit: Box<Commit<'static>> = 439 - match serde_ipld_dagcbor::from_slice::<Commit<'_>>(&event.cbor) 440 - .map(IntoStatic::into_static) 441 - { 442 - Ok(c) => Box::new(c), 443 - Err(e) => { 444 - warn!(did = %did, seq, error = %e, 447 + let commit: Box<Commit<'static>> = match serde_ipld_dagcbor::from_slice::<Commit<'_>>( 448 + &event.cbor, 449 + ) 450 + .map(IntoStatic::into_static) 451 + { 452 + Ok(c) => Box::new(c), 453 + Err(e) => { 454 + warn!(did = %did, seq, error = %e, 445 455 "failed to deserialize buffered commit; skipping"); 446 - // Ack to avoid the entry accumulating across future resyncs. 447 - let did_ack = did.clone(); 448 - let db_ack = db.clone(); 449 - let _ = tokio::task::spawn_blocking(move || { 450 - crate::storage::resync_buffer::ack_buffer_entry(&db_ack, did_ack, seq) 451 - }) 452 - .await; 453 - continue; 456 + // Ack to avoid the entry accumulating across future resyncs. 457 + let did_ack = did.clone(); 458 + let db_ack = db.clone(); 459 + if let Err(e) = tokio::task::spawn_blocking(move || { 460 + crate::storage::resync_buffer::ack_buffer_entry(&db_ack, did_ack, seq) 461 + }) 462 + .await 463 + { 464 + warn!(did = %did, seq, error = %e, "failed to ack malformed buffered commit"); 454 465 } 455 - }; 466 + continue; 467 + } 468 + }; 456 469 457 470 if let Err(e) = crate::sync::firehose::commit_event::process_commit_event( 458 471 commit, seq as i64, resolver, &db, client, ··· 486 499 // Short delay so we don't immediately re-panic on the same broken input. 487 500 let db2 = db.clone(); 488 501 match tokio::task::spawn_blocking(move || { 489 - storage::resync_queue::enqueue(&db2, unix_now_ms() + 60_000, &item) 502 + storage::resync_queue::enqueue(&db2, SystemTime::now() + Duration::from_secs(60), &item) 490 503 }) 491 504 .await 492 505 { ··· 532 545 Ok(()) 533 546 } 534 547 535 - fn backoff_secs(retry_count: u16) -> u64 { 548 + fn backoff(retry_count: u16) -> Duration { 536 549 // 60s, 120s, 240s, 480s, 960s, 1920s, 3600s (capped) 537 550 let exp = (retry_count as u32).saturating_sub(1).min(5); 538 - (60u64 * (1u64 << exp)).min(3600) 551 + let secs = (60u64 * (1u64 << exp)).min(3600); 552 + Duration::from_secs(secs) 539 553 } 540 554 541 555 /// Slow-retry schedule for repos that returned "not found" from their PDS. ··· 543 557 /// Returns `None` when we've exhausted retries — the `RepoState::NotFound` 544 558 /// written by `index_repo` is the terminal state; future firehose activity 545 559 /// for the DID will re-trigger processing normally. 546 - fn not_found_backoff_secs(retry_count: u16) -> Option<u64> { 560 + fn not_found_backoff(retry_count: u16) -> Option<Duration> { 547 561 match retry_count { 548 - 1 => Some(6 * 3600), // 6h 549 - 2 => Some(24 * 3600), // 24h 550 - 3 => Some(24 * 3600), // 24h 562 + 1 => Some(Duration::from_secs(6 * 3600)), // 6h 563 + 2 => Some(Duration::from_secs(24 * 3600)), // 24h 564 + 3 => Some(Duration::from_secs(24 * 3600)), // 24h 551 565 _ => None, 552 566 } 553 567 }
+7
src/sync/resync/mod.rs
··· 20 20 pub mod get_repo; 21 21 22 22 use std::collections::BTreeSet; 23 + use std::sync::Arc; 23 24 use std::time::Duration; 24 25 25 26 use cid::Cid as RawCid; ··· 48 49 Fetch(GetCollectionsError), 49 50 #[error(transparent)] 50 51 Storage(#[from] crate::storage::StorageError), 52 + /// The PDS returned HTTP 429. Carries the PDS URL for host-level cooldown. 53 + #[error("rate limited by {0}")] 54 + RateLimited(Arc<jacquard_common::url::Url>), 51 55 #[error("invalid DID in queue: {0}")] 52 56 InvalidDid(String), 53 57 #[error("blocking storage task panicked: {0}")] ··· 163 167 .await 164 168 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 165 169 return Ok(()); 170 + } 171 + Err(GetCollectionsError::RateLimited(_)) => { 172 + return Err(ResyncError::RateLimited(resolved.pds.clone())); 166 173 } 167 174 Err(e) => return Err(ResyncError::Fetch(e)), 168 175 };
+11 -6
src/util.rs
··· 1 - use std::time::{SystemTime, UNIX_EPOCH}; 2 - use tokio::time::{Duration, sleep}; 1 + use std::time::{Duration, SystemTime, UNIX_EPOCH}; 2 + use tokio::time::sleep; 3 3 use tokio_util::sync::CancellationToken; 4 4 5 - pub fn unix_now_ms() -> u64 { 6 - SystemTime::now() 7 - .duration_since(UNIX_EPOCH) 8 - .expect("not to be way in the past") 5 + /// Convert a `SystemTime` to milliseconds since the Unix epoch. 6 + pub fn to_millis(t: SystemTime) -> u64 { 7 + t.duration_since(UNIX_EPOCH) 8 + .expect("system clock before Unix epoch") 9 9 .as_millis() as u64 10 + } 11 + 12 + /// Convert milliseconds since the Unix epoch to a `SystemTime`. 13 + pub fn from_millis(ms: u64) -> SystemTime { 14 + UNIX_EPOCH + Duration::from_millis(ms) 10 15 } 11 16 12 17 pub trait TokenExt {