//! Timestamp-ordered backfill queue backed by fjall. //! //! Keys: `"bfq"\0` — big-endian timestamp gives FIFO ordering. //! Values: postcard-encoded [`BackfillJob`]. use std::collections::HashSet; use std::time::{SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; use crate::storage::{DbRef, PREFIX_BACKFILL, StorageError, StorageResult}; const NUL: u8 = b'\0'; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BackfillJob { pub did: String, pub pds_host: String, pub cursor: Option, pub retry_count: u16, pub reason: String, } fn to_millis(t: SystemTime) -> u64 { t.duration_since(UNIX_EPOCH) .expect("system clock before Unix epoch") .as_millis() as u64 } fn key(ts: u64, did: &str) -> Vec { let mut k = Vec::with_capacity(PREFIX_BACKFILL.len() + 8 + 1 + did.len()); k.extend_from_slice(&PREFIX_BACKFILL); k.extend_from_slice(&ts.to_be_bytes()); k.push(NUL); k.extend_from_slice(did.as_bytes()); k } fn parse_did(raw: &[u8]) -> StorageResult { let rest = raw .strip_prefix(PREFIX_BACKFILL.as_slice()) .ok_or(StorageError::Corrupt { key: String::from_utf8_lossy(raw).into_owned(), reason: "wrong prefix", })?; if rest.len() < 9 { return Err(StorageError::Corrupt { key: String::from_utf8_lossy(raw).into_owned(), reason: "key too short", }); } let did_bytes = rest[9..].to_vec(); String::from_utf8(did_bytes).map_err(|_| StorageError::Corrupt { key: String::from_utf8_lossy(raw).into_owned(), reason: "did not utf-8", }) } /// Enqueue a job to be claimed when wall-clock time reaches `when`. pub fn _enqueue(db: &DbRef, when: SystemTime, job: &BackfillJob) -> StorageResult<()> { let ts = to_millis(when); let k = key(ts, &job.did); let v = postcard::to_allocvec(job)?; db.persistent.insert(k, v)?; Ok(()) } /// Claim the next ready job whose timestamp is `<= now` and whose DID is not already in flight. /// /// Atomically removes the key before returning. `since` is an opaque cursor from a prior /// `claim` call — callers can hold it in memory to skip over tombstones left by prior claims. pub fn claim( db: &DbRef, now: SystemTime, since: Option>, busy: &HashSet, ) -> StorageResult)>> { let now_ms = to_millis(now); let lower = match since { Some(suffix) => { let mut k = PREFIX_BACKFILL.to_vec(); k.extend_from_slice(&suffix); k } None => PREFIX_BACKFILL.to_vec(), }; // Exclusive upper bound at "now": only claim jobs whose ts < now_ms. let mut upper = PREFIX_BACKFILL.to_vec(); upper.extend_from_slice(&now_ms.to_be_bytes()); upper.push(NUL); for guard in db.persistent.range(lower..upper) { let (key_slice, val_slice) = guard.into_inner()?; let key_bytes = key_slice.as_ref(); let did = parse_did(key_bytes)?; if busy.contains(&did) { continue; } let job: BackfillJob = postcard::from_bytes(val_slice.as_ref())?; let next_since = key_bytes[PREFIX_BACKFILL.len()..].to_vec(); let mut batch = db.database.batch(); batch.remove_weak(&db.persistent, key_bytes); batch.commit()?; return Ok(Some((job, next_since))); } Ok(None) }