lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

endpoints fixup

phil 9e2afd33 2d1da340

+152 -24
+11 -3
src/server/get_repo_status.rs
··· 11 11 }; 12 12 use jacquard_api::com_atproto::sync::get_repo_status::{GetRepoStatusOutput, GetRepoStatusRequest}; 13 13 use jacquard_axum::ExtractXrpc; 14 + use jacquard_common::IntoStatic; 14 15 use serde_json::json; 15 16 16 17 use crate::storage::{DbRef, error::StorageError}; ··· 61 62 State(db): State<DbRef>, 62 63 ExtractXrpc(req): ExtractXrpc<GetRepoStatusRequest>, 63 64 ) -> Result<Json<GetRepoStatusOutput<'static>>, GetRepoStatusError> { 64 - let (info, prev) = 65 - crate::storage::repo::get(&db, req.did.clone())?.ok_or(GetRepoStatusError::RepoNotFound)?; 65 + let did = req.did.into_static(); 66 + let (info, prev) = tokio::task::spawn_blocking({ 67 + let db = db.clone(); 68 + let did = did.clone(); 69 + move || crate::storage::repo::get(&db, did) 70 + }) 71 + .await 72 + .map_err(|_| GetRepoStatusError::StorageError)?? 73 + .ok_or(GetRepoStatusError::RepoNotFound)?; 66 74 67 75 Ok(Json(GetRepoStatusOutput { 68 - did: req.did, 76 + did, 69 77 rev: prev.map(|p| p.rev), 70 78 active: info.status.is_active(), 71 79 status: info.status.status().map(|s| s.to_owned().into()),
+48 -17
src/server/list_repos.rs
··· 11 11 }; 12 12 use jacquard_api::com_atproto::sync::list_repos::{ListReposOutput, ListReposRequest, Repo}; 13 13 use jacquard_axum::ExtractXrpc; 14 - use jacquard_common::types::string::Did; 14 + use jacquard_common::CowStr; 15 + use jacquard_common::types::string::{Cid as JCid, Did}; 15 16 use serde_json::json; 16 17 17 18 use crate::storage::{DbRef, error::StorageError}; ··· 50 51 } 51 52 } 52 53 53 - /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 54 + /// Handler for `GET /xrpc/com.atproto.sync.listRepos`. 54 55 /// 55 - /// Performs a cursor-paginated prefix scan over the rbc keyspace, returning 56 - /// up to `limit` DIDs that have at least one record in `collection`. 56 + /// Performs a cursor-paginated scan over the repo keyspace, returning up to 57 + /// `limit` repos with their current `head` CID, `rev` TID, and account status. 57 58 /// 58 - /// The cursor is the last DID from the previous page. On each request we 59 - /// scan for `limit + 1` results: if the extra result appears there is a next 60 - /// page, and we return the last DID of the current page as the next cursor. 59 + /// The cursor is the first DID of the next page (inclusive). On each request we 60 + /// scan for `limit` repos; if the scan yields a next DID, it is returned as 61 + /// the cursor for the following page. 61 62 /// 62 - /// the `limit` parameter is clamped at 10,000 instead of 2,000 as defined in 63 - /// the lexicon, because bluesky's own collectiondir only clamps at 10k. 63 + /// Repos without a chain tip (not yet synced, so no `head`/`rev` available) 64 + /// are omitted from the response, but still count against the scan window for 65 + /// cursor advancement. 66 + /// 67 + /// The `limit` parameter defaults to 500 and is clamped to the range 68 + /// 1..=1000, matching the `clamp(1, 1000)` the handler applies below. 
64 69 pub async fn list_repos( 65 70 State(db): State<DbRef>, 66 71 ExtractXrpc(req): ExtractXrpc<ListReposRequest>, 67 72 ) -> Result<Json<ListReposOutput<'static>>, ListReposError> { 68 - let limit = req.limit.unwrap_or(500).clamp(1, 10_000) as usize; 73 + let limit = req.limit.unwrap_or(500).clamp(1, 1000) as usize; 69 74 70 - // Parse the cursor as a DID, if one was provided. 71 75 let cursor = req 72 76 .cursor 73 77 .map(Did::new_owned) 74 78 .transpose() 75 79 .map_err(|_| ListReposError::BadCursor)?; 76 80 77 - let (dids, next) = crate::storage::collection_index::scan_rbc_active(&db, None, cursor, limit)?; 81 + let (entries, next) = tokio::task::spawn_blocking({ 82 + let db = db.clone(); 83 + move || crate::storage::repo::scan_repos(&db, cursor, limit) 84 + }) 85 + .await 86 + .map_err(|_| ListReposError::StorageError)??; 78 87 79 - let repos = dids 88 + let repos: Vec<Repo<'static>> = entries 80 89 .into_iter() 81 - .map(|did| Repo { 82 - did, 83 - extra_data: None, 90 + .filter_map(|(did, info, prev)| { 91 + let prev = prev?; 92 + // Parse the stored MST-root CID bytes back into a typed CID. 93 + // Use the Str variant explicitly: jacquard-common's Cid::Ipld 94 + // delegates serialization to the `cid` crate which emits raw bytes 95 + // in JSON rather than a string. 
96 + let head = match JCid::new_owned(&prev.prev_data) { 97 + Ok(c) => JCid::Str(CowStr::copy_from_str(c.as_str())), 98 + Err(e) => { 99 + tracing::warn!( 100 + did = %did.as_str(), 101 + error = %e, 102 + "invalid CID bytes in repo prev; skipping repo in listRepos", 103 + ); 104 + return None; 105 + } 106 + }; 107 + Some(Repo { 108 + active: Some(info.status.is_active()), 109 + did, 110 + head, 111 + rev: prev.rev, 112 + status: info.status.status().map(CowStr::copy_from_str), 113 + extra_data: None, 114 + }) 84 115 }) 85 116 .collect(); 86 117 87 - let next_cursor = next.map(|cursor| cursor.into()); 118 + let next_cursor = next.map(|did| did.into()); 88 119 89 120 Ok(Json(ListReposOutput { 90 121 cursor: next_cursor,
+8 -2
src/server/list_repos_by_collection.rs
··· 13 13 ListReposByCollectionOutput, ListReposByCollectionRequest, Repo, 14 14 }; 15 15 use jacquard_axum::ExtractXrpc; 16 + use jacquard_common::IntoStatic; 16 17 use jacquard_common::types::string::Did; 17 18 use serde_json::json; 18 19 ··· 76 77 .transpose() 77 78 .map_err(|_| ListReposByCollectionError::BadCursor)?; 78 79 79 - let (dids, next) = 80 - crate::storage::collection_index::scan_rbc_active(&db, req.collection, cursor, limit)?; 80 + let collection = req.collection.into_static(); 81 + let (dids, next) = tokio::task::spawn_blocking({ 82 + let db = db.clone(); 83 + move || crate::storage::collection_index::scan_rbc_active(&db, collection, cursor, limit) 84 + }) 85 + .await 86 + .map_err(|_| ListReposByCollectionError::StorageError)??; 81 87 82 88 let repos = dids 83 89 .into_iter()
+6 -2
src/server/mod.rs
··· 5 5 6 6 mod get_repo_status; 7 7 mod hello; 8 - // mod list_repos; 8 + mod list_repos; 9 9 mod list_repos_by_collection; 10 10 11 11 use get_repo_status::get_repo_status; 12 + use list_repos::list_repos; 12 13 use list_repos_by_collection::list_repos_by_collection; 13 14 14 15 use std::net::SocketAddr; 15 16 16 17 use jacquard_api::com_atproto::sync::{ 17 - get_repo_status::GetRepoStatusRequest, list_repos_by_collection::ListReposByCollectionRequest, 18 + get_repo_status::GetRepoStatusRequest, list_repos::ListReposRequest, 19 + list_repos_by_collection::ListReposByCollectionRequest, 18 20 }; 19 21 use jacquard_axum::IntoRouter; 20 22 ··· 25 27 /// 26 28 /// Routes: 27 29 /// GET /xrpc/com.atproto.sync.getRepoStatus 30 + /// GET /xrpc/com.atproto.sync.listRepos 28 31 /// GET /xrpc/com.atproto.sync.listReposByCollection 29 32 pub async fn serve( 30 33 addr: SocketAddr, ··· 32 35 token: tokio_util::sync::CancellationToken, 33 36 ) -> Result<()> { 34 37 let app = GetRepoStatusRequest::into_router(get_repo_status) 38 + .merge(ListReposRequest::into_router(list_repos)) 35 39 .merge(ListReposByCollectionRequest::into_router( 36 40 list_repos_by_collection, 37 41 ))
+79
src/storage/repo.rs
··· 331 331 Ok(()) 332 332 } 333 333 334 + /// Iterate over repos in the `rep` keyspace, starting at `cursor` (inclusive). 335 + /// 336 + /// Returns at most `limit` entries. `next` is the first DID of the next page, 337 + /// suitable for use as the cursor on the next request. 338 + /// 339 + /// All repos are returned regardless of account status or sync state. Repos 340 + /// that have not yet been synced have `None` for the [`RepoPrev`] field. 341 + pub fn scan_repos( 342 + db: &DbRef, 343 + cursor: Option<Did<'_>>, 344 + limit: usize, 345 + ) -> StorageResult<( 346 + Vec<(Did<'static>, RepoInfo, Option<RepoPrev>)>, 347 + Option<Did<'static>>, 348 + )> { 349 + let prefix_len = PREFIX_REPO.len(); 350 + 351 + let start_key: Vec<u8> = { 352 + let mut k = PREFIX_REPO.to_vec(); 353 + if let Some(ref did) = cursor { 354 + k.extend_from_slice(did.as_str().as_bytes()); 355 + } 356 + k 357 + }; 358 + 359 + let mut ranger = db.ks.range(start_key..); 360 + let mut entries = Vec::with_capacity(limit); 361 + 362 + for guard in ranger.by_ref() { 363 + let (k, v) = guard.into_inner()?; 364 + if !k.starts_with(&PREFIX_REPO) { 365 + break; 366 + } 367 + let did_str = std::str::from_utf8(&k[prefix_len..]).map_err(|_| StorageError::Corrupt { 368 + key: String::from_utf8_lossy(&k).to_string(), 369 + reason: "non-UTF-8 DID in repo key", 370 + })?; 371 + let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt { 372 + key: String::from_utf8_lossy(&k).to_string(), 373 + reason: "invalid DID in repo key", 374 + })?; 375 + let key_str = String::from_utf8_lossy(&k).into_owned(); 376 + let info = decode_repo_info(&v, &key_str)?; 377 + let pk = prev_key(did.clone()); 378 + let prev = db 379 + .ks 380 + .get(&pk)? 
381 + .map(|b| { 382 + let pk_str = String::from_utf8_lossy(&pk).into_owned(); 383 + decode_repo_prev(&b, &pk_str) 384 + }) 385 + .transpose()?; 386 + entries.push((did, info, prev)); 387 + if entries.len() >= limit { 388 + break; 389 + } 390 + } 391 + 392 + let next = loop { 393 + let Some(guard) = ranger.next() else { 394 + break None; 395 + }; 396 + let key = guard.key()?; 397 + if !key.starts_with(&PREFIX_REPO) { 398 + break None; 399 + } 400 + let did_str = match std::str::from_utf8(&key[prefix_len..]) { 401 + Ok(s) => s, 402 + Err(_) => continue, 403 + }; 404 + match Did::new_owned(did_str) { 405 + Ok(did) => break Some(did), 406 + Err(_) => continue, 407 + } 408 + }; 409 + 410 + Ok((entries, next)) 411 + } 412 + 334 413 #[cfg(test)] 335 414 mod tests { 336 415 use super::*;