very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib,api] fix iterating through repos and getting repo info in relay mode

dawn fd64a728 b994039c

+44 -28
+1 -1
src/api/xrpc/list_repos.rs
··· 31 31 let mut next_cursor: Option<Did<'static>> = None; 32 32 33 33 for item in hydrant.repos.iter_states(cursor.as_ref()) { 34 - let (did, state, _metadata) = item?; 34 + let (did, state) = item?; 35 35 36 36 // skip repos that haven't been synced at least once 37 37 let Some(commit) = state.root else {
+43 -27
src/control/repos/mod.rs
··· 27 27 #[cfg(feature = "indexer")] 28 28 pub use indexer::*; 29 29 30 - /// information about a tracked or known repository. returned by [`ReposControl`] methods. 30 + /// information about a repository known to hydrant. returned by [`ReposControl`] methods. 31 31 #[derive(Debug, Clone, serde::Serialize)] 32 32 pub struct RepoInfo { 33 33 /// the DID of the repository. ··· 37 37 pub status: RepoStatus, 38 38 /// whether this repository is tracked or not. 39 39 /// untracked repositories are not updated and they stay frozen. 40 + /// 41 + /// this will always be `true` in relay mode. 40 42 pub tracked: bool, 41 43 /// the revision of the root commit of this repository. 42 44 #[serde(skip_serializing_if = "Option::is_none")] ··· 66 68 pub last_message_at: Option<DateTime<Utc>>, 67 69 } 68 70 69 - /// control over which repositories are tracked and access to their state. 71 + /// control over repositories and access to their state. 70 72 /// 71 - /// in `filter` mode, a repo is only indexed if it either matches a signal or is 72 - /// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are 73 - /// indexed and tracking is implicit. 73 + /// in indexer mode, repositories can be explicitly tracked or untracked: 74 + /// - in `filter` mode, a repo is only indexed if it matches a signal or is 75 + /// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos 76 + /// are indexed and tracking is implicit. 77 + /// - tracking a DID that hydrant has never seen enqueues an immediate backfill. 78 + /// - tracking a DID that hydrant already knows about (but has marked untracked) 79 + /// re-enqueues it for backfill. 74 80 /// 75 - /// tracking a DID that hydrant has never seen enqueues an immediate backfill. 76 - /// tracking a DID that hydrant already knows about (but has marked untracked) 77 - /// re-enqueues it for backfill. 81 + /// in relay mode, all observed repositories are passively indexed. 
explicit 82 + /// tracking and backfill do not apply; [`RepoInfo::tracked`] is always `true`. 78 83 #[derive(Clone)] 79 84 pub struct ReposControl(pub(super) Arc<AppState>); 80 85 ··· 82 87 pub(crate) fn iter_states( 83 88 &self, 84 89 cursor: Option<&Did<'_>>, 85 - ) -> impl Iterator<Item = Result<(Did<'static>, RepoState<'static>, crate::types::RepoMetadata)>> 86 - { 90 + ) -> impl Iterator<Item = Result<(Did<'static>, RepoState<'static>)>> { 87 91 let start_bound = if let Some(cursor) = cursor { 88 92 let did_key = keys::repo_key(cursor); 89 93 std::ops::Bound::Excluded(did_key) ··· 91 95 std::ops::Bound::Unbounded 92 96 }; 93 97 94 - let state = self.0.clone(); 95 98 self.0 96 99 .db 97 100 .repos 98 101 .range((start_bound, std::ops::Bound::Unbounded)) 99 - .map(move |g| { 102 + .map(|g| { 100 103 let (k, v) = g.into_inner().into_diagnostic()?; 101 104 let repo_state = crate::db::deser_repo_state(&v)?.into_static(); 102 105 let did = TrimmedDid::try_from(k.as_ref())?.to_did(); 103 - let metadata_key = keys::repo_metadata_key(&did); 104 - let metadata = state 105 - .db 106 - .repo_metadata 107 - .get(&metadata_key) 108 - .into_diagnostic()? 109 - .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?; 110 - let metadata = crate::db::deser_repo_meta(metadata.as_ref())?; 111 - Ok((did, repo_state, metadata)) 106 + Ok((did, repo_state)) 112 107 }) 113 108 } 114 109 115 110 /// iterates through all repositories, returning their state. 116 111 pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> { 117 - self.iter_states(cursor) 118 - .map(|r| r.map(|(did, s, m)| repo_state_to_info(did, s, m.tracked))) 112 + #[cfg(feature = "indexer")] 113 + let state = self.0.clone(); 114 + self.iter_states(cursor).map(move |r| { 115 + r.and_then(|(did, s)| { 116 + #[cfg(feature = "indexer")] 117 + let tracked = state 118 + .db 119 + .repo_metadata 120 + .get(&keys::repo_metadata_key(&did)) 121 + .into_diagnostic()? 
122 + .map(|b| crate::db::deser_repo_meta(&b)) 123 + .transpose()? 124 + .map_or(true, |m| m.tracked); 125 + #[cfg(not(feature = "indexer"))] 126 + let tracked = true; 127 + Ok(repo_state_to_info(did, s, tracked)) 128 + }) 129 + }) 119 130 } 120 131 121 132 /// gets a handle for a repository to read from it. ··· 233 244 pub async fn info(&self) -> Result<Option<RepoInfo>> { 234 245 let did = self.did.clone().into_static(); 235 246 let did_key = keys::repo_key(&did); 247 + #[cfg(feature = "indexer")] 236 248 let metadata_key = keys::repo_metadata_key(&did); 237 249 let app_state = self.state.clone(); 238 250 ··· 243 255 }; 244 256 let repo_state = crate::db::deser_repo_state(&state_bytes)?; 245 257 246 - let metadata_bytes = app_state 258 + #[cfg(feature = "indexer")] 259 + let tracked = app_state 247 260 .db 248 261 .repo_metadata 249 262 .get(&metadata_key) 250 263 .into_diagnostic()? 251 - .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?; 252 - let metadata = crate::db::deser_repo_meta(&metadata_bytes)?; 264 + .map(|b| crate::db::deser_repo_meta(&b)) 265 + .transpose()? 266 + .map_or(true, |m| m.tracked); 267 + #[cfg(not(feature = "indexer"))] 268 + let tracked = true; 253 269 254 - Ok(Some(repo_state_to_info(did, repo_state, metadata.tracked))) 270 + Ok(Some(repo_state_to_info(did, repo_state, tracked))) 255 271 }) 256 272 .await 257 273 .into_diagnostic()?