lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

split handlers into their own files

phil 979f31a1 5ad3b725

+180 -161
+74
src/server/get_repo_status.rs
··· 1 + //! XRPC handlers. 2 + //! 3 + //! Uses `jacquard-axum`'s `ExtractXrpc` extractor to deserialise query parameters 4 + //! directly into the lexicon-generated request types. 5 + 6 + use axum::{ 7 + Json, 8 + extract::State, 9 + http::StatusCode, 10 + response::{IntoResponse, Response}, 11 + }; 12 + use jacquard_api::com_atproto::sync::get_repo_status::{GetRepoStatusOutput, GetRepoStatusRequest}; 13 + use jacquard_axum::ExtractXrpc; 14 + use serde_json::json; 15 + 16 + use crate::storage::{DbRef, error::StorageError}; 17 + 18 + pub enum GetRepoStatusError { 19 + StorageError, 20 + RepoNotFound, 21 + } 22 + 23 + impl From<StorageError> for GetRepoStatusError { 24 + fn from(e: StorageError) -> Self { 25 + tracing::error!("Storage error: {e:#}"); 26 + Self::StorageError 27 + } 28 + } 29 + 30 + impl IntoResponse for GetRepoStatusError { 31 + fn into_response(self) -> Response { 32 + match self { 33 + GetRepoStatusError::StorageError => ( 34 + StatusCode::INTERNAL_SERVER_ERROR, 35 + Json(json!({ 36 + "error": "InternalError", 37 + "message": "Storage issue", 38 + })), 39 + ), 40 + GetRepoStatusError::RepoNotFound => ( 41 + StatusCode::BAD_REQUEST, 42 + Json(json!({ 43 + "error": "RepoNotFound", 44 + "message": "the requested repo does not exist or has not been indexed.", 45 + })), 46 + ), 47 + } 48 + .into_response() 49 + } 50 + } 51 + 52 + /// Handler for `GET /xrpc/com.atproto.sync.getRepoStatus`. 53 + /// 54 + /// Returns the active/status of the given repo. Returns 404 (XRPC RepoNotFound) 55 + /// if the DID has never been indexed. 56 + /// 57 + /// `active` reflects whether the account is usable. `status` carries the 58 + /// reason when inactive ("takendown", "suspended", "deactivated", "deleted"). 59 + /// `rev` is currently omitted (TODO: store latest rev in RepoRecord). 60 + pub async fn get_repo_status( 61 + State(db): State<DbRef>, 62 + ExtractXrpc(req): ExtractXrpc<GetRepoStatusRequest>, 63 + ) -> Result<Json<GetRepoStatusOutput<'static>>, GetRepoStatusError> { 64 + let (info, prev) = 65 + crate::storage::repo::get(&db, req.did.clone())?.ok_or(GetRepoStatusError::RepoNotFound)?; 66 + 67 + Ok(Json(GetRepoStatusOutput { 68 + did: req.did, 69 + rev: prev.map(|p| p.rev), 70 + active: info.status.is_active(), 71 + status: info.status.status().map(|s| s.to_owned().into()), 72 + extra_data: None, 73 + })) 74 + }
-157
src/server/handler.rs
··· 1 - //! XRPC handlers. 2 - //! 3 - //! Uses `jacquard-axum`'s `ExtractXrpc` extractor to deserialise query parameters 4 - //! directly into the lexicon-generated request types. 5 - //! 6 - //! TODO: xrpc-style error handling 7 - 8 - use axum::{ 9 - Json, 10 - extract::State, 11 - http::StatusCode, 12 - response::{IntoResponse, Response}, 13 - }; 14 - use jacquard_api::com_atproto::sync::{ 15 - get_repo_status::{GetRepoStatusOutput, GetRepoStatusRequest}, 16 - list_repos_by_collection::{ListReposByCollectionOutput, ListReposByCollectionRequest, Repo}, 17 - }; 18 - use jacquard_axum::ExtractXrpc; 19 - use jacquard_common::types::string::Did; 20 - use serde_json::json; 21 - 22 - use crate::storage::{DbRef, error::StorageError}; 23 - 24 - pub enum ListReposByCollectionError { 25 - BadCursor, 26 - StorageError, 27 - } 28 - 29 - impl From<StorageError> for ListReposByCollectionError { 30 - fn from(e: StorageError) -> Self { 31 - tracing::error!("Storage error: {e:#}"); 32 - Self::StorageError 33 - } 34 - } 35 - 36 - impl IntoResponse for ListReposByCollectionError { 37 - fn into_response(self) -> Response { 38 - match self { 39 - ListReposByCollectionError::BadCursor => ( 40 - StatusCode::BAD_REQUEST, 41 - Json(json!({ 42 - "error": "InvalidRequest", 43 - "message": "the provided cursor was not valid", 44 - })), 45 - ), 46 - ListReposByCollectionError::StorageError => ( 47 - StatusCode::INTERNAL_SERVER_ERROR, 48 - Json(json!({ 49 - "error": "InternalError", 50 - "message": "Storage issue", 51 - })), 52 - ), 53 - } 54 - .into_response() 55 - } 56 - } 57 - 58 - /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 59 - /// 60 - /// Performs a cursor-paginated prefix scan over the rbc keyspace, returning 61 - /// up to `limit` DIDs that have at least one record in `collection`. 62 - /// 63 - /// The cursor is the last DID from the previous page. On each request we 64 - /// scan for `limit + 1` results: if the extra result appears there is a next 65 - /// page, and we return the last DID of the current page as the next cursor. 66 - /// 67 - /// the `limit` parameter is clamped at 10,000 instead of 2,000 as defined in 68 - /// the lexicon, because bluesky's own collectiondir only clamps at 10k. 69 - pub async fn list_repos_by_collection( 70 - State(db): State<DbRef>, 71 - ExtractXrpc(req): ExtractXrpc<ListReposByCollectionRequest>, 72 - ) -> Result<Json<ListReposByCollectionOutput<'static>>, ListReposByCollectionError> { 73 - let limit = req.limit.unwrap_or(500).clamp(1, 10_000) as usize; 74 - 75 - // Parse the cursor as a DID, if one was provided. 76 - let cursor = req 77 - .cursor 78 - .map(Did::new_owned) 79 - .transpose() 80 - .map_err(|_| ListReposByCollectionError::BadCursor)?; 81 - 82 - let (dids, next) = crate::storage::list_by::scan_rbc(&db, req.collection, cursor, limit)?; 83 - 84 - let repos = dids 85 - .into_iter() 86 - .map(|did| Repo { 87 - did, 88 - extra_data: None, 89 - }) 90 - .collect(); 91 - 92 - let next_cursor = next.map(|cursor| cursor.into()); 93 - 94 - Ok(Json(ListReposByCollectionOutput { 95 - cursor: next_cursor, 96 - repos, 97 - extra_data: None, 98 - })) 99 - } 100 - 101 - pub enum GetRepoStatusError { 102 - StorageError, 103 - RepoNotFound, 104 - } 105 - 106 - impl From<StorageError> for GetRepoStatusError { 107 - fn from(e: StorageError) -> Self { 108 - tracing::error!("Storage error: {e:#}"); 109 - Self::StorageError 110 - } 111 - } 112 - 113 - impl IntoResponse for GetRepoStatusError { 114 - fn into_response(self) -> Response { 115 - match self { 116 - GetRepoStatusError::StorageError => ( 117 - StatusCode::INTERNAL_SERVER_ERROR, 118 - Json(json!({ 119 - "error": "InternalError", 120 - "message": "Storage issue", 121 - })), 122 - ), 123 - GetRepoStatusError::RepoNotFound => ( 124 - StatusCode::BAD_REQUEST, 125 - Json(json!({ 126 - "error": "RepoNotFound", 127 - "message": "the requested repo does not exist or has not been indexed.", 128 - })), 129 - ), 130 - } 131 - .into_response() 132 - } 133 - } 134 - 135 - /// Handler for `GET /xrpc/com.atproto.sync.getRepoStatus`. 136 - /// 137 - /// Returns the active/status of the given repo. Returns 404 (XRPC RepoNotFound) 138 - /// if the DID has never been indexed. 139 - /// 140 - /// `active` reflects whether the account is usable. `status` carries the 141 - /// reason when inactive ("takendown", "suspended", "deactivated", "deleted"). 142 - /// `rev` is currently omitted (TODO: store latest rev in RepoRecord). 143 - pub async fn get_repo_status( 144 - State(db): State<DbRef>, 145 - ExtractXrpc(req): ExtractXrpc<GetRepoStatusRequest>, 146 - ) -> Result<Json<GetRepoStatusOutput<'static>>, GetRepoStatusError> { 147 - let (info, prev) = 148 - crate::storage::repo::get(&db, req.did.clone())?.ok_or(GetRepoStatusError::RepoNotFound)?; 149 - 150 - Ok(Json(GetRepoStatusOutput { 151 - did: req.did, 152 - rev: prev.map(|p| p.rev), 153 - active: info.status.is_active(), 154 - status: info.status.status().map(|s| s.to_owned().into()), 155 - extra_data: None, 156 - })) 157 - }
+96
src/server/list_repos_by_collection.rs
··· 1 + //! XRPC handlers. 2 + //! 3 + //! Uses `jacquard-axum`'s `ExtractXrpc` extractor to deserialise query parameters 4 + //! directly into the lexicon-generated request types. 5 + 6 + use axum::{ 7 + Json, 8 + extract::State, 9 + http::StatusCode, 10 + response::{IntoResponse, Response}, 11 + }; 12 + use jacquard_api::com_atproto::sync::list_repos_by_collection::{ 13 + ListReposByCollectionOutput, ListReposByCollectionRequest, Repo, 14 + }; 15 + use jacquard_axum::ExtractXrpc; 16 + use jacquard_common::types::string::Did; 17 + use serde_json::json; 18 + 19 + use crate::storage::{DbRef, error::StorageError}; 20 + 21 + pub enum ListReposByCollectionError { 22 + BadCursor, 23 + StorageError, 24 + } 25 + 26 + impl From<StorageError> for ListReposByCollectionError { 27 + fn from(e: StorageError) -> Self { 28 + tracing::error!("Storage error: {e:#}"); 29 + Self::StorageError 30 + } 31 + } 32 + 33 + impl IntoResponse for ListReposByCollectionError { 34 + fn into_response(self) -> Response { 35 + match self { 36 + ListReposByCollectionError::BadCursor => ( 37 + StatusCode::BAD_REQUEST, 38 + Json(json!({ 39 + "error": "InvalidRequest", 40 + "message": "the provided cursor was not valid", 41 + })), 42 + ), 43 + ListReposByCollectionError::StorageError => ( 44 + StatusCode::INTERNAL_SERVER_ERROR, 45 + Json(json!({ 46 + "error": "InternalError", 47 + "message": "Storage issue", 48 + })), 49 + ), 50 + } 51 + .into_response() 52 + } 53 + } 54 + 55 + /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 56 + /// 57 + /// Performs a cursor-paginated prefix scan over the rbc keyspace, returning 58 + /// up to `limit` DIDs that have at least one record in `collection`. 59 + /// 60 + /// The cursor is the last DID from the previous page. On each request we 61 + /// scan for `limit + 1` results: if the extra result appears there is a next 62 + /// page, and we return the last DID of the current page as the next cursor. 63 + /// 64 + /// the `limit` parameter is clamped at 10,000 instead of 2,000 as defined in 65 + /// the lexicon, because bluesky's own collectiondir only clamps at 10k. 66 + pub async fn list_repos_by_collection( 67 + State(db): State<DbRef>, 68 + ExtractXrpc(req): ExtractXrpc<ListReposByCollectionRequest>, 69 + ) -> Result<Json<ListReposByCollectionOutput<'static>>, ListReposByCollectionError> { 70 + let limit = req.limit.unwrap_or(500).clamp(1, 10_000) as usize; 71 + 72 + // Parse the cursor as a DID, if one was provided. 73 + let cursor = req 74 + .cursor 75 + .map(Did::new_owned) 76 + .transpose() 77 + .map_err(|_| ListReposByCollectionError::BadCursor)?; 78 + 79 + let (dids, next) = crate::storage::list_by::scan_rbc(&db, req.collection, cursor, limit)?; 80 + 81 + let repos = dids 82 + .into_iter() 83 + .map(|did| Repo { 84 + did, 85 + extra_data: None, 86 + }) 87 + .collect(); 88 + 89 + let next_cursor = next.map(|cursor| cursor.into()); 90 + 91 + Ok(Json(ListReposByCollectionOutput { 92 + cursor: next_cursor, 93 + repos, 94 + extra_data: None, 95 + })) 96 + }
+10 -4
src/server/mod.rs
··· 3 3 //! Serves XRPC endpoints via axum routers built with `jacquard-axum`'s 4 4 //! `IntoRouter` helper. 5 5 6 - mod handler; 6 + mod get_repo_status; 7 + mod list_repos_by_collection; 8 + 9 + use get_repo_status::get_repo_status; 10 + use list_repos_by_collection::list_repos_by_collection; 7 11 8 12 use std::net::SocketAddr; 9 13 ··· 18 22 /// Build and serve the axum application on `addr`. 19 23 /// 20 24 /// Routes: 21 - /// GET /xrpc/com.atproto.sync.listReposByCollection 22 25 /// GET /xrpc/com.atproto.sync.getRepoStatus 26 + /// GET /xrpc/com.atproto.sync.listReposByCollection 23 27 pub async fn serve(addr: SocketAddr, db: DbRef) -> Result<()> { 24 - let app = ListReposByCollectionRequest::into_router(handler::list_repos_by_collection) 25 - .merge(GetRepoStatusRequest::into_router(handler::get_repo_status)) 28 + let app = GetRepoStatusRequest::into_router(get_repo_status) 29 + .merge(ListReposByCollectionRequest::into_router( 30 + list_repos_by_collection, 31 + )) 26 32 .with_state(db); 27 33 28 34 let listener = tokio::net::TcpListener::bind(addr).await?;