lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

scatter crawled resyncs to the future at qps limit

instead of a two-level queue keyed at the top by host to acheive round-robin requests by pds host, keep track of pds hosts last-queue-time when inserting, spreading them into the future so that new hosts get mixed in early

or another way:

schedule per-host resyncs as early as possible per-host but no earlier

phil 7723c291 f20205c0

+166 -86
+2 -5
examples/enqueue_resync.rs
··· 1 1 use std::path::PathBuf; 2 - use std::time::{SystemTime, UNIX_EPOCH}; 3 2 4 3 use clap::Parser; 5 4 use jacquard_common::types::string::Did; 6 5 7 6 use lightrail::storage::{self, resync_queue::ResyncItem}; 7 + use lightrail::util::unix_now_ms; 8 8 9 9 #[derive(Parser, Debug)] 10 10 #[command( ··· 28 28 fn main() -> Result<(), Box<dyn std::error::Error>> { 29 29 let args = Args::parse(); 30 30 let db = storage::open(&args.db_path, 64)?; 31 - let now = SystemTime::now() 32 - .duration_since(UNIX_EPOCH) 33 - .unwrap() 34 - .as_secs(); 31 + let now = unix_now_ms(); 35 32 36 33 for raw in &args.dids { 37 34 let did = Did::new_owned(raw.clone())?;
+14 -4
src/main.rs
··· 175 175 let db = db.clone(); 176 176 let client = client.clone(); 177 177 let host = subscribe_host.clone(); 178 + let resolver = resolver.clone(); 178 179 async move { 179 - if backfill::run(host, db, client, token.clone(), None) 180 - .await 181 - .inspect(|_| info!("backfill done.")) 182 - .inspect_err(|e| warn!(error = %e, "backfill exited"))? 180 + if backfill::run( 181 + host, 182 + db, 183 + client, 184 + token.clone(), 185 + resolver, 186 + false, 187 + args.crawl_qps, 188 + ) 189 + .await 190 + .inspect(|_| info!("backfill done.")) 191 + .inspect_err(|e| warn!(error = %e, "backfill exited"))? 183 192 { 184 193 info!("backfill task idling"); 185 194 token.cancelled().await; ··· 259 268 args.max_deep_crawl_workers, 260 269 token, 261 270 resolver, 271 + args.crawl_qps, 262 272 ) 263 273 .await 264 274 .inspect(|_| info!("deep crawl done."))
+2 -2
src/server/admin.rs
··· 18 18 #[derive(Serialize)] 19 19 pub(super) struct AdminStatus { 20 20 // ── Persistent counters (survive restarts) ──────────────────────────── 21 - first_startup_secs: u64, 21 + first_startup_ms: u64, 22 22 startup_count: u64, 23 23 repos_queued_total: u64, 24 24 collection_births_total: u64, ··· 90 90 91 91 let s = &db.stats; 92 92 Ok(Json(AdminStatus { 93 - first_startup_secs: s.first_startup_secs.load(Ordering::Relaxed), 93 + first_startup_ms: s.first_startup_ms.load(Ordering::Relaxed), 94 94 startup_count: s.startup_count.load(Ordering::Relaxed), 95 95 repos_queued_total: s.repos_queued_total.load(Ordering::Relaxed), 96 96 collection_births_total: s.collection_births_total.load(Ordering::Relaxed),
+15 -16
src/storage/meta.rs
··· 20 20 21 21 pub struct Stats { 22 22 // ── Counters ───────────────────────────────────────────────────────────── 23 - /// Unix timestamp of the very first startup; set once, never updated. 24 - pub first_startup_secs: AtomicU64, 23 + /// Unix ms timestamp of the very first startup; set once, never updated. 24 + pub first_startup_ms: AtomicU64, 25 25 pub startup_count: AtomicU64, 26 26 /// New repos ever enqueued for resync. 27 27 pub repos_queued_total: AtomicU64, ··· 34 34 /// Approximate count of buffered in-flight events. 35 35 pub resync_buffer_count: AtomicI64, 36 36 37 - // ── Cardinality sketches (~3 KiB each at full HLL) ──────────────────────── 37 + // BOOOOoooooo mutexes (they should be ok, we only grab them briefly for writes and serializes) 38 38 /// Distinct NSID strings ever indexed. 39 - /// BOOOOoooooo mutexes (they should be ok, we only grab them briefly for writes and serializes) 40 39 pub sketch_collections: Mutex<Sk>, 41 40 /// All DIDs ever seen. 42 41 pub sketch_accounts_all: Mutex<Sk>, ··· 55 54 impl Default for Stats { 56 55 fn default() -> Self { 57 56 Stats { 58 - first_startup_secs: AtomicU64::new(0), 57 + first_startup_ms: AtomicU64::new(0), 59 58 startup_count: AtomicU64::new(0), 60 59 repos_queued_total: AtomicU64::new(0), 61 60 collection_births_total: AtomicU64::new(0), ··· 174 173 /// Load stats from storage on startup. 175 174 /// 176 175 /// Missing keys default to zero / empty sketch. After loading: 177 - /// - If `first_startup_secs` is 0, writes the current time immediately. 176 + /// - If `first_startup_ms` is 0, writes the current time immediately. 178 177 /// - Increments `startup_count` and writes it immediately. 179 178 pub(super) fn load(ks: &fjall::Keyspace) -> StorageResult<StatsRef> { 180 179 let stats = Stats { 181 - first_startup_secs: AtomicU64::new(read_u64(ks, K_FIRST_STARTUP)?), 180 + first_startup_ms: AtomicU64::new(read_u64(ks, K_FIRST_STARTUP)?), 182 181 startup_count: AtomicU64::new(read_u64(ks, K_STARTUP_COUNT)?), 183 182 repos_queued_total: AtomicU64::new(read_u64(ks, K_REPOS_QUEUED)?), 184 183 collection_births_total: AtomicU64::new(read_u64(ks, K_BIRTHS)?), ··· 195 194 sketch_pds_hosts: Mutex::new(read_sketch(ks, K_SK_PDS_HOSTS)), 196 195 }; 197 196 198 - // Set first_startup_secs if this is the very first startup. 199 - if stats.first_startup_secs.load(Ordering::Relaxed) == 0 { 200 - let now = crate::util::unix_now(); 201 - stats.first_startup_secs.store(now, Ordering::Relaxed); 197 + // Set first_startup_ms if this is the very first startup. 198 + if stats.first_startup_ms.load(Ordering::Relaxed) == 0 { 199 + let now = crate::util::unix_now_ms(); 200 + stats.first_startup_ms.store(now, Ordering::Relaxed); 202 201 write_u64(ks, K_FIRST_STARTUP, now)?; 203 202 } 204 203 ··· 217 216 write_u64( 218 217 ks, 219 218 K_FIRST_STARTUP, 220 - s.first_startup_secs.load(Ordering::Relaxed), 219 + s.first_startup_ms.load(Ordering::Relaxed), 221 220 )?; 222 221 write_u64(ks, K_STARTUP_COUNT, s.startup_count.load(Ordering::Relaxed))?; 223 222 write_u64( ··· 275 274 fn first_startup_set_on_open() { 276 275 // open_temporary already calls meta::load internally. 277 276 let db = open_temporary().unwrap(); 278 - assert!(db.stats.first_startup_secs.load(Ordering::Relaxed) > 0); 277 + assert!(db.stats.first_startup_ms.load(Ordering::Relaxed) > 0); 279 278 assert_eq!(db.stats.startup_count.load(Ordering::Relaxed), 1); 280 279 } 281 280 282 281 #[test] 283 282 fn startup_count_increments_on_reload() { 284 283 let db = open_temporary().unwrap(); 285 - let first_ts = db.stats.first_startup_secs.load(Ordering::Relaxed); 284 + let first_ts = db.stats.first_startup_ms.load(Ordering::Relaxed); 286 285 assert_eq!(db.stats.startup_count.load(Ordering::Relaxed), 1); 287 286 288 287 // Simulate a second startup: save current stats, then call load again. 289 288 save(&db).unwrap(); 290 289 let s2 = load(&db.ks).unwrap(); 291 290 assert_eq!(s2.startup_count.load(Ordering::Relaxed), 2); 292 - // first_startup_secs must not change. 293 - assert_eq!(s2.first_startup_secs.load(Ordering::Relaxed), first_ts); 291 + // first_startup_ms must not change. 292 + assert_eq!(s2.first_startup_ms.load(Ordering::Relaxed), first_ts); 294 293 } 295 294 296 295 #[test]
+73 -19
src/sync/backfill.rs
··· 6 6 //! (any state) are skipped — the dispatcher's retry mechanism handles repos 7 7 //! that need re-syncing. 8 8 9 + use std::collections::HashMap; 10 + use std::num::NonZeroU32; 9 11 use std::sync::Arc; 10 12 11 13 use jacquard_api::com_atproto::sync::list_repos::ListRepos; ··· 28 30 backfill_progress::{BackfillProgress, get, set}, 29 31 resync_queue::ResyncItem, 30 32 }, 31 - util::{TokenExt, unix_now}, 33 + util::{TokenExt, unix_now_ms}, 32 34 }; 33 35 use std::sync::atomic::Ordering; 34 36 ··· 45 47 /// `MAX_PAGE_FAILURES` times before giving up. Returns `Ok(true)` when the 46 48 /// full walk completes, `Ok(false)` if cancelled or the host gives up. 47 49 /// 48 - /// passing in `resolver` implies that this is a PDS, not a relay -- treat with 49 - /// lower trust: verify DIDs belong on that host and check for evil cursor. 50 + /// Walk `listRepos` on `host` and enqueue new repos for resync. 51 + /// 52 + /// When `validate` is true (deep crawl / untrusted PDS), DIDs are verified to 53 + /// actually live on `host` and non-matching ones are rejected. The resolver is 54 + /// always used for PDS-host discovery so that enqueue timestamps can be 55 + /// staggered per actual PDS, spreading work across hosts in the queue. 50 56 pub async fn run( 51 57 host: Host, 52 58 db: DbRef, 53 59 client: ThrottledClient, 54 60 token: CancellationToken, 55 - resolver: Option<Arc<Resolver>>, 61 + resolver: Arc<Resolver>, 62 + validate: bool, 63 + crawl_qps: NonZeroU32, 56 64 ) -> Result<bool> { 57 65 let base: jacquard_common::url::Url = format!("https://{host}") 58 66 .parse() ··· 73 81 ); 74 82 75 83 let mut total_queued: u64 = 0; 84 + // Per-host staggering: track last-scheduled timestamp (in seconds, with 85 + // fractional precision via a separate sub-second counter) so that items 86 + // for the same host are spread across time at 1/crawl_qps intervals. 87 + // This prevents the timestamp-ordered queue from bunching all items for 88 + // a popular host together. 89 + let host_interval_ms: u64 = 1000 / crawl_qps.get() as u64; 90 + let mut host_schedule: HashMap<String, u64> = HashMap::new(); // host → last scheduled ts (seconds) 76 91 77 92 loop { 78 93 if token.is_cancelled() { ··· 86 101 }; 87 102 88 103 let page_len = dids.len(); 89 - let now = unix_now(); 104 + let now = unix_now_ms(); 105 + let host_str = host.to_string(); 90 106 91 - // resolver is passed in for untrusted hosts so we can filter out dids that the host isn't authoritative over 92 - // TODO: if *many* dids are cleared (probably as a percentage?) we might mark this host sketchy 93 - let dids = match &resolver { 94 - Some(r) => validate_dids(dids, r, &host, &token).await, 95 - None => dids, 107 + // For untrusted hosts (deep crawl), filter DIDs to those whose 108 + // resolved PDS actually matches this host. 109 + let dids = if validate { 110 + validate_dids(dids, &resolver, &host, &token).await 111 + } else { 112 + dids 113 + }; 114 + 115 + // Resolve each DID's actual PDS host for per-host stagger. 116 + // Cache hits are free; misses fall back to the listed host. 117 + let dids_with_hosts: Vec<(Did<'static>, String)> = { 118 + let mut out = Vec::with_capacity(dids.len()); 119 + for did in dids { 120 + let pds_host = match resolver.resolve(&did).await { 121 + Ok(resolved) => resolved.pds.host_str().unwrap_or(&host_str).to_string(), 122 + Err(_) => host_str.clone(), 123 + }; 124 + out.push((did, pds_host)); 125 + } 126 + out 96 127 }; 97 128 98 129 let progress_cursor = next_cursor.clone().unwrap_or_default(); 99 - let page_queued = { 130 + let (page_queued, schedule_back) = { 100 131 let db = db.clone(); 101 132 let host = host.clone(); 102 - tokio::task::spawn_blocking(move || store_page(&db, &host, dids, progress_cursor, now)) 103 - .await?? 133 + // Move the schedule into the blocking task so timestamps are 134 + // only advanced for DIDs that are actually newly inserted. 135 + let schedule = std::mem::take(&mut host_schedule); 136 + tokio::task::spawn_blocking(move || { 137 + store_page( 138 + &db, 139 + &host, 140 + dids_with_hosts, 141 + progress_cursor, 142 + now, 143 + host_interval_ms, 144 + schedule, 145 + ) 146 + }) 147 + .await?? 104 148 }; 149 + host_schedule = schedule_back; 105 150 106 151 total_queued += page_queued; 107 152 ··· 262 307 263 308 /// Enqueue newly-seen DIDs and persist the backfill cursor in one blocking task. 264 309 /// 265 - /// Returns the number of DIDs that were newly inserted into the resync queue. 310 + /// Timestamps are staggered per PDS host so that the timestamp-ordered queue 311 + /// naturally interleaves items across hosts. Only newly-inserted DIDs advance 312 + /// the per-host schedule — already-known repos are skipped without leaving gaps. 313 + /// 314 + /// Returns `(count_inserted, updated_host_schedule)`. 266 315 fn store_page( 267 316 db: &DbRef, 268 317 host: &Host, 269 - dids: Vec<Did<'static>>, 318 + items: Vec<(Did<'static>, String)>, 270 319 progress_cursor: String, 271 320 now: u64, 272 - ) -> Result<u64> { 321 + interval_ms: u64, 322 + mut host_schedule: HashMap<String, u64>, 323 + ) -> Result<(u64, HashMap<String, u64>)> { 273 324 let mut count: u64 = 0; 274 - for did in dids { 325 + for (did, pds) in items { 275 326 let newly_inserted = storage::repo::ensure_repo(db, &did)?; 276 327 if newly_inserted { 328 + let last = host_schedule.get(&pds).copied().unwrap_or(now); 329 + let ts = if last >= now { last + interval_ms } else { now }; 330 + host_schedule.insert(pds, ts); 277 331 let item = ResyncItem { 278 332 did, 279 333 retry_count: 0, 280 334 retry_reason: "backfill".to_string(), 281 335 commit_cbor: vec![], 282 336 }; 283 - storage::resync_queue::enqueue(db, now, &item)?; 337 + storage::resync_queue::enqueue(db, ts, &item)?; 284 338 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); 285 339 count += 1; 286 340 } ··· 295 349 completed_at: None, 296 350 }, 297 351 )?; 298 - Ok(count) 352 + Ok((count, host_schedule)) 299 353 }
+19 -7
src/sync/deep_crawl.rs
··· 5 5 //! have already had a completed backfill are skipped — new repos on those PDSes 6 6 //! arrive via the firehose. 7 7 8 + use std::num::NonZeroU32; 8 9 use std::sync::Arc; 9 10 10 11 use jacquard_api::com_atproto::sync::list_hosts::ListHosts; ··· 24 25 25 26 const PAGE_LIMIT: i64 = 500; 26 27 /// Delay between retry attempts after a failed page request. 27 - const RETRY_DELAY_SECS: u64 = 10; 28 + const RETRY_DELAY_MS: u64 = 10_000; 28 29 /// Maximum number of consecutive page failures before abandoning the pass. 29 30 const MAX_PAGE_FAILURES: u32 = 3; 30 31 /// How long to wait between full passes through all listHosts pages. 31 - const REPOLL_SECS: u64 = 20 * 60 * 60; 32 + const REPOLL_MS: u64 = 20 * 60 * 60_000; 32 33 33 34 /// Discover PDS hosts via `listHosts` on `relay_host` and crawl each one, 34 35 /// re-polling every 20 hours after each full pass. ··· 39 40 max_workers: usize, 40 41 token: CancellationToken, 41 42 resolver: Arc<Resolver>, 43 + crawl_qps: NonZeroU32, 42 44 ) -> Result<()> { 43 45 info!(relay = %relay_host, "deep crawl started"); 44 46 ··· 55 57 max_workers, 56 58 token.clone(), 57 59 resolver.clone(), 60 + crawl_qps, 58 61 ) 59 62 .await?; 60 63 ··· 64 67 65 68 info!( 66 69 relay = %relay_host, 67 - repoll_secs = REPOLL_SECS, 70 + repoll_ms = REPOLL_MS, 68 71 "deep crawl pass complete; re-polling in 20 hours" 69 72 ); 70 73 71 - if !token.sleep(Duration::from_secs(REPOLL_SECS)).await { 74 + if !token.sleep(Duration::from_millis(REPOLL_MS)).await { 72 75 return Ok(()); 73 76 } 74 77 } ··· 85 88 max_workers: usize, 86 89 token: CancellationToken, 87 90 resolver: Arc<Resolver>, 91 + crawl_qps: NonZeroU32, 88 92 ) -> Result<()> { 89 93 let base: jacquard_common::url::Url = format!("https://{relay_host}") 90 94 .parse() ··· 163 167 let host2 = pds_host.clone(); 164 168 let resolver2 = resolver.clone(); 165 169 workers.spawn(async move { 166 - let outcome = 167 - backfill::run(host2.clone(), db2, client2, child, Some(resolver2)).await; 170 + let outcome = backfill::run( 171 + host2.clone(), 172 + db2, 173 + client2, 174 + child, 175 + resolver2, 176 + true, 177 + crawl_qps, 178 + ) 179 + .await; 168 180 (host2, outcome) 169 181 }); 170 182 metrics::gauge!("lightrail_deep_crawl_workers").set(workers.len() as f64); ··· 216 228 limit: Some(PAGE_LIMIT), 217 229 }; 218 230 for attempt in 0..MAX_PAGE_FAILURES { 219 - if attempt > 0 && !token.sleep(Duration::from_secs(RETRY_DELAY_SECS)).await { 231 + if attempt > 0 && !token.sleep(Duration::from_millis(RETRY_DELAY_MS)).await { 220 232 return None; 221 233 } 222 234
+10 -7
src/sync/firehose/commit_event.rs
··· 60 60 let db2 = db.clone(); 61 61 let did2 = did.clone(); 62 62 let pds_host2 = pds_host.clone(); 63 + let now = crate::util::unix_now_ms(); 63 64 let step2 = 64 - tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev, pds_host2)) 65 + tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev, pds_host2, now)) 65 66 .await??; 66 67 let (info, prev, pds_mode) = match step2 { 67 68 Step2Result::Proceed(info, prev, mode) => (info, prev, mode), ··· 88 89 let rev_ra = commit.rev.clone(); 89 90 let pds_host_ra = pds_host.clone(); 90 91 let step2_retry = tokio::task::spawn_blocking(move || { 91 - reactivate_and_recheck(&db_ra, &did_ra, &rev_ra, pds_host_ra) 92 + reactivate_and_recheck(&db_ra, &did_ra, &rev_ra, pds_host_ra, now) 92 93 }) 93 94 .await??; 94 95 match step2_retry { ··· 283 284 did: Did<'static>, 284 285 rev: &Tid, 285 286 pds_host: Option<Host>, 287 + now_ms: u64, 286 288 ) -> crate::error::Result<Step2Result> { 287 289 let Some((info, prev)) = storage::repo::get(db, &did)? else { 288 290 // Unknown repo — create an entry and enqueue for initial fetch so that ··· 302 304 storage::resync_queue::enqueue_into( 303 305 &mut batch, 304 306 db, 305 - crate::util::unix_now(), 307 + now_ms, 306 308 &crate::storage::resync_queue::ResyncItem { 307 309 did, 308 310 retry_count: 0, ··· 325 327 if !info.status.is_active() { 326 328 return Ok(Step2Result::InactiveAccount(info, prev)); 327 329 } 328 - if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did) { 330 + if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did, now_ms) { 329 331 return Ok(Step2Result::Drop); 330 332 } 331 333 let mode = match pds_host { ··· 342 344 did: &Did<'static>, 343 345 rev: &Tid, 344 346 pds_host: Option<Host>, 347 + now: u64, 345 348 ) -> crate::error::Result<Step2Result> { 346 349 let Some((mut info, _)) = storage::repo::get(db, did)? else { 347 350 return Ok(Step2Result::Drop); ··· 354 357 .map_err(Into::<crate::storage::StorageError>::into)?; 355 358 // Re-run the full step-2 check; the updated Active status means 356 359 // InactiveAccount will not be returned again. 357 - check_step2_blocking(db, did.clone(), rev, pds_host) 360 + check_step2_blocking(db, did.clone(), rev, pds_host, now) 358 361 } 359 362 360 363 /// everything needed to ship off to the blocking step ··· 507 510 storage::resync_queue::enqueue_into( 508 511 &mut batch, 509 512 db, 510 - crate::util::unix_now(), 513 + crate::util::unix_now_ms(), 511 514 &crate::storage::resync_queue::ResyncItem { 512 515 did: did.clone(), 513 516 retry_count: 0, ··· 584 587 storage::resync_queue::enqueue_into( 585 588 &mut batch, 586 589 db, 587 - crate::util::unix_now(), 590 + crate::util::unix_now_ms(), 588 591 &crate::storage::resync_queue::ResyncItem { 589 592 did: did.clone(), 590 593 retry_count: 0,
+6 -4
src/sync/firehose/sync_event.rs
··· 82 82 83 83 let pds_host: Option<Host> = resolved.pds.host().map(|h| h.to_owned()); 84 84 let blocks = sync.blocks.to_vec(); 85 + let now = crate::util::unix_now_ms(); 85 86 86 87 // ── Steps 2, 6, 7: Blocking storage checks + mark desync + enqueue ─────── 87 88 let outcome = { ··· 92 93 let blocks = blocks.clone(); 93 94 let pds_host = pds_host.clone(); 94 95 tokio::task::spawn_blocking(move || { 95 - check_and_enqueue(&db, did, rev, mst_root, blocks, pds_host) 96 + check_and_enqueue(&db, did, rev, mst_root, blocks, pds_host, now) 96 97 }) 97 98 .await?? 98 99 }; ··· 120 121 batch 121 122 .commit() 122 123 .map_err(Into::<crate::storage::StorageError>::into)?; 123 - check_and_enqueue(&db_owned, did, rev, mst_root, blocks, pds_host).map(|_| ()) 124 + check_and_enqueue(&db_owned, did, rev, mst_root, blocks, pds_host, now).map(|_| ()) 124 125 }) 125 126 .await??; 126 127 } ··· 183 184 mst_root: Vec<u8>, 184 185 blocks: Vec<u8>, 185 186 pds_host: Option<Host>, 187 + now_ms: u64, 186 188 ) -> crate::error::Result<SyncCheckOutcome> { 187 189 // A #sync event is unambiguously sync1.1 — upgrade the PDS mode eagerly, 188 190 // before any drop checks, since the presence of a valid signed event is ··· 222 224 } 223 225 224 226 // Steps 6, 7: shared drop checks (active status already verified above). 225 - if validate::should_drop(&info, prev.as_ref(), &rev, "sync", &did) { 227 + if validate::should_drop(&info, prev.as_ref(), &rev, "sync", &did, now_ms) { 226 228 return Ok(SyncCheckOutcome::Done); 227 229 } 228 230 ··· 241 243 storage::resync_queue::enqueue_into( 242 244 &mut batch, 243 245 db, 244 - crate::util::unix_now(), 246 + now_ms, 245 247 &ResyncItem { 246 248 did, 247 249 retry_count: 0,
+4 -3
src/sync/firehose/validate.rs
··· 23 23 24 24 /// How many seconds in the future a `rev` timestamp may be before the event 25 25 /// is dropped as implausibly future-dated. 26 - pub(super) const REV_FUTURE_TOLERANCE_SECS: u64 = 300; 26 + pub(super) const REV_FUTURE_TOLERANCE_MS: u64 = 300_000; 27 27 28 28 // --------------------------------------------------------------------------- 29 29 // Shared drop-reason type ··· 226 226 rev: &Tid, 227 227 label: &'static str, 228 228 did: &Did<'_>, 229 + now_ms: u64, 229 230 ) -> bool { 230 231 if !info.status.is_active() { 231 232 metrics::counter!("lightrail_event_dropped_total", ··· 255 256 return true; 256 257 } 257 258 258 - let rev_secs = rev.timestamp() / 1_000_000; 259 - if rev_secs > crate::util::unix_now() + REV_FUTURE_TOLERANCE_SECS { 259 + let rev_ms = rev.timestamp() / 1_000; // micros -> millis 260 + if rev_ms > now_ms + REV_FUTURE_TOLERANCE_MS { 260 261 metrics::counter!("lightrail_event_dropped_total", 261 262 "event_type" => label, "reason" => "future_rev") 262 263 .increment(1);
+18 -16
src/sync/resync/dispatcher.rs
··· 28 28 repo::{AccountStatus, RepoInfo, RepoState}, 29 29 resync_queue::ResyncItem, 30 30 }; 31 - use crate::util::{TokenExt, unix_now}; 31 + use crate::util::{TokenExt, unix_now_ms}; 32 32 33 33 /// How long to wait between queue polls when no workers are running. 34 34 const IDLE_POLL_MS: u64 = 500; ··· 37 37 /// 38 38 /// Used when the PDS does not send a `Retry-After` header. During this window 39 39 /// the dispatcher will not dispatch new work to that host. 40 - const RATE_LIMIT_COOLDOWN_SECS: u64 = 60; 40 + const RATE_LIMIT_COOLDOWN_MS: u64 = 20_000; 41 41 42 42 pub struct DispatcherConfig { 43 43 pub resolver: std::sync::Arc<crate::identity::Resolver>, ··· 79 79 loop { 80 80 // Fill up to max_concurrent slots from the queue. 81 81 while workers.len() < max_concurrent { 82 - let now = unix_now(); 82 + let now = unix_now_ms(); 83 83 let claim = { 84 84 let db = db.clone(); 85 85 let since = since.clone(); ··· 106 106 let host = resolved.pds.host_str().unwrap_or("").to_string(); 107 107 match cooling_hosts.get(&host) { 108 108 Some(&until) if Instant::now() < until => { 109 - let remaining = 110 - until.duration_since(Instant::now()).as_secs().max(1); 109 + let remaining_ms = 110 + until.duration_since(Instant::now()).as_millis().max(1000) 111 + as u64; 111 112 let db = db.clone(); 112 - let ts = unix_now() + remaining; 113 + let ts = unix_now_ms() + remaining_ms; 113 114 match tokio::task::spawn_blocking(move || { 114 115 storage::resync_queue::enqueue(&db, ts, &item) 115 116 }) ··· 122 123 "failed to defer during host cooldown (panic)"), 123 124 } 124 125 busy.remove(&did); 125 - debug!(did = %did, host = %host, remaining_secs = remaining, 126 + debug!(did = %did, host = %host, remaining_ms, 126 127 "deferring; PDS host cooling down after 429"); 127 128 break; 128 129 } ··· 197 198 .increment(1); 198 199 cooling_hosts.insert( 199 200 host.clone(), 200 - Instant::now() + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS), 201 + Instant::now() + Duration::from_millis(RATE_LIMIT_COOLDOWN_MS), 201 202 ); 202 - warn!(did = %did, host = %host, cooldown_secs = RATE_LIMIT_COOLDOWN_SECS, 203 + warn!(did = %did, host = %host, cooldown_ms = RATE_LIMIT_COOLDOWN_MS, 203 204 "PDS rate-limited; cooling down"); 204 205 let item = ResyncItem { 205 206 did: did.clone(), ··· 208 209 commit_cbor: vec![], 209 210 }; 210 211 let db = db.clone(); 211 - let ts = unix_now() + RATE_LIMIT_COOLDOWN_SECS; 212 + let ts = unix_now_ms() + RATE_LIMIT_COOLDOWN_MS; 212 213 match tokio::task::spawn_blocking(move || { 213 214 storage::resync_queue::enqueue(&db, ts, &item) 214 215 }) ··· 358 359 ) 359 360 .await?; 360 361 let new_retry = retry_count.saturating_add(1); 361 - let delay = backoff_secs(new_retry); 362 - metrics::histogram!("lightrail_resync_retry_delay_seconds").record(delay as f64); 362 + let delay_ms = backoff_secs(new_retry) * 1000; 363 + metrics::histogram!("lightrail_resync_retry_delay_ms").record(delay_ms as f64); 363 364 let item = ResyncItem { 364 365 did: did.clone(), 365 366 retry_count: new_retry, 366 367 retry_reason: error, 367 368 commit_cbor: vec![], 368 369 }; 369 - let ts = unix_now() + delay; 370 + let ts = unix_now_ms() + delay_ms; 370 371 tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 371 372 .await??; 372 - info!(did = %did, retry = new_retry, delay_secs = delay, "re-enqueued for retry"); 373 + info!(did = %did, retry = new_retry, delay_ms, "re-enqueued for retry"); 373 374 } 374 375 WorkerOutcome::NotFound { retry_count } => { 375 376 metrics::counter!("lightrail_resync_completed_total", "outcome" => "not_found") ··· 382 383 retry_reason: "repo not found".to_string(), 383 384 commit_cbor: vec![], 384 385 }; 385 - let ts = unix_now() + delay; 386 + let delay_ms = delay * 1000; 387 + let ts = unix_now_ms() + delay_ms; 386 388 tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 387 389 .await??; 388 390 info!(did = %did, retry = new_retry, delay_secs = delay, ··· 484 486 // Short delay so we don't immediately re-panic on the same broken input. 485 487 let db2 = db.clone(); 486 488 match tokio::task::spawn_blocking(move || { 487 - storage::resync_queue::enqueue(&db2, unix_now() + 60, &item) 489 + storage::resync_queue::enqueue(&db2, unix_now_ms() + 60_000, &item) 488 490 }) 489 491 .await 490 492 {
+3 -3
src/util.rs
··· 2 2 use tokio::time::{Duration, sleep}; 3 3 use tokio_util::sync::CancellationToken; 4 4 5 - pub fn unix_now() -> u64 { 5 + pub fn unix_now_ms() -> u64 { 6 6 SystemTime::now() 7 7 .duration_since(UNIX_EPOCH) 8 - .unwrap_or_default() 9 - .as_secs() 8 + .expect("not to be way in the past") 9 + .as_millis() as u64 10 10 } 11 11 12 12 pub trait TokenExt {