very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api,lib] implement iter in ReposControl, remove partition argument from /repos

dawn cbad49c0 00c75ee2

+108 -85
+1 -2
README.md
··· 269 269 270 270 - `GET /repos`: get a list of repositories and their sync status. supports pagination and filtering: 271 271 - `limit`: max results (default 100, max 1000) 272 - - `cursor`: opaque key for paginating. 273 - - `partition`: `all` (default), `pending` (backfill queue), or `resync` (retries) 272 + - `cursor`: did key for paginating. 274 273 - `GET /repos/{did}`: get the sync status and metadata of a specific repository. 275 274 also returns the handle, PDS URL and the atproto signing key (these won't be 276 275 available before the repo has been backfilled once at least).
+18 -79
src/api/repos.rs
··· 1 - use crate::control::{Hydrant, RepoInfo, repo_state_to_info}; 2 - use crate::db::keys; 1 + use std::str::FromStr; 2 + 3 + use crate::control::{Hydrant, RepoInfo}; 3 4 use axum::{ 4 5 Json, Router, 5 6 body::Body, ··· 9 10 routing::{delete, get, post, put}, 10 11 }; 11 12 use jacquard_common::types::did::Did; 13 + use miette::IntoDiagnostic; 12 14 use serde::Deserialize; 13 15 14 16 pub fn router() -> Router<Hydrant> { ··· 29 31 pub struct GetReposParams { 30 32 pub limit: Option<usize>, 31 33 pub cursor: Option<String>, 32 - pub partition: Option<String>, 33 34 } 34 35 35 36 pub async fn handle_get_repos( ··· 38 39 headers: HeaderMap, 39 40 ) -> Result<Response, (StatusCode, String)> { 40 41 let limit = params.limit.unwrap_or(100).min(1000); 41 - let partition = params.partition.unwrap_or_else(|| "all".to_string()); 42 + let cursor = params 43 + .cursor 44 + .map(|c| Did::from_str(&c)) 45 + .transpose() 46 + .map_err(bad_request)?; 42 47 43 48 let items = tokio::task::spawn_blocking(move || { 44 - let db = &hydrant.state.db; 45 - 46 - let to_info = |k: &[u8], v: &[u8]| -> Result<RepoInfo, (StatusCode, String)> { 47 - let repo_state = crate::db::deser_repo_state(v).map_err(internal)?; 48 - let did = crate::db::types::TrimmedDid::try_from(k) 49 - .map_err(internal)? 50 - .to_did(); 51 - 52 - Ok(repo_state_to_info(did, repo_state)) 53 - }; 54 - 55 - let results = match partition.as_str() { 56 - "all" | "resync" => { 57 - let is_all = partition == "all"; 58 - let ks = if is_all { &db.repos } else { &db.resync }; 59 - 60 - let start_bound = if let Some(cursor) = params.cursor { 61 - let did = Did::new_owned(&cursor).map_err(bad_request)?; 62 - let did_key = keys::repo_key(&did); 63 - std::ops::Bound::Excluded(did_key) 64 - } else { 65 - std::ops::Bound::Unbounded 66 - }; 67 - 68 - let mut items = Vec::new(); 69 - for item in ks 70 - .range((start_bound, std::ops::Bound::Unbounded)) 71 - .take(limit) 72 - { 73 - let (k, v) = item.into_inner().map_err(internal)?; 74 - 75 - let repo_state_bytes = if is_all { 76 - v 77 - } else { 78 - db.repos.get(&k).map_err(internal)?.ok_or_else(|| { 79 - internal(format!("repository state missing for {}", partition)) 80 - })? 81 - }; 82 - 83 - items.push(to_info(&k, &repo_state_bytes)?); 84 - } 85 - Ok::<_, (StatusCode, String)>(items) 86 - } 87 - "pending" => { 88 - let start_bound = if let Some(cursor) = params.cursor { 89 - let id = cursor.parse::<u64>().map_err(bad_request)?; 90 - std::ops::Bound::Excluded(id.to_be_bytes().to_vec()) 91 - } else { 92 - std::ops::Bound::Unbounded 93 - }; 94 - 95 - let mut items = Vec::new(); 96 - for item in db 97 - .pending 98 - .range((start_bound, std::ops::Bound::Unbounded)) 99 - .take(limit) 100 - { 101 - let (_, did_key) = item.into_inner().map_err(internal)?; 102 - 103 - if let Ok(Some(v)) = db.repos.get(&did_key) { 104 - items.push(to_info(&did_key, &v)?); 105 - } 106 - } 107 - Ok(items) 108 - } 109 - _ => Err((StatusCode::BAD_REQUEST, "invalid partition".to_string())), 110 - }?; 111 - 112 - Ok::<_, (StatusCode, String)>(results) 49 + hydrant 50 + .repos 51 + .iter(cursor.as_ref()) 52 + .take(limit) 53 + .collect::<miette::Result<Vec<_>>>() 113 54 }) 114 55 .await 115 - .map_err(internal)??; 56 + .into_diagnostic() 57 + .flatten() 58 + .map_err(internal)?; 116 59 117 60 if prefers_json(&headers) { 118 61 return Ok(Json(items).into_response()); ··· 201 144 .filter_map(|item| Did::new_owned(&item.did).ok()) 202 145 .collect(); 203 146 204 - let queued = hydrant 205 - .repos 206 - .resync(dids) 207 - .await 208 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 147 + let queued = hydrant.repos.resync(dids).await.map_err(internal)?; 209 148 210 149 Ok(did_list_response(queued, &headers)) 211 150 }
-1
src/control/mod.rs
··· 7 7 pub use crawler::{CrawlerHandle, CrawlerSourceInfo}; 8 8 pub use filter::{FilterControl, FilterPatch, FilterSnapshot}; 9 9 pub use firehose::{FirehoseHandle, FirehoseSourceInfo}; 10 - pub(crate) use repos::repo_state_to_info; 11 10 pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl}; 12 11 13 12 use std::collections::BTreeMap;
+89 -3
src/control/repos.rs
··· 8 8 use jacquard_common::types::string::{Did, Handle, Rkey}; 9 9 use jacquard_common::types::tid::Tid; 10 10 use jacquard_common::{CowStr, Data, IntoStatic}; 11 - use miette::{IntoDiagnostic, Result}; 11 + use miette::{Context, IntoDiagnostic, Result}; 12 12 use rand::Rng; 13 13 use smol_str::ToSmolStr; 14 14 use url::Url; 15 15 16 - use crate::db::types::DbRkey; 16 + use crate::db::types::{DbRkey, TrimmedDid}; 17 17 use crate::db::{self, Db, keys, ser_repo_state}; 18 18 use crate::state::AppState; 19 19 use crate::types::{GaugeState, RepoState, RepoStatus}; ··· 67 67 pub struct ReposControl(pub(super) Arc<AppState>); 68 68 69 69 impl ReposControl { 70 - /// gets a handle for a repository to allow acting upon it. 70 + /// iterates through all repositories, returning their state. 71 + pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> { 72 + let start_bound = if let Some(cursor) = cursor { 73 + let did_key = keys::repo_key(cursor); 74 + std::ops::Bound::Excluded(did_key) 75 + } else { 76 + std::ops::Bound::Unbounded 77 + }; 78 + 79 + self.0 80 + .db 81 + .repos 82 + .range((start_bound, std::ops::Bound::Unbounded)) 83 + .map(|g| { 84 + let (k, v) = g.into_inner().into_diagnostic()?; 85 + let repo_state = crate::db::deser_repo_state(&v)?; 86 + let did = TrimmedDid::try_from(k.as_ref())?.to_did(); 87 + Ok(repo_state_to_info(did, repo_state)) 88 + }) 89 + } 90 + 91 + #[allow(dead_code)] 92 + /// iterates through pending repositories, returning their state. 93 + fn iter_pending(&self, cursor: Option<u64>) -> impl Iterator<Item = Result<(u64, RepoInfo)>> { 94 + let start_bound = if let Some(cursor) = cursor { 95 + std::ops::Bound::Excluded(cursor.to_be_bytes().to_vec()) 96 + } else { 97 + std::ops::Bound::Unbounded 98 + }; 99 + 100 + let repos = self.0.db.repos.clone(); 101 + self.0 102 + .db 103 + .pending 104 + .range((start_bound, std::ops::Bound::Unbounded)) 105 + .map(move |g| { 106 + let (id_raw, did_key) = g.into_inner().into_diagnostic()?; 107 + let id = u64::from_be_bytes( 108 + id_raw 109 + .as_ref() 110 + .try_into() 111 + .into_diagnostic() 112 + .wrap_err("can't parse pending key")?, 113 + ); 114 + let Some(bytes) = repos.get(&did_key).into_diagnostic()? else { 115 + // stale pending that we forgot to delete? shouldn't happen though 116 + tracing::warn!(id, did = ?did_key, "stale pending???"); 117 + return Ok(None); 118 + }; 119 + let repo_state = crate::db::deser_repo_state(&bytes)?; 120 + let did = TrimmedDid::try_from(did_key.as_ref())?.to_did(); 121 + Ok(Some((id, repo_state_to_info(did, repo_state)))) 122 + }) 123 + .map(|b| b.transpose()) 124 + .flatten() 125 + } 126 + 127 + #[allow(dead_code)] 128 + fn iter_resync(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> { 129 + let start_bound = if let Some(cursor) = cursor { 130 + let did_key = keys::repo_key(cursor); 131 + std::ops::Bound::Excluded(did_key) 132 + } else { 133 + std::ops::Bound::Unbounded 134 + }; 135 + 136 + let repos = self.0.db.repos.clone(); 137 + self.0 138 + .db 139 + .resync 140 + .range((start_bound, std::ops::Bound::Unbounded)) 141 + .map(move |g| { 142 + let did_key = g.key().into_diagnostic()?; 143 + let Some(bytes) = repos.get(&did_key).into_diagnostic()? else { 144 + // stale pending that we forgot to delete? shouldn't happen though 145 + tracing::warn!(did = ?did_key, "stale resync???"); 146 + return Ok(None); 147 + }; 148 + let repo_state = crate::db::deser_repo_state(&bytes)?; 149 + let did = TrimmedDid::try_from(did_key.as_ref())?.to_did(); 150 + Ok(Some(repo_state_to_info(did, repo_state))) 151 + }) 152 + .map(|b| b.transpose()) 153 + .flatten() 154 + } 155 + 156 + /// gets a handle for a repository to read from it. 71 157 pub fn get<'i>(&self, did: &Did<'i>) -> Result<RepoHandle<'i>> { 72 158 Ok(RepoHandle { 73 159 state: self.0.clone(),