very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib] return actual types in RepoInfo instead of their opaque representations

dawn d1d19573 e4304906

+49 -17
+1
examples/statusphere.rs
··· 110 110 .ok() 111 111 .flatten() 112 112 .and_then(|info| info.handle) 113 + .map(|h| h.to_string()) 113 114 .unwrap_or_else(|| did.to_string()) 114 115 }; 115 116 while let Some(event) = stream.next().await {
+1 -1
src/api/repos.rs
··· 47 47 .map_err(internal)? 48 48 .to_did(); 49 49 50 - Ok(repo_state_to_info(did.to_string(), repo_state)) 50 + Ok(repo_state_to_info(did, repo_state)) 51 51 }; 52 52 53 53 let results = match partition.as_str() {
+34 -16
src/control.rs
··· 8 8 use chrono::{DateTime, Utc}; 9 9 use futures::{FutureExt, Stream}; 10 10 use jacquard_common::types::cid::{ATP_CID_HASH, IpldCid}; 11 - use jacquard_common::types::string::Did; 11 + use jacquard_common::types::string::{Did, Handle}; 12 + use jacquard_common::types::tid::Tid; 12 13 use jacquard_common::{CowStr, IntoStatic, RawData}; 13 14 use jacquard_repo::DAG_CBOR_CID_CODEC; 14 15 use miette::{IntoDiagnostic, Result}; ··· 16 17 use sha2::{Digest, Sha256}; 17 18 use tokio::sync::{mpsc, watch}; 18 19 use tracing::{debug, error, info}; 20 + use url::Url; 19 21 20 22 use crate::backfill::BackfillWorker; 21 23 use crate::config::{Config, SignatureVerification}; ··· 24 26 use crate::ingest::{firehose::FirehoseIngestor, worker::FirehoseWorker}; 25 27 use crate::state::AppState; 26 28 use crate::types::{ 27 - BroadcastEvent, GaugeState, MarshallableEvt, RecordEvt, RepoState, StoredData, StoredEvent, 29 + BroadcastEvent, GaugeState, MarshallableEvt, RecordEvt, RepoState, RepoStatus, StoredData, 30 + StoredEvent, 28 31 }; 29 32 30 33 /// an event emitted by the hydrant event stream. ··· 999 1002 } 1000 1003 } 1001 1004 1002 - // --- repos control --- 1003 - 1004 1005 /// information about a tracked or known repository. returned by [`ReposControl`] methods. 1005 1006 #[derive(Debug, Clone, serde::Serialize)] 1006 1007 pub struct RepoInfo { 1007 - pub did: String, 1008 - pub status: String, 1008 + /// the DID of the repository. 1009 + pub did: Did<'static>, 1010 + /// the status of the repository. 1011 + #[serde(serialize_with = "crate::util::repo_status_serialize_str")] 1012 + pub status: RepoStatus, 1013 + /// whether this repository is tracked or not. 1014 + /// untracked repositories are not updated and they stay frozen. 1009 1015 pub tracked: bool, 1016 + /// the revision of the root commit of this repository. 1010 1017 #[serde(skip_serializing_if = "Option::is_none")] 1011 - pub rev: Option<String>, 1018 + pub rev: Option<Tid>, 1019 + /// the CID of the root commit of this repository. 1020 + #[serde(serialize_with = "crate::util::opt_cid_serialize_str")] 1012 1021 #[serde(skip_serializing_if = "Option::is_none")] 1013 - pub handle: Option<String>, 1022 + pub data: Option<IpldCid>, 1023 + /// the handle for the DID of this repository. 1014 1024 #[serde(skip_serializing_if = "Option::is_none")] 1015 - pub pds: Option<String>, 1025 + pub handle: Option<Handle<'static>>, 1026 + /// the URL for the PDS in which this repository is hosted on. 1027 + #[serde(skip_serializing_if = "Option::is_none")] 1028 + pub pds: Option<Url>, 1029 + /// ATProto signing key of this repository. 1016 1030 #[serde(skip_serializing_if = "Option::is_none")] 1017 1031 pub signing_key: Option<String>, 1032 + /// when this repository was last touched (status update, commit ingested, etc.). 1018 1033 #[serde(skip_serializing_if = "Option::is_none")] 1019 1034 pub last_updated_at: Option<DateTime<Utc>>, 1035 + /// the time of the last message gotten from the firehose for this repository. 1036 + /// this is equal to the `time` field. 1020 1037 #[serde(skip_serializing_if = "Option::is_none")] 1021 1038 pub last_message_at: Option<DateTime<Utc>>, 1022 1039 } ··· 1038 1055 /// has never seen this DID. 1039 1056 pub async fn get(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> { 1040 1057 let did_key = keys::repo_key(did); 1041 - let did_str = did.as_str().to_owned(); 1042 1058 let db = self.0.db.clone(); 1059 + let did = did.clone().into_static(); 1043 1060 1044 1061 tokio::task::spawn_blocking(move || { 1045 1062 let bytes = db.repos.get(&did_key).into_diagnostic()?; 1046 1063 let state = bytes.as_deref().map(db::deser_repo_state).transpose()?; 1047 - Ok(state.map(|s| repo_state_to_info(did_str, s))) 1064 + Ok(state.map(|s| repo_state_to_info(did, s))) 1048 1065 }) 1049 1066 .await 1050 1067 .into_diagnostic()? ··· 1169 1186 } 1170 1187 } 1171 1188 1172 - pub fn repo_state_to_info(did: String, s: RepoState<'_>) -> RepoInfo { 1189 + pub fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>) -> RepoInfo { 1173 1190 RepoInfo { 1174 1191 did, 1175 - status: s.status.to_string(), 1192 + status: s.status, 1176 1193 tracked: s.tracked, 1177 - rev: s.rev.as_ref().map(|r| r.to_string()), 1178 - handle: s.handle.map(|h| h.to_string()), 1179 - pds: s.pds.map(|p| p.to_string()), 1194 + rev: s.rev.map(|r| r.to_tid()), 1195 + data: s.data, 1196 + handle: s.handle.map(|h| h.into_static()), 1197 + pds: s.pds.and_then(|p| p.parse().ok()), 1180 1198 signing_key: s.signing_key.map(|k| k.encode()), 1181 1199 last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at), 1182 1200 last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
+13
src/util.rs
··· 6 6 use tokio::sync::watch; 7 7 use tracing::info; 8 8 9 + use crate::types::RepoStatus; 10 + 9 11 /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. 10 12 pub enum RetryOutcome<E> { 11 13 /// ratelimited after exhausting all retries ··· 141 143 .transpose() 142 144 .map_err(serde::de::Error::custom) 143 145 } 146 + 147 + pub fn opt_cid_serialize_str<S: Serializer>(v: &Option<cid::Cid>, s: S) -> Result<S::Ok, S::Error> { 148 + match v { 149 + Some(cid) => s.serialize_some(cid.to_string().as_str()), 150 + None => s.serialize_none(), 151 + } 152 + } 153 + 154 + pub fn repo_status_serialize_str<S: Serializer>(v: &RepoStatus, s: S) -> Result<S::Ok, S::Error> { 155 + s.serialize_str(&v.to_string()) 156 + }