ceres: a small planet in a giant solar system
33
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 113 lines 3.5 kB view raw
1//! Timestamp-ordered backfill queue backed by fjall. 2//! 3//! Keys: `"bfq"<ts_be:u64>\0<did>` — big-endian timestamp gives FIFO ordering. 4//! Values: postcard-encoded [`BackfillJob`]. 5 6use std::collections::HashSet; 7use std::time::{SystemTime, UNIX_EPOCH}; 8 9use serde::{Deserialize, Serialize}; 10 11use crate::storage::{DbRef, PREFIX_BACKFILL, StorageError, StorageResult}; 12 13const NUL: u8 = b'\0'; 14 15#[derive(Debug, Clone, Serialize, Deserialize)] 16pub struct BackfillJob { 17 pub did: String, 18 pub pds_host: String, 19 pub cursor: Option<String>, 20 pub retry_count: u16, 21 pub reason: String, 22} 23 24fn to_millis(t: SystemTime) -> u64 { 25 t.duration_since(UNIX_EPOCH) 26 .expect("system clock before Unix epoch") 27 .as_millis() as u64 28} 29 30fn key(ts: u64, did: &str) -> Vec<u8> { 31 let mut k = Vec::with_capacity(PREFIX_BACKFILL.len() + 8 + 1 + did.len()); 32 k.extend_from_slice(&PREFIX_BACKFILL); 33 k.extend_from_slice(&ts.to_be_bytes()); 34 k.push(NUL); 35 k.extend_from_slice(did.as_bytes()); 36 k 37} 38 39fn parse_did(raw: &[u8]) -> StorageResult<String> { 40 let rest = raw 41 .strip_prefix(PREFIX_BACKFILL.as_slice()) 42 .ok_or(StorageError::Corrupt { 43 key: String::from_utf8_lossy(raw).into_owned(), 44 reason: "wrong prefix", 45 })?; 46 if rest.len() < 9 { 47 return Err(StorageError::Corrupt { 48 key: String::from_utf8_lossy(raw).into_owned(), 49 reason: "key too short", 50 }); 51 } 52 let did_bytes = rest[9..].to_vec(); 53 String::from_utf8(did_bytes).map_err(|_| StorageError::Corrupt { 54 key: String::from_utf8_lossy(raw).into_owned(), 55 reason: "did not utf-8", 56 }) 57} 58 59/// Enqueue a job to be claimed when wall-clock time reaches `when`. 60pub fn _enqueue(db: &DbRef, when: SystemTime, job: &BackfillJob) -> StorageResult<()> { 61 let ts = to_millis(when); 62 let k = key(ts, &job.did); 63 let v = postcard::to_allocvec(job)?; 64 db.persistent.insert(k, v)?; 65 Ok(()) 66} 67 68/// Claim the next ready job whose timestamp is `<= now` and whose DID is not already in flight. 69/// 70/// Atomically removes the key before returning. `since` is an opaque cursor from a prior 71/// `claim` call — callers can hold it in memory to skip over tombstones left by prior claims. 72pub fn claim( 73 db: &DbRef, 74 now: SystemTime, 75 since: Option<Vec<u8>>, 76 busy: &HashSet<String>, 77) -> StorageResult<Option<(BackfillJob, Vec<u8>)>> { 78 let now_ms = to_millis(now); 79 80 let lower = match since { 81 Some(suffix) => { 82 let mut k = PREFIX_BACKFILL.to_vec(); 83 k.extend_from_slice(&suffix); 84 k 85 } 86 None => PREFIX_BACKFILL.to_vec(), 87 }; 88 // Exclusive upper bound at "now": only claim jobs whose ts < now_ms. 89 let mut upper = PREFIX_BACKFILL.to_vec(); 90 upper.extend_from_slice(&now_ms.to_be_bytes()); 91 upper.push(NUL); 92 93 for guard in db.persistent.range(lower..upper) { 94 let (key_slice, val_slice) = guard.into_inner()?; 95 let key_bytes = key_slice.as_ref(); 96 let did = parse_did(key_bytes)?; 97 98 if busy.contains(&did) { 99 continue; 100 } 101 102 let job: BackfillJob = postcard::from_bytes(val_slice.as_ref())?; 103 let next_since = key_bytes[PREFIX_BACKFILL.len()..].to_vec(); 104 105 let mut batch = db.database.batch(); 106 batch.remove_weak(&db.persistent, key_bytes); 107 batch.commit()?; 108 109 return Ok(Some((job, next_since))); 110 } 111 112 Ok(None) 113}