lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

error handling and cursors

phil 5ad3b725 f1525435

+563 -177
+7 -3
src/main.rs
··· 82 82 } 83 83 }); 84 84 85 - tokio::signal::ctrl_c().await?; 86 - eprintln!("Shutting down..."); 87 - token.cancel(); 85 + tokio::select! { 86 + _ = tokio::signal::ctrl_c() => { 87 + eprintln!("Shutting down..."); 88 + token.cancel(); 89 + } 90 + _ = token.cancelled() => {} 91 + }; 88 92 89 93 let (firehose, backfill, server) = tokio::join!(firehose_task, backfill_task, server_task); 90 94 firehose??;
+44 -18
src/server/handler.rs
··· 21 21 22 22 use crate::storage::{DbRef, error::StorageError}; 23 23 24 + pub enum ListReposByCollectionError { 25 + BadCursor, 26 + StorageError, 27 + } 28 + 29 + impl From<StorageError> for ListReposByCollectionError { 30 + fn from(e: StorageError) -> Self { 31 + tracing::error!("Storage error: {e:#}"); 32 + Self::StorageError 33 + } 34 + } 35 + 36 + impl IntoResponse for ListReposByCollectionError { 37 + fn into_response(self) -> Response { 38 + match self { 39 + ListReposByCollectionError::BadCursor => ( 40 + StatusCode::BAD_REQUEST, 41 + Json(json!({ 42 + "error": "InvalidRequest", 43 + "message": "the provided cursor was not valid", 44 + })), 45 + ), 46 + ListReposByCollectionError::StorageError => ( 47 + StatusCode::INTERNAL_SERVER_ERROR, 48 + Json(json!({ 49 + "error": "InternalError", 50 + "message": "Storage issue", 51 + })), 52 + ), 53 + } 54 + .into_response() 55 + } 56 + } 57 + 24 58 /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 25 59 /// 26 60 /// Performs a cursor-paginated prefix scan over the rbc keyspace, returning ··· 29 63 /// The cursor is the last DID from the previous page. On each request we 30 64 /// scan for `limit + 1` results: if the extra result appears there is a next 31 65 /// page, and we return the last DID of the current page as the next cursor. 66 + /// 67 + /// the `limit` parameter is clamped at 10,000 instead of 2,000 as defined in 68 + /// the lexicon, because bluesky's own collectiondir only clamps at 10k. 32 69 pub async fn list_repos_by_collection( 33 70 State(db): State<DbRef>, 34 71 ExtractXrpc(req): ExtractXrpc<ListReposByCollectionRequest>, 35 - ) -> Result<Json<ListReposByCollectionOutput<'static>>, StatusCode> { 36 - let limit = req.limit.unwrap_or(500).clamp(1, 2000) as usize; 72 + ) -> Result<Json<ListReposByCollectionOutput<'static>>, ListReposByCollectionError> { 73 + let limit = req.limit.unwrap_or(500).clamp(1, 10_000) as usize; 37 74 38 75 // Parse the cursor as a DID, if one was provided. 39 - let cursor: Option<Did<'static>> = req 76 + let cursor = req 40 77 .cursor 41 - .as_ref() 42 78 .map(Did::new_owned) 43 79 .transpose() 44 - .map_err(|_| StatusCode::BAD_REQUEST)?; 80 + .map_err(|_| ListReposByCollectionError::BadCursor)?; 45 81 46 - // Scan one extra to detect whether a next page exists. 47 - let mut dids = crate::storage::list_by::scan_rbc(&db, req.collection, cursor, limit + 1) 48 - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 49 - 50 - // If we got more than limit results, set the next-page cursor to the last 51 - // DID of the current page and drop the extra. 52 - let next_cursor = if dids.len() > limit { 53 - let cursor_did = dids[limit - 1].clone(); 54 - dids.truncate(limit); 55 - Some(cursor_did.into()) // Did<'static> → CowStr<'static> 56 - } else { 57 - None 58 - }; 82 + let (dids, next) = crate::storage::list_by::scan_rbc(&db, req.collection, cursor, limit)?; 59 83 60 84 let repos = dids 61 85 .into_iter() ··· 64 88 extra_data: None, 65 89 }) 66 90 .collect(); 91 + 92 + let next_cursor = next.map(|cursor| cursor.into()); 67 93 68 94 Ok(Json(ListReposByCollectionOutput { 69 95 cursor: next_cursor,
+4 -2
src/storage/cursor.rs
··· 11 11 /// Returns `None` if no cursor has been persisted yet (start from live). 12 12 pub fn get_subscribe_cursor(db: &DbRef, host: &str) -> StorageResult<Option<u64>> { 13 13 let key = keys::subscribe_cursor(host); 14 + let key_str = String::from_utf8_lossy(&key).to_string(); 14 15 match db.ks.get(key)? { 15 16 None => Ok(None), 16 17 Some(v) => { 17 - let bytes: [u8; 8] = v.as_ref().try_into().map_err(|e| { 18 - StorageError::CorruptedCursor(format!("array from slice failed: {e}")) 18 + let bytes: [u8; 8] = v.as_ref().try_into().map_err(|_| StorageError::Corrupt { 19 + key: key_str, 20 + reason: "asdf", 19 21 })?; 20 22 Ok(Some(u64::from_be_bytes(bytes))) 21 23 }
+20 -2
src/storage/error.rs
··· 4 4 pub enum StorageError { 5 5 #[error("fjall: {0}")] 6 6 Fjall(#[from] fjall::Error), 7 - #[error("corrupted cursor: {0}")] 8 - CorruptedCursor(String), 7 + #[error("corruption at key {key}: {reason}")] 8 + Corrupt { key: String, reason: &'static str }, 9 + } 10 + 11 + impl PartialEq for StorageError { 12 + fn eq(&self, other: &StorageError) -> bool { 13 + match (self, other) { 14 + ( 15 + StorageError::Corrupt { 16 + key: ka, 17 + reason: ra, 18 + }, 19 + StorageError::Corrupt { 20 + key: kb, 21 + reason: rb, 22 + }, 23 + ) => ka == kb && ra == rb, 24 + _ => unimplemented!(), 25 + } 26 + } 9 27 } 10 28 11 29 pub type StorageResult<T> = Result<T, StorageError>;
+63 -29
src/storage/keys.rs
··· 6 6 use jacquard_common::types::nsid::Nsid; 7 7 use jacquard_common::types::string::Did; 8 8 9 - // Partition key prefixes 9 + use super::{StorageError, StorageResult}; 10 + 11 + // Partition key prefixes: mut be unique 10 12 pub const PREFIX_RBC: &[u8] = b"rbc"; 11 13 pub const PREFIX_CBR: &[u8] = b"cbr"; 12 14 pub const PREFIX_REPO: &[u8] = b"repo"; ··· 15 17 pub const PREFIX_LIST_REPOS: &[u8] = b"listRepos"; 16 18 pub const PREFIX_RESYNC_QUEUE: &[u8] = b"repoResyncQueue"; 17 19 18 - const SEP: u8 = b'\0'; 20 + const NUL_SEP: u8 = b'\0'; 19 21 20 22 /// `"rbc"\0<collection>\0<did>` — main collection index. 21 23 /// ··· 25 27 let d = did.as_str(); 26 28 let mut key = Vec::with_capacity(PREFIX_RBC.len() + 1 + col.len() + 1 + d.len()); 27 29 key.extend_from_slice(PREFIX_RBC); 28 - key.push(SEP); 30 + key.push(NUL_SEP); 29 31 key.extend_from_slice(col.as_bytes()); 30 - key.push(SEP); 32 + key.push(NUL_SEP); 31 33 key.extend_from_slice(d.as_bytes()); 32 34 key 33 35 } ··· 37 39 let col = collection.as_str(); 38 40 let mut key = Vec::with_capacity(PREFIX_RBC.len() + 1 + col.len() + 1); 39 41 key.extend_from_slice(PREFIX_RBC); 40 - key.push(SEP); 42 + key.push(NUL_SEP); 41 43 key.extend_from_slice(col.as_bytes()); 42 - key.push(SEP); 44 + key.push(NUL_SEP); 43 45 key 44 46 } 45 47 46 - /// Parse the DID suffix from a full rbc key, given the prefix length. 48 + /// the serialized DID, no null prefix or terminator 47 49 /// 48 - /// Returns `None` if the suffix bytes are not valid UTF-8 or not a valid DID. 49 - /// Typically used after prefix-scanning the rbc partition. 50 - pub fn rbc_parse_did(key: &[u8], prefix_len: usize) -> Option<Did<'static>> { 51 - let did_str = std::str::from_utf8(key.get(prefix_len..)?).ok()?; 52 - Did::new_owned(did_str).ok() 50 + /// rbc_prefix has a trailing null-terminator so this can be joined directly 51 + pub fn rbc_suffix(did: Did<'_>) -> Vec<u8> { 52 + did.as_str().as_bytes().to_vec() 53 + } 54 + 55 + /// Parse the DID suffix from a full rbc key, given the prefix length. 56 + pub fn rbc_parse_did(key: &[u8], prefix_len: usize) -> StorageResult<Did<'static>> { 57 + let key_str = String::from_utf8_lossy(key); 58 + let suffix = key.get(prefix_len..).ok_or(StorageError::Corrupt { 59 + key: key_str.to_string(), 60 + reason: "invalid prefix when parsing did in rbc", 61 + })?; 62 + let did_str = std::str::from_utf8(suffix).map_err(|_| StorageError::Corrupt { 63 + key: key_str.to_string(), 64 + reason: "invalid string suffix for DID in rbc suffix", 65 + })?; 66 + let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt { 67 + key: key_str.to_string(), 68 + reason: "invalid DID in rbc suffix", 69 + })?; 70 + Ok(did) 53 71 } 54 72 55 73 /// `"cbr"\0<did>\0<collection>` — reversed index for per-repo collection lookup. ··· 60 78 let col = collection.as_str(); 61 79 let mut key = Vec::with_capacity(PREFIX_CBR.len() + 1 + d.len() + 1 + col.len()); 62 80 key.extend_from_slice(PREFIX_CBR); 63 - key.push(SEP); 81 + key.push(NUL_SEP); 64 82 key.extend_from_slice(d.as_bytes()); 65 - key.push(SEP); 83 + key.push(NUL_SEP); 66 84 key.extend_from_slice(col.as_bytes()); 67 85 key 68 86 } ··· 72 90 let d = did.as_str(); 73 91 let mut key = Vec::with_capacity(PREFIX_CBR.len() + 1 + d.len() + 1); 74 92 key.extend_from_slice(PREFIX_CBR); 75 - key.push(SEP); 93 + key.push(NUL_SEP); 76 94 key.extend_from_slice(d.as_bytes()); 77 - key.push(SEP); 95 + key.push(NUL_SEP); 78 96 key 79 97 } 80 98 ··· 83 101 /// Returns `None` if the suffix bytes are not valid UTF-8 or not a valid NSID. 84 102 /// Typically used after prefix-scanning the cbr partition. 85 103 pub fn cbr_parse_collection(key: &[u8], prefix_len: usize) -> Option<Nsid<'static>> { 104 + // TODO: we should error on unparseable! 86 105 let nsid_str = std::str::from_utf8(key.get(prefix_len..)?).ok()?; 87 106 Nsid::new_owned(nsid_str).ok() 88 107 } ··· 92 111 let d = did.as_str(); 93 112 let mut key = Vec::with_capacity(PREFIX_REPO.len() + 1 + d.len()); 94 113 key.extend_from_slice(PREFIX_REPO); 95 - key.push(SEP); 114 + key.push(NUL_SEP); 96 115 key.extend_from_slice(d.as_bytes()); 97 116 key 98 117 } ··· 102 121 let d = did.as_str(); 103 122 let mut key = Vec::with_capacity(PREFIX_REPO_PREV.len() + 1 + d.len()); 104 123 key.extend_from_slice(PREFIX_REPO_PREV); 105 - key.push(SEP); 124 + key.push(NUL_SEP); 106 125 key.extend_from_slice(d.as_bytes()); 107 126 key 108 127 } ··· 113 132 let mut key = 114 133 Vec::with_capacity(PREFIX_SUBSCRIBE_REPOS.len() + 1 + host.len() + 1 + SUFFIX.len()); 115 134 key.extend_from_slice(PREFIX_SUBSCRIBE_REPOS); 116 - key.push(SEP); 135 + key.push(NUL_SEP); 117 136 key.extend_from_slice(host.as_bytes()); 118 - key.push(SEP); 137 + key.push(NUL_SEP); 119 138 key.extend_from_slice(SUFFIX); 120 139 key 121 140 } ··· 124 143 pub fn list_repos_progress(host: &str) -> Vec<u8> { 125 144 let mut key = Vec::with_capacity(PREFIX_LIST_REPOS.len() + 1 + host.len()); 126 145 key.extend_from_slice(PREFIX_LIST_REPOS); 127 - key.push(SEP); 146 + key.push(NUL_SEP); 128 147 key.extend_from_slice(host.as_bytes()); 129 148 key 130 149 } ··· 136 155 let d = did.as_str(); 137 156 let mut key = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 1 + 8 + 1 + d.len()); 138 157 key.extend_from_slice(PREFIX_RESYNC_QUEUE); 139 - key.push(SEP); 158 + key.push(NUL_SEP); 140 159 key.extend_from_slice(&ts_be.to_be_bytes()); 141 - key.push(SEP); 160 + key.push(NUL_SEP); 142 161 key.extend_from_slice(d.as_bytes()); 143 162 key 144 163 } 145 164 165 + /// `"repoResyncQueue"\0` — prefix for scanning the entire resync queue. 166 + pub fn resync_queue_prefix_all() -> Vec<u8> { 167 + let mut key = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 1); 168 + key.extend_from_slice(PREFIX_RESYNC_QUEUE); 169 + key.push(NUL_SEP); 170 + key 171 + } 172 + 146 173 /// `"repoResyncQueue"\0<ts_be:u64>\0` — prefix for scanning resync queue up to a timestamp. 147 174 pub fn resync_queue_prefix(ts_be: u64) -> Vec<u8> { 148 175 let mut key = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 1 + 8 + 1); 149 176 key.extend_from_slice(PREFIX_RESYNC_QUEUE); 150 - key.push(SEP); 177 + key.push(NUL_SEP); 151 178 key.extend_from_slice(&ts_be.to_be_bytes()); 152 - key.push(SEP); 179 + key.push(NUL_SEP); 153 180 key 154 181 } 155 182 ··· 158 185 /// Key layout: `"repoResyncQueue"\0<ts_be:u64>\0<did>` 159 186 /// Returns `None` if the key is structurally invalid. 160 187 pub fn resync_queue_parse(key: &[u8]) -> Option<(u64, Did<'static>)> { 188 + // TODO: error on unparseable! 161 189 let rest = key 162 190 .strip_prefix(PREFIX_RESYNC_QUEUE)? 163 - .strip_prefix(&[SEP])?; 191 + .strip_prefix(&[NUL_SEP])?; 164 192 if rest.len() < 9 { 165 193 // Need at least 8 bytes for timestamp + 1 separator byte. 166 194 return None; 167 195 } 168 196 let ts = u64::from_be_bytes(rest[..8].try_into().ok()?); 169 - let rest = rest[8..].strip_prefix(&[SEP])?; 197 + let rest = rest[8..].strip_prefix(&[NUL_SEP])?; 170 198 let did_str = std::str::from_utf8(rest).ok()?; 171 199 Did::new_owned(did_str).ok().map(|did| (ts, did)) 172 200 } ··· 228 256 let d = did("did:web:example.com"); 229 257 let key = rbc(col.clone(), d.clone()); 230 258 let prefix_len = rbc_prefix(col).len(); 231 - assert_eq!(rbc_parse_did(&key, prefix_len), Some(d)); 259 + assert_eq!(rbc_parse_did(&key, prefix_len), Ok(d)); 232 260 } 233 261 234 262 #[test] ··· 236 264 let prefix = rbc_prefix(nsid("app.bsky.feed.post")); 237 265 // Prefix alone has no DID suffix — but it is valid UTF-8 empty string, 238 266 // which is not a valid DID, so new_owned should reject it. 239 - assert!(rbc_parse_did(&prefix, prefix.len()).is_none()); 267 + assert_eq!( 268 + rbc_parse_did(&prefix, prefix.len()), 269 + Err(StorageError::Corrupt { 270 + key: "rbc\0app.bsky.feed.post\0".to_string(), 271 + reason: "invalid DID in rbc suffix", 272 + }) 273 + ); 240 274 } 241 275 242 276 // --- cbr ---
+84 -89
src/storage/list_by.rs
··· 6 6 use jacquard_common::types::string::Did; 7 7 8 8 use crate::storage::{DbRef, StorageResult, keys}; 9 + use fjall::util::prefixed_range; 10 + 11 + /// Iterate over DIDs in the rbc index for `collection`, starting after `cursor`. 12 + /// 13 + /// Returns at most `limit` DIDs. 14 + pub fn scan_rbc( 15 + db: &DbRef, 16 + collection: Nsid<'_>, 17 + cursor: Option<Did<'_>>, 18 + limit: usize, 19 + ) -> StorageResult<(Vec<Did<'static>>, Option<Did<'static>>)> { 20 + let prefix = keys::rbc_prefix(collection.clone()); 21 + let prefix_len = prefix.len(); 22 + 23 + let lower_did = cursor.clone().map(keys::rbc_suffix).unwrap_or(vec![]); 24 + let mut ranger = db.ks.range(prefixed_range(prefix.clone(), lower_did..)); 25 + 26 + let mut dids = Vec::with_capacity(limit); 27 + for guard in ranger.by_ref() { 28 + let (k, _v) = guard.into_inner()?; 29 + assert!(k.starts_with(&prefix)); 30 + let did = keys::rbc_parse_did(&k, prefix_len)?; 31 + dids.push(did); 32 + if dids.len() >= limit { 33 + break; 34 + } 35 + } 36 + 37 + let next = if let Some(guard) = ranger.next() { 38 + let key = guard.key()?; 39 + let did = keys::rbc_parse_did(&key, prefix_len)?; 40 + Some(did) 41 + } else { 42 + None 43 + }; 44 + 45 + Ok((dids, next)) 46 + } 47 + 48 + /// Iterate over collections in the cbr index for `did`, starting after `cursor`. 49 + /// 50 + /// Returns at most `limit` NSIDs. 51 + /// 52 + /// TODO: we can fjall range to the collection's next-after-max (might even be 53 + /// exposed now?) or maybe use prefix + seek for the start? 54 + pub fn scan_cbr( 55 + db: &DbRef, 56 + did: Did<'_>, 57 + cursor: Option<Nsid<'_>>, 58 + limit: usize, 59 + ) -> StorageResult<Vec<Nsid<'static>>> { 60 + let prefix = keys::cbr_prefix(did.clone()); 61 + let prefix_len = prefix.len(); 62 + 63 + let start_key: Vec<u8> = match cursor { 64 + None => prefix.clone(), 65 + Some(ref col) => keys::cbr(did.clone(), col.clone()), 66 + }; 67 + 68 + let mut cols = Vec::with_capacity(limit); 69 + for guard in db.ks.range(start_key..) { 70 + let (k, _v) = guard.into_inner()?; 71 + if !k.starts_with(&prefix) { 72 + break; 73 + } 74 + if cols.len() >= limit { 75 + break; 76 + } 77 + // Skip the cursor key itself. 78 + if cursor 79 + .clone() 80 + .is_some_and(|c| k.as_ref() == keys::cbr(did.clone(), c).as_slice()) 81 + { 82 + continue; 83 + } 84 + if let Some(col) = keys::cbr_parse_collection(&k, prefix_len) { 85 + cols.push(col); 86 + } 87 + } 88 + Ok(cols) 89 + } 9 90 10 91 /// Add a `(collection, did)` pair to both indexes within an existing batch. 11 92 /// ··· 250 331 251 332 assert_eq!( 252 333 scan_rbc(&db, nsid("app.bsky.actor.profile"), None, 10).unwrap(), 253 - vec![] as Vec<Did<'static>>, 334 + (vec![], None), 254 335 "removed collection should not appear in rbc", 255 336 ); 256 337 assert_eq!( 257 338 scan_rbc(&db, nsid("app.bsky.feed.post"), None, 10).unwrap(), 258 - vec![d.clone()], 339 + (vec![d.clone()], None), 259 340 "unchanged collection should remain in rbc", 260 341 ); 261 342 assert_eq!( 262 343 scan_rbc(&db, nsid("app.bsky.graph.follow"), None, 10).unwrap(), 263 - vec![d.clone()], 344 + (vec![d.clone()], None), 264 345 "added collection should appear in rbc", 265 346 ); 266 347 } ··· 281 362 assert_eq!(collections_for(&db, &bob), vec![nsid("app.bsky.feed.post")]); 282 363 } 283 364 } 284 - 285 - /// Iterate over DIDs in the rbc index for `collection`, starting after `cursor`. 286 - /// 287 - /// Returns at most `limit` DIDs. 288 - /// 289 - /// TODO: we can fjall range to the collection's next-after-max (might even be 290 - /// exposed now?) or maybe use prefix + seek for the start? 291 - pub fn scan_rbc( 292 - db: &DbRef, 293 - collection: Nsid<'_>, 294 - cursor: Option<Did<'_>>, 295 - limit: usize, 296 - ) -> StorageResult<Vec<Did<'static>>> { 297 - let prefix = keys::rbc_prefix(collection.clone()); 298 - let prefix_len = prefix.len(); 299 - 300 - let start_key: Vec<u8> = match cursor { 301 - None => prefix.clone(), 302 - Some(ref did) => keys::rbc(collection.clone(), did.clone()), 303 - }; 304 - 305 - let mut dids = Vec::with_capacity(limit); 306 - for guard in db.ks.range(start_key..) { 307 - let (k, _v) = guard.into_inner()?; 308 - if !k.starts_with(&prefix) { 309 - break; 310 - } 311 - if dids.len() >= limit { 312 - break; 313 - } 314 - // Skip the cursor key itself. 315 - if cursor 316 - .clone() 317 - .is_some_and(|c| k.as_ref() == keys::rbc(collection.clone(), c).as_slice()) 318 - { 319 - continue; 320 - } 321 - if let Some(did) = keys::rbc_parse_did(&k, prefix_len) { 322 - dids.push(did); 323 - } 324 - } 325 - Ok(dids) 326 - } 327 - 328 - /// Iterate over collections in the cbr index for `did`, starting after `cursor`. 329 - /// 330 - /// Returns at most `limit` NSIDs. 331 - /// 332 - /// TODO: we can fjall range to the collection's next-after-max (might even be 333 - /// exposed now?) or maybe use prefix + seek for the start? 334 - pub fn scan_cbr( 335 - db: &DbRef, 336 - did: Did<'_>, 337 - cursor: Option<Nsid<'_>>, 338 - limit: usize, 339 - ) -> StorageResult<Vec<Nsid<'static>>> { 340 - let prefix = keys::cbr_prefix(did.clone()); 341 - let prefix_len = prefix.len(); 342 - 343 - let start_key: Vec<u8> = match cursor { 344 - None => prefix.clone(), 345 - Some(ref col) => keys::cbr(did.clone(), col.clone()), 346 - }; 347 - 348 - let mut cols = Vec::with_capacity(limit); 349 - for guard in db.ks.range(start_key..) { 350 - let (k, _v) = guard.into_inner()?; 351 - if !k.starts_with(&prefix) { 352 - break; 353 - } 354 - if cols.len() >= limit { 355 - break; 356 - } 357 - // Skip the cursor key itself. 358 - if cursor 359 - .clone() 360 - .is_some_and(|c| k.as_ref() == keys::cbr(did.clone(), c).as_slice()) 361 - { 362 - continue; 363 - } 364 - if let Some(col) = keys::cbr_parse_collection(&k, prefix_len) { 365 - cols.push(col); 366 - } 367 - } 368 - Ok(cols) 369 - }
+262 -23
src/storage/repo.rs
··· 1 1 //! Per-repo state storage. 2 2 3 3 use fjall::Readable; 4 - use jacquard_common::types::{string::Cid, string::Did, tid::Tid}; 4 + use jacquard_common::types::{string::Did, tid::Tid}; 5 5 6 - use crate::storage::{DbRef, error::StorageResult, keys}; 6 + use crate::storage::{ 7 + DbRef, 8 + error::{StorageError, StorageResult}, 9 + keys, 10 + }; 7 11 8 12 /// tap's "RepoState" type 9 13 #[derive(Debug, Clone, PartialEq, Eq)] ··· 31 35 RepoState::Error => "error", 32 36 } 33 37 } 38 + 39 + fn from_str(s: &str) -> Option<Self> { 40 + Some(match s { 41 + "pending" => RepoState::Pending, 42 + "desynchronized" => RepoState::Desynchronized, 43 + "resyncing" => RepoState::Resyncing, 44 + "active" => RepoState::Active, 45 + "takendown" => RepoState::Takendown, 46 + "suspended" => RepoState::Suspended, 47 + "deactivated" => RepoState::Deactivated, 48 + "error" => RepoState::Error, 49 + _ => return None, 50 + }) 51 + } 34 52 } 35 53 36 54 /// tap's "AccountStatus" type ··· 48 66 pub fn is_active(&self) -> bool { 49 67 matches!(self, AccountStatus::Active) 50 68 } 69 + 51 70 pub fn status(&self) -> Option<&str> { 52 71 match self { 53 72 AccountStatus::Active => None, ··· 57 76 AccountStatus::Deleted => Some("deleted"), 58 77 } 59 78 } 79 + 80 + pub fn as_str(&self) -> &str { 81 + match self { 82 + AccountStatus::Active => "active", 83 + AccountStatus::Takendown => "takendown", 84 + AccountStatus::Suspended => "suspended", 85 + AccountStatus::Deactivated => "deactivated", 86 + AccountStatus::Deleted => "deleted", 87 + } 88 + } 89 + 90 + fn from_str(s: &str) -> Option<Self> { 91 + Some(match s { 92 + "active" => AccountStatus::Active, 93 + "takendown" => AccountStatus::Takendown, 94 + "suspended" => AccountStatus::Suspended, 95 + "deactivated" => AccountStatus::Deactivated, 96 + "deleted" => AccountStatus::Deleted, 97 + _ => return None, 98 + }) 99 + } 60 100 } 61 101 62 102 /// Stored info for a repository. ··· 67 107 pub error: Option<String>, 68 108 } 69 109 110 + /// Wire format: `<state>\0<status>[\0<error>]` 111 + /// 112 + /// Neither state nor status strings contain `\0`, so splitting on the first two 113 + /// `\0` bytes is unambiguous. The error field is absent when state != Error. 114 + fn encode_repo_info(info: &RepoInfo) -> Vec<u8> { 115 + let mut v = Vec::new(); 116 + v.extend_from_slice(info.state.as_str().as_bytes()); 117 + v.push(b'\0'); 118 + v.extend_from_slice(info.status.as_str().as_bytes()); 119 + if let Some(err) = &info.error { 120 + v.push(b'\0'); 121 + v.extend_from_slice(err.as_bytes()); 122 + } 123 + v 124 + } 125 + 126 + fn decode_repo_info(bytes: &[u8], key: &str) -> StorageResult<RepoInfo> { 127 + let s = std::str::from_utf8(bytes).map_err(|_| StorageError::Corrupt { 128 + key: key.to_owned(), 129 + reason: "not valid UTF-8", 130 + })?; 131 + let mut parts = s.splitn(3, '\0'); 132 + let state = parts 133 + .next() 134 + .and_then(RepoState::from_str) 135 + .ok_or(StorageError::Corrupt { 136 + key: key.to_owned(), 137 + reason: "invalid state", 138 + })?; 139 + let status = parts 140 + .next() 141 + .and_then(AccountStatus::from_str) 142 + .ok_or(StorageError::Corrupt { 143 + key: key.to_owned(), 144 + reason: "invalid status", 145 + })?; 146 + let error = parts.next().map(str::to_owned); 147 + Ok(RepoInfo { 148 + state, 149 + status, 150 + error, 151 + }) 152 + } 153 + 154 + /// Transient sync state for proof validation. 155 + /// 156 + /// Updated on every firehose commit — the hot write path. 157 + #[derive(Debug, Clone)] 158 + pub struct RepoPrev { 159 + /// The last-seen `rev`. 160 + pub rev: Tid, 161 + /// The last-seen commit CID as raw multihash bytes (from firehose CBOR). 162 + pub prev_data: Vec<u8>, 163 + } 164 + 165 + /// Wire format: `[13 bytes ASCII TID][raw CID bytes]` 166 + /// 167 + /// TIDs are always exactly 13 base32-sortable ASCII characters, so the TID 168 + /// length is a fixed-width prefix — no separator byte needed. 169 + fn encode_repo_prev(prev: &RepoPrev) -> Vec<u8> { 170 + let rev = prev.rev.as_str().as_bytes(); 171 + debug_assert_eq!(rev.len(), 13, "TID must be exactly 13 bytes"); 172 + let mut v = Vec::with_capacity(13 + prev.prev_data.len()); 173 + v.extend_from_slice(rev); 174 + v.extend_from_slice(&prev.prev_data); 175 + v 176 + } 177 + 178 + fn decode_repo_prev(bytes: &[u8], key: &str) -> StorageResult<RepoPrev> { 179 + if bytes.len() < 13 { 180 + return Err(StorageError::Corrupt { 181 + key: key.to_owned(), 182 + reason: "too short for TID", 183 + }); 184 + } 185 + let rev_str = std::str::from_utf8(&bytes[..13]).map_err(|_| StorageError::Corrupt { 186 + key: key.to_owned(), 187 + reason: "TID not UTF-8", 188 + })?; 189 + let rev = Tid::new(rev_str).map_err(|_| StorageError::Corrupt { 190 + key: key.to_owned(), 191 + reason: "invalid TID", 192 + })?; 193 + let prev_data = bytes[13..].to_vec(); 194 + Ok(RepoPrev { rev, prev_data }) 195 + } 196 + 70 197 /// Retrieve both [`RepoInfo`] and [`RepoPrev`] for a `did`. 71 198 /// 72 199 /// `None` if the repo is not indexed. 73 200 pub fn get(db: &DbRef, did: Did<'_>) -> StorageResult<Option<(RepoInfo, Option<RepoPrev>)>> { 201 + let info_key = keys::repo(did.clone()); 202 + let prev_key = keys::repo_prev(did); 74 203 let snapshot = db.database.snapshot(); 75 - Ok(snapshot 76 - .get(&db.ks, keys::repo(did.clone()))? 77 - .map(|info| { 78 - let prev = snapshot.get(&db.ks, keys::repo_prev(did))?; 79 - Ok((info, prev)) 204 + let Some(info_bytes) = snapshot.get(&db.ks, &info_key)? else { 205 + return Ok(None); 206 + }; 207 + let key_str = String::from_utf8_lossy(&info_key); 208 + let info = decode_repo_info(&info_bytes, &key_str)?; 209 + let prev = snapshot 210 + .get(&db.ks, &prev_key)? 211 + .map(|b| { 212 + let key_str = String::from_utf8_lossy(&prev_key); 213 + decode_repo_prev(&b, &key_str) 80 214 }) 81 - .transpose()?) 215 + .transpose()?; 216 + Ok(Some((info, prev))) 82 217 } 83 218 84 219 /// Write a [`RepoInfo`] for `did`. 85 - pub fn put_info(_db: &DbRef, _did: Did<'_>, _record: &RepoInfo) -> StorageResult<()> { 86 - todo!("serialize RepoInfo and write to fjall") 87 - } 88 - 89 - /// Transient sync state for proof validation. 90 - #[derive(Debug, Clone)] 91 - pub struct RepoPrev { 92 - /// The last-seen `rev` string. 93 - pub rev: Tid, 94 - /// The last-seen `prevData` CID (as raw bytes). 95 - pub prev_data: Cid<'static>, 220 + pub fn put_info(db: &DbRef, did: Did<'_>, info: &RepoInfo) -> StorageResult<()> { 221 + let key = keys::repo(did); 222 + db.ks.insert(key, encode_repo_info(info))?; 223 + Ok(()) 96 224 } 97 225 98 226 /// Read the transient [`RepoPrev`] for `did`. 99 - pub fn get_prev(_db: &DbRef, _did: Did<'_>) -> StorageResult<Option<RepoPrev>> { 100 - todo!("deserialize RepoPrev") 227 + pub fn get_prev(db: &DbRef, did: Did<'_>) -> StorageResult<Option<RepoPrev>> { 228 + let key = keys::repo_prev(did); 229 + db.ks 230 + .get(&key)? 231 + .map(|b| { 232 + let k = String::from_utf8_lossy(&key).into_owned(); 233 + decode_repo_prev(&b, &k) 234 + }) 235 + .transpose() 101 236 } 102 237 103 238 /// Write the transient [`RepoPrev`] for `did`. 104 - pub fn put_prev(_db: &DbRef, _did: Did<'_>, _prev: &RepoPrev) -> StorageResult<()> { 105 - todo!("serialize and write RepoPrev") 239 + pub fn put_prev(db: &DbRef, did: Did<'_>, prev: &RepoPrev) -> StorageResult<()> { 240 + let key = keys::repo_prev(did); 241 + db.ks.insert(key, encode_repo_prev(prev))?; 242 + Ok(()) 106 243 } 107 244 108 245 /// Delete the transient [`RepoPrev`] for `did`. ··· 111 248 db.ks.remove(key)?; 112 249 Ok(()) 113 250 } 251 + 252 + #[cfg(test)] 253 + mod tests { 254 + use super::*; 255 + use crate::storage::open_temporary; 256 + 257 + fn did(s: &str) -> Did<'static> { 258 + Did::new_owned(s).unwrap() 259 + } 260 + 261 + fn tid(s: &str) -> Tid { 262 + Tid::new(s).unwrap() 263 + } 264 + 265 + #[test] 266 + fn repo_info_roundtrips_active() { 267 + let info = RepoInfo { 268 + state: RepoState::Active, 269 + status: AccountStatus::Active, 270 + error: None, 271 + }; 272 + let encoded = encode_repo_info(&info); 273 + let decoded = decode_repo_info(&encoded, "test").unwrap(); 274 + assert_eq!(decoded.state, RepoState::Active); 275 + assert_eq!(decoded.status, AccountStatus::Active); 276 + assert!(decoded.error.is_none()); 277 + } 278 + 279 + #[test] 280 + fn repo_info_roundtrips_error_with_message() { 281 + let info = RepoInfo { 282 + state: RepoState::Error, 283 + status: AccountStatus::Suspended, 284 + error: Some("something went wrong".to_owned()), 285 + }; 286 + let encoded = encode_repo_info(&info); 287 + let decoded = decode_repo_info(&encoded, "test").unwrap(); 288 + assert_eq!(decoded.state, RepoState::Error); 289 + assert_eq!(decoded.status, AccountStatus::Suspended); 290 + assert_eq!(decoded.error.as_deref(), Some("something went wrong")); 291 + } 292 + 293 + #[test] 294 + fn repo_prev_roundtrips() { 295 + let prev = RepoPrev { 296 + rev: tid("3lczouzaqmo2e"), 297 + prev_data: vec![0x01, 0x71, 0x12, 0x20, 0xde, 0xad, 0xbe, 0xef], 298 + }; 299 + let encoded = encode_repo_prev(&prev); 300 + let decoded = decode_repo_prev(&encoded, "test").unwrap(); 301 + assert_eq!(decoded.rev.as_str(), "3lczouzaqmo2e"); 302 + assert_eq!( 303 + decoded.prev_data, 304 + vec![0x01, 0x71, 0x12, 0x20, 0xde, 0xad, 0xbe, 0xef] 305 + ); 306 + } 307 + 308 + #[test] 309 + fn put_and_get_repo_info() { 310 + let db = open_temporary().unwrap(); 311 + let d = did("did:web:example.com"); 312 + let info = RepoInfo { 313 + state: RepoState::Active, 314 + status: AccountStatus::Active, 315 + error: None, 316 + }; 317 + put_info(&db, d.clone(), &info).unwrap(); 318 + let (retrieved, prev) = get(&db, d).unwrap().unwrap(); 319 + assert_eq!(retrieved.state, RepoState::Active); 320 + assert!(prev.is_none()); 321 + } 322 + 323 + #[test] 324 + fn put_and_get_prev() { 325 + let db = open_temporary().unwrap(); 326 + let d = did("did:web:example.com"); 327 + // Need a RepoInfo first for get() to return Some. 328 + put_info( 329 + &db, 330 + d.clone(), 331 + &RepoInfo { 332 + state: RepoState::Active, 333 + status: AccountStatus::Active, 334 + error: None, 335 + }, 336 + ) 337 + .unwrap(); 338 + let prev = RepoPrev { 339 + rev: tid("3lczouzaqmo2e"), 340 + prev_data: vec![1, 2, 3, 4], 341 + }; 342 + put_prev(&db, d.clone(), &prev).unwrap(); 343 + let (_, stored_prev) = get(&db, d.clone()).unwrap().unwrap(); 344 + let stored_prev = stored_prev.unwrap(); 345 + assert_eq!(stored_prev.rev.as_str(), "3lczouzaqmo2e"); 346 + assert_eq!(stored_prev.prev_data, vec![1, 2, 3, 4]); 347 + 348 + delete_prev(&db, d.clone()).unwrap(); 349 + let (_, no_prev) = get(&db, d).unwrap().unwrap(); 350 + assert!(no_prev.is_none()); 351 + } 352 + }
+79 -11
src/storage/resync.rs
··· 1 1 //! Timestamp-ordered resync queue. 2 2 //! 3 3 //! Keys: `"repoResyncQueue"\0<ts_be:u64>\0<did>` 4 - //! Values: CBOR payload with the triggering commit, retry count, and retry reason. 4 + //! Values: `[u16 BE retry_count][u16 BE reason_len][reason_bytes][commit_cbor_bytes]` 5 5 6 6 use jacquard_common::types::string::Did; 7 7 8 - use crate::storage::{DbRef, StorageResult}; 8 + use crate::storage::{ 9 + DbRef, 10 + error::{StorageError, StorageResult}, 11 + keys, 12 + }; 9 13 10 14 /// An item waiting in the resync queue. 11 15 #[derive(Debug, Clone)] ··· 17 21 pub commit_cbor: Vec<u8>, 18 22 } 19 23 24 + /// Wire format: `[u16 BE retry_count][u16 BE reason_len][reason_bytes][commit_cbor_bytes]` 25 + fn encode(item: &ResyncItem) -> Vec<u8> { 26 + let reason = item.retry_reason.as_bytes(); 27 + let mut v = Vec::with_capacity(2 + 2 + reason.len() + item.commit_cbor.len()); 28 + v.extend_from_slice(&item.retry_count.to_be_bytes()); 29 + v.extend_from_slice(&(reason.len() as u16).to_be_bytes()); 30 + v.extend_from_slice(reason); 31 + v.extend_from_slice(&item.commit_cbor); 32 + v 33 + } 34 + 35 + fn decode(bytes: &[u8], key: &str, did: Did<'static>) -> StorageResult<ResyncItem> { 36 + if bytes.len() < 4 { 37 + return Err(StorageError::Corrupt { 38 + key: key.to_owned(), 39 + reason: "value too short", 40 + }); 41 + } 42 + let retry_count = u16::from_be_bytes([bytes[0], bytes[1]]); 43 + let reason_len = u16::from_be_bytes([bytes[2], bytes[3]]) as usize; 44 + let rest = &bytes[4..]; 45 + if rest.len() < reason_len { 46 + return Err(StorageError::Corrupt { 47 + key: key.to_owned(), 48 + reason: "reason truncated", 49 + }); 50 + } 51 + let retry_reason = std::str::from_utf8(&rest[..reason_len]) 52 + .map_err(|_| StorageError::Corrupt { 53 + key: key.to_owned(), 54 + reason: "reason not UTF-8", 55 + })? 56 + .to_owned(); 57 + let commit_cbor = rest[reason_len..].to_vec(); 58 + Ok(ResyncItem { 59 + did, 60 + retry_count, 61 + retry_reason, 62 + commit_cbor, 63 + }) 64 + } 65 + 20 66 /// Enqueue a repo for resync at the given Unix timestamp (seconds). 21 - pub fn enqueue(_db: &DbRef, _ts: u64, _item: &ResyncItem) -> StorageResult<()> { 22 - todo!("serialize ResyncItem to CBOR and insert into resync partition") 67 + pub fn enqueue(db: &DbRef, ts: u64, item: &ResyncItem) -> StorageResult<()> { 68 + let key = keys::resync_queue(ts, item.did.clone()); 69 + db.ks.insert(key, encode(item))?; 70 + Ok(()) 23 71 } 24 72 25 73 /// Dequeue and return the next item whose timestamp is ≤ `now`. 26 74 /// 27 75 /// Removes the entry from the queue atomically before returning it. 28 - pub fn dequeue_ready(_db: &DbRef, _now: u64) -> StorageResult<Option<ResyncItem>> { 29 - todo!("scan resync partition up to `now`, remove and return the first entry") 30 - } 31 - 32 - /// Remove all queue entries for `did` (e.g., after a successful resync). 33 - pub fn remove_did(_db: &DbRef, _did: Did<'_>) -> StorageResult<()> { 34 - todo!("scan resync partition and remove all entries matching did") 76 + /// 77 + /// TODO: no, this is not atomic currently 78 + /// 79 + /// note: deleted accounts aren't removed from the resync queue so we need to 80 + /// check that (or does the caller deal with it?) 81 + /// 82 + /// TODO: we actually want to pass in an optional cursor so we can efficiently 83 + /// skip over tombstones. we don't have to persist the cursor to disk, but the 84 + /// caller can hold it in memory over the app's lifetime so we only pay the tomb 85 + /// scan cost once on startup. 86 + pub fn dequeue_ready(db: &DbRef, now: u64) -> StorageResult<Option<ResyncItem>> { 87 + let prefix = keys::resync_queue_prefix_all(); 88 + for guard in db.ks.prefix(&prefix) { 89 + let (key_slice, val_slice) = guard.into_inner()?; 90 + let key_bytes: &[u8] = key_slice.as_ref(); 91 + let Some((ts, did)) = keys::resync_queue_parse(key_bytes) else { 92 + continue; 93 + }; 94 + if ts > now { 95 + break; // queue is ordered; no earlier entries remain 96 + } 97 + let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 98 + let item = decode(val_slice.as_ref(), &key_str, did)?; 99 + db.ks.remove(key_bytes)?; 100 + return Ok(Some(item)); 101 + } 102 + Ok(None) 35 103 }