···11-mod crawler;
22-mod filter;
33-mod firehose;
44-mod repos;
55-mod stream;
11+pub(crate) mod crawler;
22+pub(crate) mod filter;
33+pub(crate) mod firehose;
44+pub(crate) mod repos;
55+pub(crate) mod stream;
6677pub use crawler::{CrawlerHandle, CrawlerSourceInfo};
88pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
+123-8
src/control/repos.rs
···11+use std::collections::HashMap;
12use std::sync::Arc;
2334use chrono::{DateTime, Utc};
···56use jacquard_common::cowstr::ToCowStr;
67use jacquard_common::types::cid::{Cid, IpldCid};
78use jacquard_common::types::ident::AtIdentifier;
99+use jacquard_common::types::nsid::Nsid;
810use jacquard_common::types::string::{Did, Handle, Rkey};
911use jacquard_common::types::tid::Tid;
1012use jacquard_common::{CowStr, Data, IntoStatic};
···1315use smol_str::ToSmolStr;
1416use url::Url;
15171616-use crate::db::types::{DbRkey, TrimmedDid};
1818+use crate::db::types::{DbRkey, DidKey, TrimmedDid};
1719use crate::db::{self, Db, keys, ser_repo_state};
1820use crate::state::AppState;
1921use crate::types::{GaugeState, RepoState, RepoStatus};
···3739 #[serde(skip_serializing_if = "Option::is_none")]
3840 pub data: Option<IpldCid>,
3941 /// the handle for the DID of this repository.
4242+ ///
4343+ /// note that this handle is not bi-directionally verified.
4044 #[serde(skip_serializing_if = "Option::is_none")]
4145 pub handle: Option<Handle<'static>>,
4246 /// the URL for the PDS in which this repository is hosted on.
4347 #[serde(skip_serializing_if = "Option::is_none")]
4448 pub pds: Option<Url>,
4549 /// ATProto signing key of this repository.
5050+ #[serde(serialize_with = "crate::util::opt_did_key_serialize_str")]
4651 #[serde(skip_serializing_if = "Option::is_none")]
4747- pub signing_key: Option<String>,
5252+ pub signing_key: Option<DidKey<'static>>,
4853 /// when this repository was last touched (status update, commit ingested, etc.).
4954 #[serde(skip_serializing_if = "Option::is_none")]
5055 pub last_updated_at: Option<DateTime<Utc>>,
···154159 }
155160156161 /// gets a handle for a repository to read from it.
157157- pub fn get<'i>(&self, did: &Did<'i>) -> Result<RepoHandle<'i>> {
158158- Ok(RepoHandle {
162162+ pub fn get<'i>(&self, did: &Did<'i>) -> RepoHandle<'i> {
163163+ RepoHandle {
159164 state: self.0.clone(),
160165 did: did.clone(),
161161- })
166166+ }
162167 }
163168164169 /// same as [`ReposControl::get`] but allows you to pass in an identifier that can be
···171176 })
172177 }
173178174174- /// fetch the current state of repository.
179179+ /// fetch the current state of a repository.
175180 /// returns `None` if hydrant has never seen this repository.
176181 pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> {
177177- self.get(did)?.info().await
182182+ self.get(did).info().await
178183 }
179184180185 fn _resync(
···384389 data: s.data,
385390 handle: s.handle.map(|h| h.into_static()),
386391 pds: s.pds.and_then(|p| p.parse().ok()),
387387- signing_key: s.signing_key.map(|k| k.encode()),
392392+ signing_key: s.signing_key.map(|k| k.into_static()),
388393 last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
389394 last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
390395 }
···407412 pub cursor: Option<Rkey<'static>>,
408413}
409414415415+#[derive(Debug, thiserror::Error)]
416416+pub enum MiniDocError {
417417+ #[error("repo is not synced yet")]
418418+ NotSynced,
419419+ #[error("repo not found")]
420420+ RepoNotFound,
421421+ #[error("could not resolve identity")]
422422+ CouldNotResolveIdentity,
423423+ #[error("{0}")]
424424+ Other(miette::Error),
425425+}
426426+427427+/// a mini doc with a bi-directionally verified handle.
428428+pub struct MiniDoc<'i> {
429429+ /// the did.
430430+ pub did: Did<'i>,
431431+ /// the handle. if verification fails or no handle is found,
432432+ /// this will be "handle.invalid".
433433+ pub handle: Handle<'i>,
434434+ /// the url of the PDS of this repo.
435435+ pub pds: Url,
436436+ /// the atproto signing key of this repo.
437437+ pub signing_key: DidKey<'i>,
438438+}
439439+410440/// handle to access data related to this repository.
411441#[derive(Clone)]
412442pub struct RepoHandle<'i> {
···415445}
416446417447impl<'i> RepoHandle<'i> {
448448+ /// fetch the current state of this repository.
449449+ /// returns `None` if hydrant has never seen this repository.
418450 pub async fn info(&self) -> Result<Option<RepoInfo>> {
419451 let did_key = keys::repo_key(&self.did);
420452 let state = self.state.clone();
···429461 .into_diagnostic()?
430462 }
431463464464+ /// returns the collections of this repository and the number of records it has in each.
465465+ pub async fn collections(&self) -> Result<HashMap<Nsid<'static>, u64>> {
466466+ let did = self.did.clone().into_static();
467467+ let state = self.state.clone();
468468+469469+ tokio::task::spawn_blocking(move || {
470470+ let prefix = keys::did_collection_prefix(&did);
471471+ let mut res = HashMap::new();
472472+ for item in state.db.counts.prefix(&prefix) {
473473+ let (k, v) = item.into_inner().into_diagnostic()?;
474474+ let col = k
475475+ .strip_prefix(prefix.as_slice())
476476+ .ok_or_else(|| miette::miette!("invalid collection count key: {k:?}"))
477477+ .and_then(|r| std::str::from_utf8(r).into_diagnostic())
478478+ .and_then(|n| Nsid::new(n).into_diagnostic())?
479479+ .into_static();
480480+ let count = u64::from_be_bytes(
481481+ v.as_ref()
482482+ .try_into()
483483+ .into_diagnostic()
484484+ .wrap_err("expected to be count (8 bytes)")?,
485485+ );
486486+ res.insert(col, count);
487487+ }
488488+ Ok(res)
489489+ })
490490+ .await
491491+ .into_diagnostic()?
492492+ }
493493+494494+ /// returns a bi-directionally validated mini doc.
495495+ pub async fn mini_doc(&self) -> Result<MiniDoc<'static>, MiniDocError> {
496496+ fn invalid_handle() -> Handle<'static> {
497497+ unsafe { Handle::unchecked("handle.invalid") }
498498+ }
499499+500500+ let Some(info) = self.info().await.map_err(MiniDocError::Other)? else {
501501+ return Err(MiniDocError::RepoNotFound);
502502+ };
503503+504504+ if info.status == RepoStatus::Backfilling {
505505+ return Err(MiniDocError::NotSynced);
506506+ }
507507+508508+ let pds = info
509509+ .pds
510510+ .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?;
511511+ let signing_key = info
512512+ .signing_key
513513+ .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?
514514+ .into_static();
515515+516516+ let handle = if let Some(handle_unverified) = info.handle {
517517+ let id = AtIdentifier::Handle(handle_unverified);
518518+ let handle_did = self
519519+ .state
520520+ .resolver
521521+ .resolve_did(&id)
522522+ .await
523523+ .into_diagnostic()
524524+ .map_err(MiniDocError::Other)?;
525525+526526+ (handle_did == self.did)
527527+ .then(|| match id {
528528+ AtIdentifier::Handle(h) => h,
529529+ _ => unreachable!("can only be handle"),
530530+ })
531531+ .unwrap_or_else(invalid_handle)
532532+ } else {
533533+ invalid_handle()
534534+ };
535535+536536+ Ok(MiniDoc {
537537+ did: self.did.clone().into_static(),
538538+ handle,
539539+ pds,
540540+ signing_key,
541541+ })
542542+ }
543543+544544+ /// gets a record from this repository.
432545 pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> {
433546 let did = self.did.clone().into_static();
434547 let db_key = keys::record_key(&did, collection, &DbRkey::new(rkey));
···464577 .into_diagnostic()?
465578 }
466579580580+ /// lists records from this repository.
467581 pub async fn list_records(
468582 &self,
469583 collection: &str,
···559673 })
560674 }
561675676676+ /// gets how many records of a collection this repository has.
562677 pub async fn count_records(&self, collection: &str) -> Result<u64> {
563678 let did = self.did.clone().into_static();
564679 let state = self.state.clone();