···148148 key
149149}
150150151151-/// `"repoResyncQueue"\0<ts_be:u64>\0<did>` — timestamp-ordered resync queue.
151151+/// `"repoResyncQueue"\0<ts_be:u64>\0<did>`: timestamp-ordered resync queue.
152152///
153153/// Big-endian timestamp gives natural chronological ordering.
154154pub fn resync_queue(ts_be: u64, did: Did<'_>) -> Vec<u8> {
···162162 key
163163}
164164165165-/// `"repoResyncQueue"\0` — prefix for scanning the entire resync queue.
165165+/// `"repoResyncQueue"\0`: prefix for scanning the entire resync queue.
166166pub fn resync_queue_prefix_all() -> Vec<u8> {
167167 let mut key = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 1);
168168 key.extend_from_slice(PREFIX_RESYNC_QUEUE);
···170170 key
171171}
172172173173-/// `"repoResyncQueue"\0<ts_be:u64>\0` — prefix for scanning resync queue up to a timestamp.
174174-pub fn resync_queue_prefix(ts_be: u64) -> Vec<u8> {
175175- let mut key = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 1 + 8 + 1);
176176- key.extend_from_slice(PREFIX_RESYNC_QUEUE);
177177- key.push(NUL_SEP);
173173+/// `<ts_be:u64>\0`: timestamp middle serialization for resync queue
174174+///
175175+/// can be directly concattenated after resync_queue_prefix_all()
176176+pub fn resync_queue_ts_midfix(ts_be: u64) -> Vec<u8> {
177177+ let mut key = Vec::with_capacity(8 + 1);
178178 key.extend_from_slice(&ts_be.to_be_bytes());
179179 key.push(NUL_SEP);
180180 key
···184184///
185185/// Key layout: `"repoResyncQueue"\0<ts_be:u64>\0<did>`
186186/// Returns `None` if the key is structurally invalid.
187187-pub fn resync_queue_parse(key: &[u8]) -> Option<(u64, Did<'static>)> {
188188- // TODO: error on unparseable!
187187+pub fn resync_queue_parse(key: &[u8]) -> StorageResult<(u64, Did<'static>)> {
188188+ let key_str = String::from_utf8_lossy(key);
189189 let rest = key
190190- .strip_prefix(PREFIX_RESYNC_QUEUE)?
191191- .strip_prefix(&[NUL_SEP])?;
190190+ .strip_prefix(PREFIX_RESYNC_QUEUE)
191191+ .ok_or(StorageError::Corrupt {
192192+ key: key_str.to_string(),
193193+ reason: "wrong prefix for resync queue",
194194+ })?
195195+ .strip_prefix(&[NUL_SEP])
196196+ .ok_or(StorageError::Corrupt {
197197+ key: key_str.to_string(),
198198+ reason: "wrong prefix for resync queue",
199199+ })?;
192200 if rest.len() < 9 {
193201 // Need at least 8 bytes for timestamp + 1 separator byte.
194194- return None;
202202+ return Err(StorageError::Corrupt {
203203+ key: key_str.to_string(),
204204+ reason: "not enough suffix bytes for resync queue",
205205+ });
195206 }
196196- let ts = u64::from_be_bytes(rest[..8].try_into().ok()?);
197197- let rest = rest[8..].strip_prefix(&[NUL_SEP])?;
198198- let did_str = std::str::from_utf8(rest).ok()?;
199199- Did::new_owned(did_str).ok().map(|did| (ts, did))
207207+ let ts_bytes = rest[..8].try_into().map_err(|_| StorageError::Corrupt {
208208+ key: key_str.to_string(),
209209+ reason: "not enough suffix bytes for resync queue",
210210+ })?;
211211+ let ts = u64::from_be_bytes(ts_bytes);
212212+ let rest = rest[8..]
213213+ .strip_prefix(&[NUL_SEP])
214214+ .ok_or(StorageError::Corrupt {
215215+ key: key_str.to_string(),
216216+ reason: "missing NUL separator in suffix bytes for resync queue",
217217+ })?;
218218+ let did_str = std::str::from_utf8(rest).map_err(|_| StorageError::Corrupt {
219219+ key: key_str.to_string(),
220220+ reason: "invalid bytes for did string for resync queue",
221221+ })?;
222222+ let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt {
223223+ key: key_str.to_string(),
224224+ reason: "invalid DID for resync queue",
225225+ })?;
226226+ Ok((ts, did))
200227}
201228202229#[cfg(test)]
···317344 }
318345319346 #[test]
320320- fn resync_queue_prefix_matches_full_key() {
321321- let prefix = resync_queue_prefix(42);
322322- let key = resync_queue(42, did("did:web:example.com"));
323323- assert!(key.starts_with(&prefix));
324324- }
325325-326326- #[test]
327327- fn resync_queue_prefix_does_not_match_different_timestamp() {
328328- let prefix = resync_queue_prefix(42);
329329- let key = resync_queue(43, did("did:web:example.com"));
330330- assert!(!key.starts_with(&prefix));
331331- }
332332-333333- #[test]
334347 fn resync_queue_sorts_by_timestamp() {
335348 let earlier = resync_queue(100, did("did:web:example.com"));
336349 let later = resync_queue(200, did("did:web:example.com"));
···356369357370 #[test]
358371 fn resync_queue_parse_returns_none_for_truncated_key() {
359359- assert!(resync_queue_parse(b"repoResyncQueue\0").is_none());
372372+ assert_eq!(
373373+ resync_queue_parse(b"repoResyncQueue\0"),
374374+ Err(StorageError::Corrupt {
375375+ key: "repoResyncQueue\0".to_string(),
376376+ reason: "not enough suffix bytes for resync queue",
377377+ })
378378+ );
360379 }
361380}
+30-20
src/storage/resync.rs
···33//! Keys: `"repoResyncQueue"\0<ts_be:u64>\0<did>`
44//! Values: `[u16 BE retry_count][u16 BE reason_len][reason_bytes][commit_cbor_bytes]`
5566+use fjall::util::prefixed_range;
67use jacquard_common::types::string::Did;
7889use crate::storage::{
···7980/// note: deleted accounts aren't removed from the resync queue so we need to
8081/// check that (or does the caller deal with it?)
8182///
8282-/// TODO: we actually want to pass in an optional cursor so we can efficiently
8383-/// skip over tombstones. we don't have to persist the cursor to disk, but the
8484-/// caller can hold it in memory over the app's lifetime so we only pay the tomb
8585-/// scan cost once on startup.
8686-pub fn dequeue_ready(db: &DbRef, now: u64) -> StorageResult<Option<ResyncItem>> {
8383+/// `since`: we actually want to pass in a cursor so we can efficiently skip
8484+/// over tombstones. we don't have to persist the cursor to disk, but the caller
8585+/// can hold it in memory over the app's lifetime so we only pay the tomb scan
8686+/// cost once on startup.
8787+pub fn dequeue_ready(
8888+ db: &DbRef,
8989+ now: u64,
9090+ since: Option<Vec<u8>>,
9191+) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> {
8792 let prefix = keys::resync_queue_prefix_all();
8888- for guard in db.ks.prefix(&prefix) {
8989- let (key_slice, val_slice) = guard.into_inner()?;
9090- let key_bytes: &[u8] = key_slice.as_ref();
9191- let Some((ts, did)) = keys::resync_queue_parse(key_bytes) else {
9292- continue;
9393- };
9494- if ts > now {
9595- break; // queue is ordered; no earlier entries remain
9696- }
9797- let key_str = String::from_utf8_lossy(key_bytes).into_owned();
9898- let item = decode(val_slice.as_ref(), &key_str, did)?;
9999- db.ks.remove(key_bytes)?;
100100- return Ok(Some(item));
101101- }
102102- Ok(None)
9393+9494+ let lower_suffix = since.unwrap_or(vec![]);
9595+ let upper_suffix = keys::resync_queue_ts_midfix(now);
9696+9797+ let Some(guard) = db
9898+ .ks
9999+ .range(prefixed_range(prefix, lower_suffix..upper_suffix))
100100+ .next()
101101+ else {
102102+ return Ok(None);
103103+ };
104104+105105+ let (key_slice, val_slice) = guard.into_inner()?;
106106+ let key_bytes = key_slice.to_vec();
107107+ let (ts, did) = keys::resync_queue_parse(&key_bytes)?;
108108+ assert!(ts < now);
109109+ let key_str = String::from_utf8_lossy(&key_bytes).into_owned();
110110+ let item = decode(val_slice.as_ref(), &key_str, did)?;
111111+ db.ks.remove(&key_bytes)?;
112112+ Ok(Some((item, key_bytes)))
103113}