lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

filter out inactive accounts

phil 2d1da340 f6913a96

+162 -8
+8 -7
hacking.md
··· 34 34 - [x] #account: update account state 35 35 - [x] #commit and #sync 36 36 - [x] make sure blocking db calls are in `spawn_blocking`!! 37 - - [~] db queries 37 + - [x] db queries 38 38 - [~] configuration 39 39 - [~] copy applicable from tap 40 40 - [ ] copy applicable from collectiondir 41 - - [~] sync1.1!!! 41 + - [x] sync1.1!!! 42 42 - [x] verify #commit event 43 - - [ ] verify #sync event 44 - - [ ] inductive proof for #commits 45 - - [ ] actually firehose-index!! 46 - - [ ] extract collections-added/removed directly from CAR slice 43 + - [x] verify #sync event 44 + - [x] inductive proof for #commits 45 + - [~] actually firehose-index!! 46 + - [x] extract collections-added/removed directly from CAR slice 47 47 - [ ] (spend some time on tests here) 48 - - [ ] do the thing (write them to the db) 48 + - [x] do the thing (write them to the db) 49 + - [ ] swap in repo-stream 49 50 - [ ] lenient sync1.1 50 51 - [ ] *don't* allow non-validating commits that look like sync1.1 51 52 - [ ] rachet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one.
+94
src/server/list_repos.rs
··· 1 + //! XRPC handlers. 2 + //! 3 + //! Uses `jacquard-axum`'s `ExtractXrpc` extractor to deserialise query parameters 4 + //! directly into the lexicon-generated request types. 5 + 6 + use axum::{ 7 + Json, 8 + extract::State, 9 + http::StatusCode, 10 + response::{IntoResponse, Response}, 11 + }; 12 + use jacquard_api::com_atproto::sync::list_repos::{ListReposOutput, ListReposRequest, Repo}; 13 + use jacquard_axum::ExtractXrpc; 14 + use jacquard_common::types::string::Did; 15 + use serde_json::json; 16 + 17 + use crate::storage::{DbRef, error::StorageError}; 18 + 19 + pub enum ListReposError { 20 + BadCursor, 21 + StorageError, 22 + } 23 + 24 + impl From<StorageError> for ListReposError { 25 + fn from(e: StorageError) -> Self { 26 + tracing::error!("Storage error: {e:#}"); 27 + Self::StorageError 28 + } 29 + } 30 + 31 + impl IntoResponse for ListReposError { 32 + fn into_response(self) -> Response { 33 + match self { 34 + ListReposError::BadCursor => ( 35 + StatusCode::BAD_REQUEST, 36 + Json(json!({ 37 + "error": "InvalidRequest", 38 + "message": "the provided cursor was not valid", 39 + })), 40 + ), 41 + ListReposError::StorageError => ( 42 + StatusCode::INTERNAL_SERVER_ERROR, 43 + Json(json!({ 44 + "error": "InternalError", 45 + "message": "Storage issue", 46 + })), 47 + ), 48 + } 49 + .into_response() 50 + } 51 + } 52 + 53 + /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 54 + /// 55 + /// Performs a cursor-paginated prefix scan over the rbc keyspace, returning 56 + /// up to `limit` DIDs that have at least one record in `collection`. 57 + /// 58 + /// The cursor is the last DID from the previous page. On each request we 59 + /// scan for `limit + 1` results: if the extra result appears there is a next 60 + /// page, and we return the last DID of the current page as the next cursor. 61 + /// 62 + /// the `limit` parameter is clamped at 10,000 instead of 2,000 as defined in 63 + /// the lexicon, because bluesky's own collectiondir only clamps at 10k. 64 + pub async fn list_repos( 65 + State(db): State<DbRef>, 66 + ExtractXrpc(req): ExtractXrpc<ListReposRequest>, 67 + ) -> Result<Json<ListReposOutput<'static>>, ListReposError> { 68 + let limit = req.limit.unwrap_or(500).clamp(1, 10_000) as usize; 69 + 70 + // Parse the cursor as a DID, if one was provided. 71 + let cursor = req 72 + .cursor 73 + .map(Did::new_owned) 74 + .transpose() 75 + .map_err(|_| ListReposError::BadCursor)?; 76 + 77 + let (dids, next) = crate::storage::collection_index::scan_rbc_active(&db, None, cursor, limit)?; 78 + 79 + let repos = dids 80 + .into_iter() 81 + .map(|did| Repo { 82 + did, 83 + extra_data: None, 84 + }) 85 + .collect(); 86 + 87 + let next_cursor = next.map(|cursor| cursor.into()); 88 + 89 + Ok(Json(ListReposOutput { 90 + cursor: next_cursor, 91 + repos, 92 + extra_data: None, 93 + })) 94 + }
+1 -1
src/server/list_repos_by_collection.rs
··· 77 77 .map_err(|_| ListReposByCollectionError::BadCursor)?; 78 78 79 79 let (dids, next) = 80 - crate::storage::collection_index::scan_rbc(&db, req.collection, cursor, limit)?; 80 + crate::storage::collection_index::scan_rbc_active(&db, req.collection, cursor, limit)?; 81 81 82 82 let repos = dids 83 83 .into_iter()
+1
src/server/mod.rs
··· 5 5 6 6 mod get_repo_status; 7 7 mod hello; 8 + // mod list_repos; 8 9 mod list_repos_by_collection; 9 10 10 11 use get_repo_status::get_repo_status;
+44
src/storage/collection_index.rs
··· 212 212 Ok(db.ks.get(cbr(did, collection))?.is_some()) 213 213 } 214 214 215 + /// Like [`scan_rbc`] but skips DIDs whose [`AccountStatus`] is not `Active`. 216 + /// 217 + /// Each DID in the rbc scan receives a point-read against the repo info key to 218 + /// check its status before being included in the result. The returned cursor is 219 + /// the first DID of the next page (inclusive), so inactive DIDs interspersed 220 + /// with subsequent pages are filtered on the next call. 221 + pub fn scan_rbc_active( 222 + db: &DbRef, 223 + collection: Nsid<'_>, 224 + cursor: Option<Did<'_>>, 225 + limit: usize, 226 + ) -> StorageResult<(Vec<Did<'static>>, Option<Did<'static>>)> { 227 + use crate::storage::repo::{AccountStatus, get_status}; 228 + 229 + let prefix = rbc_prefix(collection.clone()); 230 + let prefix_len = prefix.len(); 231 + let lower_did = cursor.map(rbc_suffix).unwrap_or_default(); 232 + let mut ranger = db.ks.range(prefixed_range(prefix.clone(), lower_did..)); 233 + 234 + let mut dids = Vec::with_capacity(limit); 235 + for guard in ranger.by_ref() { 236 + let (k, _v) = guard.into_inner()?; 237 + let did = rbc_parse_did(&k, prefix_len)?; 238 + 239 + if !matches!(get_status(db, did.clone())?, Some(AccountStatus::Active)) { 240 + continue; 241 + } 242 + 243 + dids.push(did); 244 + if dids.len() >= limit { 245 + break; 246 + } 247 + } 248 + 249 + let next = if let Some(guard) = ranger.next() { 250 + let key = guard.key()?; 251 + Some(rbc_parse_did(&key, prefix_len)?) 252 + } else { 253 + None 254 + }; 255 + 256 + Ok((dids, next)) 257 + } 258 + 215 259 /// Insert a `(collection, did)` pair into both the rbc and cbr indexes. 216 260 pub fn insert(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> StorageResult<()> { 217 261 let mut batch = db.database.batch();
+14
src/storage/repo.rs
··· 222 222 Ok(RepoPrev { rev, prev_data }) 223 223 } 224 224 225 + /// Read only the [`AccountStatus`] for `did` — cheaper than [`get`] because it 226 + /// skips the `RepoPrev` read and doesn't take a snapshot. 227 + /// 228 + /// Returns `None` if the repo is not indexed. 229 + pub fn get_status(db: &DbRef, did: Did<'_>) -> StorageResult<Option<AccountStatus>> { 230 + let k = key(did); 231 + let Some(bytes) = db.ks.get(&k)? else { 232 + return Ok(None); 233 + }; 234 + let key_str = String::from_utf8_lossy(&k); 235 + let info = decode_repo_info(&bytes, &key_str)?; 236 + Ok(Some(info.status)) 237 + } 238 + 225 239 /// Insert a [`RepoInfo`] with `state = Pending` for `did` if no record exists. 226 240 /// 227 241 /// Returns `true` if a new record was inserted, `false` if one already existed.