very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib,api] implement getRepoStatus and listRepos

dawn a85020df 3942ea62

+171 -12
+2
README.md
··· 312 312 - `com.atproto.repo.listRecords` 313 313 - `com.atproto.sync.getHostStatus` 314 314 - `com.atproto.sync.listHosts` 315 + - `com.atproto.sync.getRepoStatus` 316 + - `com.atproto.sync.listRepos` 315 317 316 318 ### systems.gaze.hydrant.* 317 319
+68
src/api/xrpc/get_repo_status.rs
··· 1 + use jacquard_api::com_atproto::sync::{ 2 + get_repo_status::{ 3 + GetRepoStatusError, GetRepoStatusOutput, GetRepoStatusOutputStatus, GetRepoStatusRequest, 4 + GetRepoStatusResponse, 5 + }, 6 + list_repos::RepoStatus as ApiRepoStatus, 7 + }; 8 + 9 + use crate::types::RepoStatus; 10 + 11 + use super::*; 12 + 13 + pub async fn handle( 14 + State(hydrant): State<Hydrant>, 15 + ExtractXrpc(req): ExtractXrpc<GetRepoStatusRequest>, 16 + ) -> XrpcResult<Json<GetRepoStatusOutput<'static>>, GetRepoStatusError<'static>> { 17 + let nsid = GetRepoStatusResponse::NSID; 18 + 19 + let Some(state) = hydrant 20 + .repos 21 + .get(&req.did) 22 + .state() 23 + .await 24 + .map_err(|e| internal_error(nsid, e))? 25 + else { 26 + return Err(XrpcErrorResponse { 27 + status: StatusCode::NOT_FOUND, 28 + error: XrpcError::Xrpc(GetRepoStatusError::RepoNotFound(None)), 29 + }); 30 + }; 31 + 32 + let (active, status) = repo_status_to_api(state.status); 33 + 34 + // rev is only meaningful when the repo is active and has been synced at least once 35 + let rev = active.then(|| state.rev.map(|r| r.to_tid())).flatten(); 36 + 37 + Ok(Json(GetRepoStatusOutput { 38 + active, 39 + did: req.did, 40 + rev, 41 + status: status.map(|s| match s { 42 + ApiRepoStatus::Takendown => GetRepoStatusOutputStatus::Takendown, 43 + ApiRepoStatus::Suspended => GetRepoStatusOutputStatus::Suspended, 44 + ApiRepoStatus::Deleted => GetRepoStatusOutputStatus::Deleted, 45 + ApiRepoStatus::Deactivated => GetRepoStatusOutputStatus::Deactivated, 46 + ApiRepoStatus::Desynchronized => GetRepoStatusOutputStatus::Desynchronized, 47 + ApiRepoStatus::Throttled => GetRepoStatusOutputStatus::Throttled, 48 + ApiRepoStatus::Other(v) => GetRepoStatusOutputStatus::Other(v), 49 + }), 50 + extra_data: None, 51 + })) 52 + } 53 + 54 + pub(super) fn repo_status_to_api(status: RepoStatus) -> (bool, Option<ApiRepoStatus<'static>>) { 55 + match status { 56 + RepoStatus::Synced => (true, None), 57 + RepoStatus::Deactivated => (false, Some(ApiRepoStatus::Deactivated)), 58 + RepoStatus::Takendown => (false, Some(ApiRepoStatus::Takendown)), 59 + RepoStatus::Suspended => (false, Some(ApiRepoStatus::Suspended)), 60 + // we lost sync with this repo! report desynchronized 61 + // technicalllyyyy backfilling can mean the repo is active 62 + // because we are syncing it from the pds, but like also it is currently 63 + // desync'ed so... 64 + RepoStatus::Backfilling | RepoStatus::Error(_) => { 65 + (false, Some(ApiRepoStatus::Desynchronized)) 66 + } 67 + } 68 + }
+68
src/api/xrpc/list_repos.rs
··· 1 + use std::str::FromStr; 2 + 3 + use jacquard_api::com_atproto::sync::list_repos::{ 4 + ListReposOutput, ListReposRequest, ListReposResponse, Repo, 5 + }; 6 + use jacquard_common::CowStr; 7 + use jacquard_common::types::cid::Cid; 8 + use jacquard_common::types::string::Did; 9 + use smol_str::ToSmolStr; 10 + 11 + use crate::api::xrpc::get_repo_status::repo_status_to_api; 12 + 13 + use super::*; 14 + 15 + pub async fn handle( 16 + State(hydrant): State<Hydrant>, 17 + ExtractXrpc(req): ExtractXrpc<ListReposRequest>, 18 + ) -> XrpcResult<Json<ListReposOutput<'static>>> { 19 + let nsid = ListReposResponse::NSID; 20 + let limit = req.limit.unwrap_or(500).clamp(1, 1000) as usize; 21 + 22 + let cursor = req 23 + .cursor 24 + .as_deref() 25 + .map(Did::from_str) 26 + .transpose() 27 + .map_err(|e| bad_request(nsid, e))?; 28 + 29 + let (repos, next_cursor) = tokio::task::spawn_blocking(move || { 30 + let mut repos: Vec<Repo<'static>> = Vec::new(); 31 + let mut next_cursor: Option<Did<'static>> = None; 32 + 33 + for item in hydrant.repos.iter_states(cursor.as_ref()) { 34 + let (did, state) = item?; 35 + 36 + // skip repos that haven't been synced at least once 37 + let (Some(data), Some(rev_db)) = (state.data, state.rev) else { 38 + continue; 39 + }; 40 + 41 + let (active, status) = repo_status_to_api(state.status); 42 + repos.push(Repo { 43 + active: Some(active), 44 + did: did.clone(), 45 + head: Cid::from(data), 46 + rev: rev_db.to_tid(), 47 + status, 48 + extra_data: None, 49 + }); 50 + 51 + if repos.len() >= limit { 52 + next_cursor = Some(did); 53 + break; 54 + } 55 + } 56 + 57 + Ok::<_, miette::Report>((repos, next_cursor)) 58 + }) 59 + .await 60 + .map_err(|e| internal_error(nsid, e))? 61 + .map_err(|e| internal_error(nsid, e))?; 62 + 63 + Ok(Json(ListReposOutput { 64 + cursor: next_cursor.map(|d| CowStr::Owned(d.as_str().to_smolstr())), 65 + repos, 66 + extra_data: None, 67 + })) 68 + }
+6
src/api/xrpc/mod.rs
··· 10 10 list_records::{ListRecordsOutput, ListRecordsRequest, Record as RepoRecord}, 11 11 }; 12 12 use jacquard_api::com_atproto::sync::get_host_status::GetHostStatusRequest; 13 + use jacquard_api::com_atproto::sync::get_repo_status::GetRepoStatusRequest; 13 14 use jacquard_api::com_atproto::sync::list_hosts::ListHostsRequest; 15 + use jacquard_api::com_atproto::sync::list_repos::ListReposRequest; 14 16 use jacquard_common::types::ident::AtIdentifier; 15 17 use jacquard_common::xrpc::XrpcResp; 16 18 use jacquard_common::xrpc::{XrpcEndpoint, XrpcMethod}; ··· 27 29 mod describe_repo; 28 30 mod get_host_status; 29 31 mod get_record; 32 + mod get_repo_status; 30 33 mod list_hosts; 31 34 mod list_records; 35 + mod list_repos; 32 36 33 37 pub fn router() -> Router<Hydrant> { 34 38 Router::new() ··· 38 42 .route(DescribeRepo::PATH, get(describe_repo::handle)) 39 43 .route(GetHostStatusRequest::PATH, get(get_host_status::handle)) 40 44 .route(ListHostsRequest::PATH, get(list_hosts::handle)) 45 + .route(GetRepoStatusRequest::PATH, get(get_repo_status::handle)) 46 + .route(ListReposRequest::PATH, get(list_repos::handle)) 41 47 } 42 48 43 49 #[derive(Debug)]
+27 -12
src/control/repos.rs
··· 72 72 pub struct ReposControl(pub(super) Arc<AppState>); 73 73 74 74 impl ReposControl { 75 - /// iterates through all repositories, returning their state. 76 - pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> { 75 + pub(crate) fn iter_states( 76 + &self, 77 + cursor: Option<&Did<'_>>, 78 + ) -> impl Iterator<Item = Result<(Did<'static>, RepoState<'static>)>> { 77 79 let start_bound = if let Some(cursor) = cursor { 78 80 let did_key = keys::repo_key(cursor); 79 81 std::ops::Bound::Excluded(did_key) ··· 87 89 .range((start_bound, std::ops::Bound::Unbounded)) 88 90 .map(|g| { 89 91 let (k, v) = g.into_inner().into_diagnostic()?; 90 - let repo_state = crate::db::deser_repo_state(&v)?; 92 + let repo_state = crate::db::deser_repo_state(&v)?.into_static(); 91 93 let did = TrimmedDid::try_from(k.as_ref())?.to_did(); 92 - Ok(repo_state_to_info(did, repo_state)) 94 + Ok((did, repo_state)) 93 95 }) 96 + } 97 + 98 + /// iterates through all repositories, returning their state. 99 + pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> { 100 + self.iter_states(cursor) 101 + .map(|r| r.map(|(did, s)| repo_state_to_info(did, s))) 94 102 } 95 103 96 104 #[allow(dead_code)] ··· 445 453 } 446 454 447 455 impl<'i> RepoHandle<'i> { 448 - /// fetch the current state of this repository. 449 - /// returns `None` if hydrant has never seen this repository. 450 - pub async fn info(&self) -> Result<Option<RepoInfo>> { 456 + pub(crate) async fn state(&self) -> Result<Option<RepoState<'static>>> { 451 457 let did_key = keys::repo_key(&self.did); 452 - let state = self.state.clone(); 453 - let did = self.did.clone().into_static(); 458 + let app_state = self.state.clone(); 454 459 455 460 tokio::task::spawn_blocking(move || { 456 - let bytes = state.db.repos.get(&did_key).into_diagnostic()?; 457 - let state = bytes.as_deref().map(db::deser_repo_state).transpose()?; 458 - Ok(state.map(|s| repo_state_to_info(did, s))) 461 + let bytes = app_state.db.repos.get(&did_key).into_diagnostic()?; 462 + bytes 463 + .as_deref() 464 + .map(db::deser_repo_state) 465 + .transpose() 466 + .map(|opt| opt.map(IntoStatic::into_static)) 459 467 }) 460 468 .await 461 469 .into_diagnostic()? 470 + } 471 + 472 + /// fetch the current state of this repository. 473 + /// returns `None` if hydrant has never seen this repository. 474 + pub async fn info(&self) -> Result<Option<RepoInfo>> { 475 + let did = self.did.clone().into_static(); 476 + Ok(self.state().await?.map(|s| repo_state_to_info(did, s))) 462 477 } 463 478 464 479 /// returns the collections of this repository and the number of records it has in each.