ceres: a small planet in a giant solar system
1//! Timestamp-ordered backfill queue backed by fjall.
2//!
3//! Keys: `"bfq"<ts_be:u64>\0<did>` — big-endian timestamp gives FIFO ordering.
4//! Values: postcard-encoded [`BackfillJob`].
5
6use std::collections::HashSet;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use serde::{Deserialize, Serialize};
10
11use crate::storage::{DbRef, PREFIX_BACKFILL, StorageError, StorageResult};
12
/// Separator byte written between the timestamp and the DID in queue keys.
const NUL: u8 = 0x00;
14
/// A unit of backfill work stored as the value of a queue entry.
///
/// Values are serialized with postcard when enqueued and decoded again when
/// claimed. NOTE(review): postcard is not self-describing, so reordering,
/// removing, or retyping fields presumably breaks decoding of entries that
/// are already persisted — confirm before changing this layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillJob {
    /// DID the job is for; also embedded in the queue key and matched against
    /// the in-flight set when claiming.
    pub did: String,
    /// Presumably the PDS host to backfill from — TODO confirm against callers.
    pub pds_host: String,
    /// Optional resume position; presumably `None` means "start from the
    /// beginning" — TODO confirm against callers.
    pub cursor: Option<String>,
    /// Presumably the number of attempts already made — TODO confirm.
    pub retry_count: u16,
    /// Free-form text; presumably why the job was scheduled — TODO confirm.
    pub reason: String,
}
23
/// Convert a wall-clock instant to whole milliseconds since the Unix epoch.
///
/// Instants so far in the future that the millisecond count does not fit in a
/// `u64` saturate to `u64::MAX` instead of silently wrapping around to a small
/// (i.e. "long ago") timestamp.
///
/// # Panics
///
/// Panics if `t` is earlier than the Unix epoch.
fn to_millis(t: SystemTime) -> u64 {
    let since_epoch = t
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch");
    // `as_millis` yields a `u128`; clamp first so the narrowing cast is lossless.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
29
30fn key(ts: u64, did: &str) -> Vec<u8> {
31 let mut k = Vec::with_capacity(PREFIX_BACKFILL.len() + 8 + 1 + did.len());
32 k.extend_from_slice(&PREFIX_BACKFILL);
33 k.extend_from_slice(&ts.to_be_bytes());
34 k.push(NUL);
35 k.extend_from_slice(did.as_bytes());
36 k
37}
38
39fn parse_did(raw: &[u8]) -> StorageResult<String> {
40 let rest = raw
41 .strip_prefix(PREFIX_BACKFILL.as_slice())
42 .ok_or(StorageError::Corrupt {
43 key: String::from_utf8_lossy(raw).into_owned(),
44 reason: "wrong prefix",
45 })?;
46 if rest.len() < 9 {
47 return Err(StorageError::Corrupt {
48 key: String::from_utf8_lossy(raw).into_owned(),
49 reason: "key too short",
50 });
51 }
52 let did_bytes = rest[9..].to_vec();
53 String::from_utf8(did_bytes).map_err(|_| StorageError::Corrupt {
54 key: String::from_utf8_lossy(raw).into_owned(),
55 reason: "did not utf-8",
56 })
57}
58
59/// Enqueue a job to be claimed when wall-clock time reaches `when`.
60pub fn _enqueue(db: &DbRef, when: SystemTime, job: &BackfillJob) -> StorageResult<()> {
61 let ts = to_millis(when);
62 let k = key(ts, &job.did);
63 let v = postcard::to_allocvec(job)?;
64 db.persistent.insert(k, v)?;
65 Ok(())
66}
67
68/// Claim the next ready job whose timestamp is `<= now` and whose DID is not already in flight.
69///
70/// Atomically removes the key before returning. `since` is an opaque cursor from a prior
71/// `claim` call — callers can hold it in memory to skip over tombstones left by prior claims.
72pub fn claim(
73 db: &DbRef,
74 now: SystemTime,
75 since: Option<Vec<u8>>,
76 busy: &HashSet<String>,
77) -> StorageResult<Option<(BackfillJob, Vec<u8>)>> {
78 let now_ms = to_millis(now);
79
80 let lower = match since {
81 Some(suffix) => {
82 let mut k = PREFIX_BACKFILL.to_vec();
83 k.extend_from_slice(&suffix);
84 k
85 }
86 None => PREFIX_BACKFILL.to_vec(),
87 };
88 // Exclusive upper bound at "now": only claim jobs whose ts < now_ms.
89 let mut upper = PREFIX_BACKFILL.to_vec();
90 upper.extend_from_slice(&now_ms.to_be_bytes());
91 upper.push(NUL);
92
93 for guard in db.persistent.range(lower..upper) {
94 let (key_slice, val_slice) = guard.into_inner()?;
95 let key_bytes = key_slice.as_ref();
96 let did = parse_did(key_bytes)?;
97
98 if busy.contains(&did) {
99 continue;
100 }
101
102 let job: BackfillJob = postcard::from_bytes(val_slice.as_ref())?;
103 let next_since = key_bytes[PREFIX_BACKFILL.len()..].to_vec();
104
105 let mut batch = db.database.batch();
106 batch.remove_weak(&db.persistent, key_bytes);
107 batch.commit()?;
108
109 return Ok(Some((job, next_since)));
110 }
111
112 Ok(None)
113}