very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api] refactor xrpcs into modules

dawn b71d5223 342c24a3

+244 -241
+3 -3
src/api/debug.rs
··· 287 287 let state_clone = state.clone(); 288 288 289 289 tokio::task::spawn_blocking(move || { 290 - let _ = ks.remove(b"dummy_tombstone123"); 291 - let _ = state_clone.db.persist(); 292 - let _ = ks.rotate_memtable_and_wait(); 290 + ks.remove(b"dummy_tombstone123")?; 291 + state_clone.db.inner.persist(fjall::PersistMode::Buffer)?; 292 + ks.rotate_memtable_and_wait()?; 293 293 ks.major_compact() 294 294 }) 295 295 .await
-238
src/api/xrpc.rs
··· 1 - use crate::control::Hydrant; 2 - use axum::extract::FromRequest; 3 - use axum::response::IntoResponse; 4 - use axum::{Json, Router, extract::State, http::StatusCode}; 5 - use jacquard_api::com_atproto::repo::{ 6 - get_record::{GetRecordError, GetRecordOutput, GetRecordRequest}, 7 - list_records::{ListRecordsOutput, ListRecordsRequest, Record as RepoRecord}, 8 - }; 9 - use jacquard_common::types::ident::AtIdentifier; 10 - use jacquard_common::xrpc::{XrpcEndpoint, XrpcMethod}; 11 - use jacquard_common::{IntoStatic, xrpc::XrpcRequest}; 12 - use jacquard_common::{ 13 - types::string::AtUri, 14 - xrpc::{GenericXrpcError, XrpcError}, 15 - }; 16 - use serde::{Deserialize, Serialize}; 17 - use smol_str::ToSmolStr; 18 - use std::fmt::Display; 19 - 20 - pub fn router() -> Router<Hydrant> { 21 - Router::new() 22 - .route( 23 - GetRecordRequest::PATH, 24 - axum::routing::get(handle_get_record), 25 - ) 26 - .route( 27 - ListRecordsRequest::PATH, 28 - axum::routing::get(handle_list_records), 29 - ) 30 - .route(CountRecords::PATH, axum::routing::get(handle_count_records)) 31 - } 32 - 33 - #[derive(Debug)] 34 - pub struct XrpcErrorResponse<E: IntoStatic + std::error::Error = GenericXrpcError> { 35 - pub status: StatusCode, 36 - pub error: XrpcError<E>, 37 - } 38 - 39 - impl<E: Serialize + IntoStatic + std::error::Error> IntoResponse for XrpcErrorResponse<E> { 40 - fn into_response(self) -> axum::response::Response { 41 - (self.status, Json(self.error)).into_response() 42 - } 43 - } 44 - 45 - pub type XrpcResult<T, E = GenericXrpcError> = Result<T, XrpcErrorResponse<E>>; 46 - 47 - pub struct ExtractXrpc<E: XrpcEndpoint>(pub E::Request<'static>); 48 - 49 - impl<S, E> FromRequest<S> for ExtractXrpc<E> 50 - where 51 - S: Send + Sync, 52 - E: XrpcEndpoint, 53 - E::Request<'static>: Send, 54 - for<'de> E::Request<'de>: Deserialize<'de> + IntoStatic<Output = E::Request<'static>>, 55 - { 56 - type Rejection = XrpcErrorResponse<GenericXrpcError>; 57 - 58 - async fn from_request( 59 - req: axum::extract::Request, 60 - _state: &S, 61 - ) -> Result<Self, Self::Rejection> { 62 - let nsid = E::Request::<'static>::NSID; 63 - match E::METHOD { 64 - XrpcMethod::Query => { 65 - let query = req.uri().query().unwrap_or(""); 66 - let res: E::Request<'_> = 67 - serde_urlencoded::from_str(query).map_err(|e| bad_request(nsid, e))?; 68 - Ok(ExtractXrpc(res.into_static())) 69 - } 70 - XrpcMethod::Procedure(_) => { 71 - let body = axum::body::to_bytes(req.into_body(), usize::MAX) 72 - .await 73 - .map_err(|e| internal_error(nsid, e))?; 74 - let res: E::Request<'_> = 75 - serde_json::from_slice(&body).map_err(|e| bad_request(nsid, e))?; 76 - Ok(ExtractXrpc(res.into_static())) 77 - } 78 - } 79 - } 80 - } 81 - 82 - fn internal_error<E: std::error::Error + IntoStatic>( 83 - nsid: &'static str, 84 - message: impl Display, 85 - ) -> XrpcErrorResponse<E> { 86 - XrpcErrorResponse { 87 - status: StatusCode::INTERNAL_SERVER_ERROR, 88 - error: XrpcError::Generic(GenericXrpcError { 89 - error: "InternalError".into(), 90 - message: Some(message.to_smolstr()), 91 - nsid, 92 - method: "GET", 93 - http_status: StatusCode::INTERNAL_SERVER_ERROR, 94 - }), 95 - } 96 - } 97 - 98 - fn bad_request<E: std::error::Error + IntoStatic>( 99 - nsid: &'static str, 100 - message: impl Display, 101 - ) -> XrpcErrorResponse<E> { 102 - XrpcErrorResponse { 103 - status: StatusCode::BAD_REQUEST, 104 - error: XrpcError::Generic(GenericXrpcError { 105 - error: "InvalidRequest".into(), 106 - message: Some(message.to_smolstr()), 107 - nsid, 108 - method: "GET", 109 - http_status: StatusCode::BAD_REQUEST, 110 - }), 111 - } 112 - } 113 - 114 - pub async fn handle_get_record( 115 - State(hydrant): State<Hydrant>, 116 - ExtractXrpc(req): ExtractXrpc<GetRecordRequest>, 117 - ) -> Result<Json<GetRecordOutput<'static>>, XrpcErrorResponse<GetRecordError<'static>>> { 118 - let record = hydrant 119 - .repos 120 - .resolve(&req.repo) 121 - .await 122 - .map_err(|e| internal_error(GetRecordRequest::PATH, e))? 123 - .get_record(&req.collection, &req.rkey.0) 124 - .await 125 - .map_err(|e| internal_error(GetRecordRequest::PATH, e))?; 126 - let Some(record) = record else { 127 - return Err(XrpcErrorResponse { 128 - status: StatusCode::NOT_FOUND, 129 - error: XrpcError::Xrpc(GetRecordError::RecordNotFound(None)), 130 - }); 131 - }; 132 - 133 - Ok(Json(GetRecordOutput { 134 - uri: AtUri::from_parts_owned( 135 - record.did.as_str(), 136 - req.collection.as_str(), 137 - req.rkey.0.as_str(), 138 - ) 139 - .unwrap(), 140 - cid: Some(record.cid), 141 - value: record.value, 142 - extra_data: Default::default(), 143 - })) 144 - } 145 - 146 - pub async fn handle_list_records( 147 - State(hydrant): State<Hydrant>, 148 - ExtractXrpc(req): ExtractXrpc<ListRecordsRequest>, 149 - ) -> Result<Json<ListRecordsOutput<'static>>, XrpcErrorResponse<GenericXrpcError>> { 150 - let limit = req.limit.unwrap_or(50).min(100) as usize; 151 - let reverse = req.reverse.unwrap_or(false); 152 - let cursor = req.cursor.as_deref(); 153 - 154 - let repo = hydrant 155 - .repos 156 - .resolve(&req.repo) 157 - .await 158 - .map_err(|e| internal_error(GetRecordRequest::PATH, e))?; 159 - let list = repo 160 - .list_records(req.collection.as_str(), limit, reverse, cursor) 161 - .await 162 - .map_err(|e| bad_request(ListRecordsRequest::PATH, e))?; 163 - 164 - let records = list 165 - .records 166 - .into_iter() 167 - .filter_map(|r| { 168 - let uri = AtUri::from_parts_owned( 169 - repo.did.as_str(), 170 - req.collection.as_str(), 171 - r.rkey.as_str(), 172 - ) 173 - .ok()?; 174 - Some(RepoRecord { 175 - uri, 176 - cid: r.cid, 177 - value: r.value, 178 - extra_data: Default::default(), 179 - }) 180 - }) 181 - .collect(); 182 - 183 - Ok(Json(ListRecordsOutput { 184 - records, 185 - cursor: list.cursor.map(|r| r.into()), 186 - extra_data: Default::default(), 187 - })) 188 - } 189 - 190 - #[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)] 191 - pub struct CountRecordsOutput { 192 - pub count: u64, 193 - } 194 - 195 - pub struct CountRecordsResponse; 196 - impl jacquard_common::xrpc::XrpcResp for CountRecordsResponse { 197 - const NSID: &'static str = "systems.gaze.hydrant.countRecords"; 198 - const ENCODING: &'static str = "application/json"; 199 - type Output<'de> = CountRecordsOutput; 200 - type Err<'de> = GenericXrpcError; 201 - } 202 - 203 - #[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)] 204 - pub struct CountRecordsRequestData<'i> { 205 - #[serde(borrow)] 206 - pub identifier: AtIdentifier<'i>, 207 - pub collection: String, 208 - } 209 - 210 - impl<'a> jacquard_common::xrpc::XrpcRequest for CountRecordsRequestData<'a> { 211 - const NSID: &'static str = "systems.gaze.hydrant.countRecords"; 212 - const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query; 213 - type Response = CountRecordsResponse; 214 - } 215 - 216 - pub struct CountRecords; 217 - impl jacquard_common::xrpc::XrpcEndpoint for CountRecords { 218 - const PATH: &'static str = "/xrpc/systems.gaze.hydrant.countRecords"; 219 - const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query; 220 - type Request<'de> = CountRecordsRequestData<'de>; 221 - type Response = CountRecordsResponse; 222 - } 223 - 224 - pub async fn handle_count_records( 225 - State(hydrant): State<Hydrant>, 226 - ExtractXrpc(req): ExtractXrpc<CountRecords>, 227 - ) -> XrpcResult<Json<CountRecordsOutput>> { 228 - let count = hydrant 229 - .repos 230 - .resolve(&req.identifier) 231 - .await 232 - .map_err(|e| internal_error(GetRecordRequest::PATH, e))? 233 - .count_records(&req.collection) 234 - .await 235 - .map_err(|e| internal_error(CountRecords::PATH, e))?; 236 - 237 - Ok(Json(CountRecordsOutput { count })) 238 - }
+51
src/api/xrpc/count_records.rs
··· 1 + use super::*; 2 + 3 + #[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)] 4 + pub struct CountRecordsOutput { 5 + pub count: u64, 6 + } 7 + 8 + pub struct CountRecordsResponse; 9 + impl jacquard_common::xrpc::XrpcResp for CountRecordsResponse { 10 + const NSID: &'static str = "systems.gaze.hydrant.countRecords"; 11 + const ENCODING: &'static str = "application/json"; 12 + type Output<'de> = CountRecordsOutput; 13 + type Err<'de> = GenericXrpcError; 14 + } 15 + 16 + #[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)] 17 + pub struct CountRecordsRequestData<'i> { 18 + #[serde(borrow)] 19 + pub identifier: AtIdentifier<'i>, 20 + pub collection: String, 21 + } 22 + 23 + impl<'a> jacquard_common::xrpc::XrpcRequest for CountRecordsRequestData<'a> { 24 + const NSID: &'static str = "systems.gaze.hydrant.countRecords"; 25 + const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query; 26 + type Response = CountRecordsResponse; 27 + } 28 + 29 + pub struct CountRecords; 30 + impl jacquard_common::xrpc::XrpcEndpoint for CountRecords { 31 + const PATH: &'static str = "/xrpc/systems.gaze.hydrant.countRecords"; 32 + const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query; 33 + type Request<'de> = CountRecordsRequestData<'de>; 34 + type Response = CountRecordsResponse; 35 + } 36 + 37 + pub async fn handle( 38 + State(hydrant): State<Hydrant>, 39 + ExtractXrpc(req): ExtractXrpc<CountRecords>, 40 + ) -> XrpcResult<Json<CountRecordsOutput>> { 41 + let count = hydrant 42 + .repos 43 + .resolve(&req.identifier) 44 + .await 45 + .map_err(|e| internal_error(GetRecordRequest::PATH, e))? 46 + .count_records(&req.collection) 47 + .await 48 + .map_err(|e| internal_error(CountRecords::PATH, e))?; 49 + 50 + Ok(Json(CountRecordsOutput { count })) 51 + }
+33
src/api/xrpc/get_record.rs
··· 1 + use super::*; 2 + 3 + pub async fn handle( 4 + State(hydrant): State<Hydrant>, 5 + ExtractXrpc(req): ExtractXrpc<GetRecordRequest>, 6 + ) -> Result<Json<GetRecordOutput<'static>>, XrpcErrorResponse<GetRecordError<'static>>> { 7 + let record = hydrant 8 + .repos 9 + .resolve(&req.repo) 10 + .await 11 + .map_err(|e| internal_error(GetRecordRequest::PATH, e))? 12 + .get_record(&req.collection, &req.rkey.0) 13 + .await 14 + .map_err(|e| internal_error(GetRecordRequest::PATH, e))?; 15 + let Some(record) = record else { 16 + return Err(XrpcErrorResponse { 17 + status: StatusCode::NOT_FOUND, 18 + error: XrpcError::Xrpc(GetRecordError::RecordNotFound(None)), 19 + }); 20 + }; 21 + 22 + Ok(Json(GetRecordOutput { 23 + uri: AtUri::from_parts_owned( 24 + record.did.as_str(), 25 + req.collection.as_str(), 26 + req.rkey.0.as_str(), 27 + ) 28 + .unwrap(), 29 + cid: Some(record.cid), 30 + value: record.value, 31 + extra_data: Default::default(), 32 + })) 33 + }
+45
src/api/xrpc/list_records.rs
··· 1 + use super::*; 2 + 3 + pub async fn handle( 4 + State(hydrant): State<Hydrant>, 5 + ExtractXrpc(req): ExtractXrpc<ListRecordsRequest>, 6 + ) -> Result<Json<ListRecordsOutput<'static>>, XrpcErrorResponse<GenericXrpcError>> { 7 + let limit = req.limit.unwrap_or(50).min(100) as usize; 8 + let reverse = req.reverse.unwrap_or(false); 9 + let cursor = req.cursor.as_deref(); 10 + 11 + let repo = hydrant 12 + .repos 13 + .resolve(&req.repo) 14 + .await 15 + .map_err(|e| internal_error(GetRecordRequest::PATH, e))?; 16 + let list = repo 17 + .list_records(req.collection.as_str(), limit, reverse, cursor) 18 + .await 19 + .map_err(|e| bad_request(ListRecordsRequest::PATH, e))?; 20 + 21 + let records = list 22 + .records 23 + .into_iter() 24 + .filter_map(|r| { 25 + let uri = AtUri::from_parts_owned( 26 + repo.did.as_str(), 27 + req.collection.as_str(), 28 + r.rkey.as_str(), 29 + ) 30 + .ok()?; 31 + Some(RepoRecord { 32 + uri, 33 + cid: r.cid, 34 + value: r.value, 35 + extra_data: Default::default(), 36 + }) 37 + }) 38 + .collect(); 39 + 40 + Ok(Json(ListRecordsOutput { 41 + records, 42 + cursor: list.cursor.map(|r| r.into()), 43 + extra_data: Default::default(), 44 + })) 45 + }
+112
src/api/xrpc/mod.rs
··· 1 + use crate::api::xrpc::count_records::CountRecords; 2 + use crate::control::Hydrant; 3 + use axum::extract::FromRequest; 4 + use axum::response::IntoResponse; 5 + use axum::routing::get; 6 + use axum::{Json, Router, extract::State, http::StatusCode}; 7 + use jacquard_api::com_atproto::repo::{ 8 + get_record::{GetRecordError, GetRecordOutput, GetRecordRequest}, 9 + list_records::{ListRecordsOutput, ListRecordsRequest, Record as RepoRecord}, 10 + }; 11 + use jacquard_common::types::ident::AtIdentifier; 12 + use jacquard_common::xrpc::{XrpcEndpoint, XrpcMethod}; 13 + use jacquard_common::{IntoStatic, xrpc::XrpcRequest}; 14 + use jacquard_common::{ 15 + types::string::AtUri, 16 + xrpc::{GenericXrpcError, XrpcError}, 17 + }; 18 + use serde::{Deserialize, Serialize}; 19 + use smol_str::ToSmolStr; 20 + use std::fmt::Display; 21 + 22 + mod count_records; 23 + mod get_record; 24 + mod list_records; 25 + 26 + pub fn router() -> Router<Hydrant> { 27 + Router::new() 28 + .route(GetRecordRequest::PATH, get(get_record::handle)) 29 + .route(ListRecordsRequest::PATH, get(list_records::handle)) 30 + .route(CountRecords::PATH, get(count_records::handle)) 31 + } 32 + 33 + #[derive(Debug)] 34 + pub struct XrpcErrorResponse<E: IntoStatic + std::error::Error = GenericXrpcError> { 35 + pub status: StatusCode, 36 + pub error: XrpcError<E>, 37 + } 38 + 39 + impl<E: Serialize + IntoStatic + std::error::Error> IntoResponse for XrpcErrorResponse<E> { 40 + fn into_response(self) -> axum::response::Response { 41 + (self.status, Json(self.error)).into_response() 42 + } 43 + } 44 + 45 + pub type XrpcResult<T, E = GenericXrpcError> = Result<T, XrpcErrorResponse<E>>; 46 + 47 + pub struct ExtractXrpc<E: XrpcEndpoint>(pub E::Request<'static>); 48 + 49 + impl<S, E> FromRequest<S> for ExtractXrpc<E> 50 + where 51 + S: Send + Sync, 52 + E: XrpcEndpoint, 53 + E::Request<'static>: Send, 54 + for<'de> E::Request<'de>: Deserialize<'de> + IntoStatic<Output = E::Request<'static>>, 55 + { 56 + type Rejection = XrpcErrorResponse<GenericXrpcError>; 57 + 58 + async fn from_request( 59 + req: axum::extract::Request, 60 + _state: &S, 61 + ) -> Result<Self, Self::Rejection> { 62 + let nsid = E::Request::<'static>::NSID; 63 + match E::METHOD { 64 + XrpcMethod::Query => { 65 + let query = req.uri().query().unwrap_or(""); 66 + let res: E::Request<'_> = 67 + serde_urlencoded::from_str(query).map_err(|e| bad_request(nsid, e))?; 68 + Ok(ExtractXrpc(res.into_static())) 69 + } 70 + XrpcMethod::Procedure(_) => { 71 + let body = axum::body::to_bytes(req.into_body(), usize::MAX) 72 + .await 73 + .map_err(|e| internal_error(nsid, e))?; 74 + let res: E::Request<'_> = 75 + serde_json::from_slice(&body).map_err(|e| bad_request(nsid, e))?; 76 + Ok(ExtractXrpc(res.into_static())) 77 + } 78 + } 79 + } 80 + } 81 + 82 + fn internal_error<E: std::error::Error + IntoStatic>( 83 + nsid: &'static str, 84 + message: impl Display, 85 + ) -> XrpcErrorResponse<E> { 86 + XrpcErrorResponse { 87 + status: StatusCode::INTERNAL_SERVER_ERROR, 88 + error: XrpcError::Generic(GenericXrpcError { 89 + error: "InternalError".into(), 90 + message: Some(message.to_smolstr()), 91 + nsid, 92 + method: "GET", 93 + http_status: StatusCode::INTERNAL_SERVER_ERROR, 94 + }), 95 + } 96 + } 97 + 98 + fn bad_request<E: std::error::Error + IntoStatic>( 99 + nsid: &'static str, 100 + message: impl Display, 101 + ) -> XrpcErrorResponse<E> { 102 + XrpcErrorResponse { 103 + status: StatusCode::BAD_REQUEST, 104 + error: XrpcError::Generic(GenericXrpcError { 105 + error: "InvalidRequest".into(), 106 + message: Some(message.to_smolstr()), 107 + nsid, 108 + method: "GET", 109 + http_status: StatusCode::BAD_REQUEST, 110 + }), 111 + } 112 + }