···4343 continue;
4444 }
4545 };
4646- let mut metadata = crate::db::deser_repo_metadata(&metadata_bytes)?;
4646+ let mut metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
47474848 // move from resync back into pending
4949 batch.remove(&state.db.resync, key.clone());
···5858 batch.insert(
5959 &state.db.repo_metadata,
6060 &metadata_key,
6161- crate::db::ser_repo_metadata(&metadata)?,
6161+ crate::db::ser_repo_meta(&metadata)?,
6262 );
63636464 transitions.push((GaugeState::Resync(None), GaugeState::Pending));
···133133 continue;
134134 }
135135 };
136136- let mut metadata = match crate::db::deser_repo_metadata(
137137- metadata_bytes.as_ref(),
138138- ) {
136136+ let mut metadata = match crate::db::deser_repo_meta(metadata_bytes.as_ref())
137137+ {
139138 Ok(m) => m,
140139 Err(e) => {
141140 error!(did = %did, err = %e, "failed to deserialize repo metadata");
···153152 keys::pending_key(metadata.index_id),
154153 key.clone(),
155154 );
156156- let serialized_metadata = match crate::db::ser_repo_metadata(&metadata) {
155155+ let serialized_metadata = match crate::db::ser_repo_meta(&metadata) {
157156 Ok(s) => s,
158157 Err(e) => {
159158 error!(did = %did, err = %e, "failed to serialize repo metadata");
+3-3
src/backfill/mod.rs
···742742 .get(&metadata_key)
743743 .into_diagnostic()?
744744 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
745745- let mut metadata = crate::db::deser_repo_metadata(&metadata_bytes)?;
745745+ let mut metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
746746 metadata.tracked = true;
747747 batch.insert(
748748 &app_state.db.repo_metadata,
749749 &metadata_key,
750750- crate::db::ser_repo_metadata(&metadata)?,
750750+ crate::db::ser_repo_meta(&metadata)?,
751751 );
752752753753 // add the counts
···771771 .get(&metadata_key)
772772 .into_diagnostic()?
773773 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
774774- let metadata = crate::db::deser_repo_metadata(metadata_bytes.as_ref())?;
774774+ let metadata = crate::db::deser_repo_meta(metadata_bytes.as_ref())?;
775775776776 let Some((_state, records_cnt_delta, added_blocks, count)) = result else {
777777 // signal mode: no signal-matching records found, clean up the optimistically-added repo
+75
src/control/indexer.rs
···11+use super::*;
22+33+/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`].
44+///
55+/// implements [`futures::Stream`] and can be used with `StreamExt::next`,
66+/// `while let Some(evt) = stream.next().await`, `forward`, etc.
77+/// the stream terminates when the underlying channel closes (i.e. hydrant shuts down).
88+pub struct EventStream(mpsc::Receiver<Event>);
99+1010+impl Stream for EventStream {
1111+ type Item = Event;
1212+1313+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1414+ self.0.poll_recv(cx)
1515+ }
1616+}
1717+1818+/// runtime control over the backfill worker component.
1919+///
2020+/// the backfill worker fetches full repo CAR files from each repo's PDS for any
2121+/// repository in the pending queue, parses the MST, and inserts all matching records
2222+/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
2323+#[derive(Clone)]
2424+pub struct BackfillHandle(Arc<AppState>);
2525+2626+impl BackfillHandle {
2727+ pub(crate) fn new(state: Arc<AppState>) -> Self {
2828+ Self(state)
2929+ }
3030+3131+ /// enable the backfill worker, no-op if already enabled.
3232+ pub fn enable(&self) {
3333+ self.0.backfill_enabled.send_replace(true);
3434+ }
3535+ /// disable the backfill worker, in-flight repos complete before pausing.
3636+ pub fn disable(&self) {
3737+ self.0.backfill_enabled.send_replace(false);
3838+ }
3939+ /// returns the current enabled state of the backfill worker.
4040+ pub fn is_enabled(&self) -> bool {
4141+ *self.0.backfill_enabled.borrow()
4242+ }
4343+}
4444+4545+impl Hydrant {
4646+ /// subscribe to the ordered event stream.
4747+ ///
4848+ /// returns an [`EventStream`] that implements [`futures::Stream`].
4949+ ///
5050+ /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
5151+ /// - if `cursor` is `Some(id)`, all persisted `record` events from that ID onward are
5252+ /// replayed first, then the stream will switch to live tailing.
5353+ ///
5454+ /// `identity` and `account` events are ephemeral and are never replayed from a cursor,
5555+ /// only live ones are delivered. use [`ReposControl::info`] to fetch current state for
5656+ /// a specific repository.
5757+ ///
5858+ /// multiple concurrent subscribers each receive a full independent copy of the stream.
5959+ /// the stream ends when the `EventStream` is dropped.
6060+ pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
6161+ let (tx, rx) = mpsc::channel(500);
6262+ let state = self.state.clone();
6363+ let runtime = tokio::runtime::Handle::current();
6464+6565+ std::thread::Builder::new()
6666+ .name("hydrant-stream".into())
6767+ .spawn(move || {
6868+ let _g = runtime.enter();
6969+ event_stream_thread(state, tx, cursor);
7070+ })
7171+ .expect("failed to spawn stream thread");
7272+7373+ EventStream(rx)
7474+ }
7575+}
+83-164
src/control/mod.rs
···11#![allow(unused_imports)]
2233+#[cfg(feature = "indexer")]
34pub(crate) mod crawler;
45pub(crate) mod filter;
56pub(crate) mod firehose;
···89mod seed;
910pub(crate) mod stream;
10111111-pub use crawler::{CrawlerHandle, CrawlerSourceInfo};
1212+#[cfg(feature = "indexer")]
1313+mod indexer;
1414+#[cfg(feature = "indexer")]
1515+pub use indexer::*;
1616+1717+#[cfg(feature = "relay")]
1818+mod relay;
1919+#[cfg(feature = "relay")]
2020+pub use relay::*;
2121+1222pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
1323pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
1424pub use pds::{PdsControl, PdsTierAssignment, PdsTierDefinition};
···3040#[cfg(feature = "indexer")]
3141use crate::backfill::BackfillWorker;
3242use crate::config::{Config, SignatureVerification};
3333-use crate::db::{
3434- self, filter as db_filter, keys, load_persisted_crawler_sources,
3535- load_persisted_firehose_sources,
3636-};
4343+#[cfg(feature = "indexer")]
4444+use crate::db::load_persisted_crawler_sources;
4545+use crate::db::{self, filter as db_filter, keys, load_persisted_firehose_sources};
3746use crate::filter::FilterMode;
3847#[cfg(feature = "indexer")]
3948use crate::ingest::indexer::FirehoseWorker;
4049use crate::state::AppState;
4150use crate::types::MarshallableEvt;
42514343-use crawler::{CrawlerShared, spawn_crawler_producer};
4452use firehose::{FirehoseShared, spawn_firehose_ingestor};
4553#[cfg(feature = "indexer")]
4654use stream::event_stream_thread;
···8694/// ```
8795#[derive(Clone)]
8896pub struct Hydrant {
8989- pub crawler: CrawlerHandle,
9797+ #[cfg(feature = "indexer")]
9898+ pub crawler: crawler::CrawlerHandle,
9099 pub firehose: FirehoseHandle,
100100+ #[cfg(feature = "indexer")]
91101 pub backfill: BackfillHandle,
92102 pub filter: FilterControl,
93103 pub pds: PdsControl,
···160170 state.filter.store(Arc::new(new_filter));
161171 }
162172163163- // 4. set crawler enabled state from config, evaluated against the post-patch filter
164164- let post_patch_crawler = match config.enable_crawler {
165165- Some(b) => b,
166166- None => {
167167- state.filter.load().mode == FilterMode::Full || !config.crawler_sources.is_empty()
168168- }
169169- };
170170- state.crawler_enabled.send_replace(post_patch_crawler);
173173+ #[cfg(feature = "indexer")]
174174+ {
175175+ // 4. set crawler enabled state from config, evaluated against the post-patch filter
176176+ let post_patch_crawler = match config.enable_crawler {
177177+ Some(b) => b,
178178+ None => {
179179+ state.filter.load().mode == FilterMode::Full
180180+ || !config.crawler_sources.is_empty()
181181+ }
182182+ };
183183+ state.crawler_enabled.send_replace(post_patch_crawler);
184184+ }
171185172186 let state = Arc::new(state);
173187174188 Ok(Self {
175175- crawler: CrawlerHandle {
189189+ #[cfg(feature = "indexer")]
190190+ crawler: crawler::CrawlerHandle {
176191 state: state.clone(),
177192 shared: Arc::new(std::sync::OnceLock::new()),
178193 tasks: Arc::new(scc::HashMap::new()),
179194 persisted: Arc::new(scc::HashSet::new()),
180195 },
181196 firehose: FirehoseHandle::new(state.clone()),
182182- backfill: BackfillHandle(state.clone()),
197197+ #[cfg(feature = "indexer")]
198198+ backfill: BackfillHandle::new(state.clone()),
183199 filter: FilterControl(state.clone()),
184200 pds: pds::PdsControl(state.clone()),
185201 repos: ReposControl(state.clone()),
···209225 pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
210226 let state = self.state.clone();
211227 let config = self.config.clone();
228228+ #[cfg(feature = "indexer")]
212229 let crawler = self.crawler.clone();
213230 let firehose = self.firehose.clone();
214231···357374 let (fatal_tx_inner, mut fatal_rx) = watch::channel(None);
358375 let fatal_tx = Arc::new(fatal_tx_inner);
359376360360- info!(
361361- crawler_enabled = *state.crawler_enabled.borrow(),
362362- firehose_enabled = *state.firehose_enabled.borrow(),
363363- filter_mode = ?state.filter.load().mode,
364364- "starting ingestion"
365365- );
366366-367377 // 10. set shared and spawn firehose ingestors
368378 firehose
369379 .shared
···516526 // set shared objects so CrawlerHandle methods can use them
517527 crawler
518528 .shared
519519- .set(CrawlerShared {
529529+ .set(crawler::CrawlerShared {
520530 http,
521531 checker,
522532 in_flight,
···530540 // spawn initial sources from config
531541 for source in config.crawler_sources.iter() {
532542 let enabled_rx = state.crawler_enabled.subscribe();
533533- let handle = spawn_crawler_producer(
543543+ let handle = crawler::spawn_crawler_producer(
534544 source,
535545 &shared.http,
536546 &state,
···556566 continue;
557567 }
558568 let enabled_rx = state.crawler_enabled.subscribe();
559559- let handle = spawn_crawler_producer(
569569+ let handle = crawler::spawn_crawler_producer(
560570 source,
561571 &shared.http,
562572 &state,
···662672 Ok(fut)
663673 }
664674665665- /// subscribe to the ordered event stream.
666666- ///
667667- /// returns an [`EventStream`] that implements [`futures::Stream`].
668668- ///
669669- /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
670670- /// - if `cursor` is `Some(id)`, all persisted `record` events from that ID onward are
671671- /// replayed first, then the stream will switch to live tailing.
672672- ///
673673- /// `identity` and `account` events are ephemeral and are never replayed from a cursor,
674674- /// only live ones are delivered. use [`ReposControl::info`] to fetch current state for
675675- /// a specific repository.
676676- ///
677677- /// multiple concurrent subscribers each receive a full independent copy of the stream.
678678- /// the stream ends when the `EventStream` is dropped.
679679- #[cfg(feature = "indexer")]
680680- pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
681681- let (tx, rx) = mpsc::channel(500);
682682- let state = self.state.clone();
683683- let runtime = tokio::runtime::Handle::current();
684684-685685- std::thread::Builder::new()
686686- .name("hydrant-stream".into())
687687- .spawn(move || {
688688- let _g = runtime.enter();
689689- event_stream_thread(state, tx, cursor);
690690- })
691691- .expect("failed to spawn stream thread");
692692-693693- EventStream(rx)
694694- }
695695-696696- /// subscribe to the relay's ordered `subscribeRepos` event stream.
697697- ///
698698- /// returns a [`RelayEventStream`] that yields pre-encoded CBOR binary frames
699699- /// ready to forward directly to ATProto clients via WebSocket.
700700- ///
701701- /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
702702- /// - if `cursor` is `Some(seq)`, all persisted events from that seq onward are replayed first.
703703- #[cfg(feature = "relay")]
704704- pub fn subscribe_repos(&self, cursor: Option<u64>) -> RelayEventStream {
705705- let (tx, rx) = mpsc::channel(500);
706706- let state = self.state.clone();
707707- let runtime = tokio::runtime::Handle::current();
708708-709709- std::thread::Builder::new()
710710- .name("hydrant-relay-stream".into())
711711- .spawn(move || {
712712- let _g = runtime.enter();
713713- relay_stream_thread(state, tx, cursor);
714714- })
715715- .expect("failed to spawn relay stream thread");
716716-717717- RelayEventStream(rx)
718718- }
719719-720675 /// return database counts and on-disk sizes for all keyspaces.
721676 ///
722677 /// counts include: `repos`, `pending`, `resync`, `records`, `blocks`, `events`,
···726681 pub async fn stats(&self) -> Result<StatsResponse> {
727682 let state = self.state.clone();
728683729729- // todo: update stats, only return necessary info on relay vs indexer modes
730730- // (and ephemeral indexer)
731731- let mut counts: BTreeMap<&'static str, u64> = futures::future::join_all(
732732- [
733733- "repos",
734734- "pending",
735735- "records",
736736- "blocks",
737737- "resync",
738738- "error_ratelimited",
739739- "error_transport",
740740- "error_generic",
741741- ]
742742- .into_iter()
743743- .map(|name| {
684684+ #[allow(unused_mut)]
685685+ let mut count_keys = vec![
686686+ "repos",
687687+ "error_ratelimited",
688688+ "error_transport",
689689+ "error_generic",
690690+ ];
691691+692692+ #[cfg(feature = "indexer")]
693693+ {
694694+ count_keys.push("pending");
695695+ count_keys.push("records");
696696+ count_keys.push("blocks");
697697+ count_keys.push("resync");
698698+ }
699699+700700+ let mut counts: BTreeMap<&'static str, u64> =
701701+ futures::future::join_all(count_keys.into_iter().map(|name| {
744702 let state = state.clone();
745703 async move { (name, state.db.get_count(name).await) }
746746- }),
747747- )
748748- .await
749749- .into_iter()
750750- .collect();
704704+ }))
705705+ .await
706706+ .into_iter()
707707+ .collect();
751708709709+ #[cfg(feature = "indexer")]
752710 counts.insert("events", state.db.events.approximate_len() as u64);
753711712712+ #[cfg(feature = "relay")]
713713+ counts.insert(
714714+ "relay_events",
715715+ state.db.relay_events.approximate_len() as u64,
716716+ );
717717+754718 let sizes = tokio::task::spawn_blocking(move || {
755719 let mut s = BTreeMap::new();
756720 s.insert("repos", state.db.repos.disk_space());
757757- s.insert("records", state.db.records.disk_space());
758758- s.insert("blocks", state.db.blocks.disk_space());
759721 s.insert("cursors", state.db.cursors.disk_space());
760760- s.insert("pending", state.db.pending.disk_space());
761761- s.insert("resync", state.db.resync.disk_space());
762762- s.insert("resync_buffer", state.db.resync_buffer.disk_space());
763763- s.insert("events", state.db.events.disk_space());
764722 s.insert("counts", state.db.counts.disk_space());
765723 s.insert("filter", state.db.filter.disk_space());
766724 s.insert("crawler", state.db.crawler.disk_space());
725725+726726+ #[cfg(feature = "indexer")]
727727+ {
728728+ s.insert("records", state.db.records.disk_space());
729729+ s.insert("blocks", state.db.blocks.disk_space());
730730+ s.insert("pending", state.db.pending.disk_space());
731731+ s.insert("resync", state.db.resync.disk_space());
732732+ s.insert("resync_buffer", state.db.resync_buffer.disk_space());
733733+ s.insert("events", state.db.events.disk_space());
734734+ }
735735+736736+ #[cfg(feature = "relay")]
737737+ s.insert("relay_events", state.db.relay_events.disk_space());
738738+739739+ #[cfg(feature = "backlinks")]
740740+ s.insert("backlinks", state.db.backlinks.disk_space());
741741+767742 s
768743 })
769744 .await
···890865 }
891866}
892867893893-/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`].
894894-///
895895-/// implements [`futures::Stream`] and can be used with `StreamExt::next`,
896896-/// `while let Some(evt) = stream.next().await`, `forward`, etc.
897897-/// the stream terminates when the underlying channel closes (i.e. hydrant shuts down).
898898-#[cfg(feature = "indexer")]
899899-pub struct EventStream(mpsc::Receiver<Event>);
900900-901901-#[cfg(feature = "indexer")]
902902-impl Stream for EventStream {
903903- type Item = Event;
904904-905905- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
906906- self.0.poll_recv(cx)
907907- }
908908-}
909909-910910-/// the relay event stream produced by [`Hydrant::subscribe_repos`].
911911-#[cfg(feature = "relay")]
912912-pub struct RelayEventStream(mpsc::Receiver<bytes::Bytes>);
913913-914914-#[cfg(feature = "relay")]
915915-impl futures::Stream for RelayEventStream {
916916- type Item = bytes::Bytes;
917917-918918- fn poll_next(
919919- mut self: std::pin::Pin<&mut Self>,
920920- cx: &mut std::task::Context<'_>,
921921- ) -> std::task::Poll<Option<Self::Item>> {
922922- self.0.poll_recv(cx)
923923- }
924924-}
925925-926868/// database statistics returned by [`Hydrant::stats`].
927869#[derive(serde::Serialize)]
928870pub struct StatsResponse {
···930872 pub counts: BTreeMap<&'static str, u64>,
931873 /// on-disk size in bytes per keyspace
932874 pub sizes: BTreeMap<&'static str, u64>,
933933-}
934934-935935-/// runtime control over the backfill worker component.
936936-///
937937-/// the backfill worker fetches full repo CAR files from each repo's PDS for any
938938-/// repository in the pending queue, parses the MST, and inserts all matching records
939939-/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
940940-#[derive(Clone)]
941941-pub struct BackfillHandle(Arc<AppState>);
942942-943943-impl BackfillHandle {
944944- /// enable the backfill worker, no-op if already enabled.
945945- pub fn enable(&self) {
946946- self.0.backfill_enabled.send_replace(true);
947947- }
948948- /// disable the backfill worker, in-flight repos complete before pausing.
949949- pub fn disable(&self) {
950950- self.0.backfill_enabled.send_replace(false);
951951- }
952952- /// returns the current enabled state of the backfill worker.
953953- pub fn is_enabled(&self) -> bool {
954954- *self.0.backfill_enabled.borrow()
955955- }
956875}
957876958877/// control over database maintenance operations.
+40
src/control/relay.rs
···11+use super::*;
22+33+/// the relay event stream produced by [`Hydrant::subscribe_repos`].
44+pub struct RelayEventStream(mpsc::Receiver<bytes::Bytes>);
55+66+impl futures::Stream for RelayEventStream {
77+ type Item = bytes::Bytes;
88+99+ fn poll_next(
1010+ mut self: std::pin::Pin<&mut Self>,
1111+ cx: &mut std::task::Context<'_>,
1212+ ) -> std::task::Poll<Option<Self::Item>> {
1313+ self.0.poll_recv(cx)
1414+ }
1515+}
1616+1717+impl Hydrant {
1818+ /// subscribe to the relay's ordered `subscribeRepos` event stream.
1919+ ///
2020+ /// returns a [`RelayEventStream`] that yields pre-encoded CBOR binary frames
2121+ /// ready to forward directly to ATProto clients via WebSocket.
2222+ ///
2323+ /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
2424+ /// - if `cursor` is `Some(seq)`, all persisted events from that seq onward are replayed first.
2525+ pub fn subscribe_repos(&self, cursor: Option<u64>) -> RelayEventStream {
2626+ let (tx, rx) = mpsc::channel(500);
2727+ let state = self.state.clone();
2828+ let runtime = tokio::runtime::Handle::current();
2929+3030+ std::thread::Builder::new()
3131+ .name("hydrant-relay-stream".into())
3232+ .spawn(move || {
3333+ let _g = runtime.enter();
3434+ relay_stream_thread(state, tx, cursor);
3535+ })
3636+ .expect("failed to spawn relay stream thread");
3737+3838+ RelayEventStream(rx)
3939+ }
4040+}
+22-357
src/control/repos.rs
src/control/repos/indexer.rs
···11-use std::collections::HashMap;
22-use std::sync::Arc;
33-44-use chrono::{DateTime, Utc};
55-use fjall::OwnedWriteBatch;
66-use futures::TryFutureExt;
77-use jacquard_common::cowstr::ToCowStr;
88-use jacquard_common::types::cid::{Cid, IpldCid};
99-use jacquard_common::types::ident::AtIdentifier;
1010-use jacquard_common::types::nsid::Nsid;
1111-use jacquard_common::types::string::{Did, Handle, Rkey};
1212-use jacquard_common::types::tid::Tid;
1313-use jacquard_common::{CowStr, Data, IntoStatic};
1414-use miette::{Context, IntoDiagnostic, Result};
11+use futures::{FutureExt, TryFutureExt};
152use rand::Rng;
1616-use smol_str::ToSmolStr;
1717-use url::Url;
1831919-use crate::db::types::{DbRkey, DidKey, TrimmedDid};
2020-use crate::db::{self, Db, keys};
2121-use crate::state::AppState;
2222-use crate::types::{GaugeState, RepoMetadata, RepoState, RepoStatus};
2323-use crate::util::invalid_handle;
2424-2525-/// information about a tracked or known repository. returned by [`ReposControl`] methods.
2626-#[derive(Debug, Clone, serde::Serialize)]
2727-pub struct RepoInfo {
2828- /// the DID of the repository.
2929- pub did: Did<'static>,
3030- /// the status of the repository.
3131- #[serde(serialize_with = "crate::util::repo_status_serialize_str")]
3232- pub status: RepoStatus,
3333- /// whether this repository is tracked or not.
3434- /// untracked repositories are not updated and they stay frozen.
3535- pub tracked: bool,
3636- /// the revision of the root commit of this repository.
3737- #[serde(skip_serializing_if = "Option::is_none")]
3838- pub rev: Option<Tid>,
3939- /// the CID of the MST root of this repository.
4040- #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
4141- #[serde(skip_serializing_if = "Option::is_none")]
4242- pub data: Option<IpldCid>,
4343- /// the handle for the DID of this repository.
4444- ///
4545- /// note that this handle is not bi-directionally verified.
4646- #[serde(skip_serializing_if = "Option::is_none")]
4747- pub handle: Option<Handle<'static>>,
4848- /// the URL for the PDS in which this repository is hosted on.
4949- #[serde(skip_serializing_if = "Option::is_none")]
5050- pub pds: Option<Url>,
5151- /// ATProto signing key of this repository.
5252- #[serde(serialize_with = "crate::util::opt_did_key_serialize_str")]
5353- #[serde(skip_serializing_if = "Option::is_none")]
5454- pub signing_key: Option<DidKey<'static>>,
5555- /// when this repository was last touched (status update, commit ingested, etc.).
5656- #[serde(skip_serializing_if = "Option::is_none")]
5757- pub last_updated_at: Option<DateTime<Utc>>,
5858- /// the time of the last message gotten from the firehose for this repository.
5959- /// this is equal to the `time` field.
6060- #[serde(skip_serializing_if = "Option::is_none")]
6161- pub last_message_at: Option<DateTime<Utc>>,
6262-}
6363-6464-/// control over which repositories are tracked and access to their state.
6565-///
6666-/// in `filter` mode, a repo is only indexed if it either matches a signal or is
6767-/// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are
6868-/// indexed and tracking is implicit.
6969-///
7070-/// tracking a DID that hydrant has never seen enqueues an immediate backfill.
7171-/// tracking a DID that hydrant already knows about (but has marked untracked)
7272-/// re-enqueues it for backfill.
7373-#[derive(Clone)]
7474-pub struct ReposControl(pub(super) Arc<AppState>);
44+use super::*;
755766impl ReposControl {
7777- pub(crate) fn iter_states(
7878- &self,
7979- cursor: Option<&Did<'_>>,
8080- ) -> impl Iterator<Item = Result<(Did<'static>, RepoState<'static>, crate::types::RepoMetadata)>>
8181- {
8282- let start_bound = if let Some(cursor) = cursor {
8383- let did_key = keys::repo_key(cursor);
8484- std::ops::Bound::Excluded(did_key)
8585- } else {
8686- std::ops::Bound::Unbounded
8787- };
8888-8989- let state = self.0.clone();
9090- self.0
9191- .db
9292- .repos
9393- .range((start_bound, std::ops::Bound::Unbounded))
9494- .map(move |g| {
9595- let (k, v) = g.into_inner().into_diagnostic()?;
9696- let repo_state = crate::db::deser_repo_state(&v)?.into_static();
9797- let did = TrimmedDid::try_from(k.as_ref())?.to_did();
9898- let metadata_key = keys::repo_metadata_key(&did);
9999- let metadata = state
100100- .db
101101- .repo_metadata
102102- .get(&metadata_key)
103103- .into_diagnostic()?
104104- .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
105105- let metadata = crate::db::deser_repo_metadata(metadata.as_ref())?;
106106- Ok((did, repo_state, metadata))
107107- })
108108- }
109109-110110- /// iterates through all repositories, returning their state.
111111- pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> {
112112- self.iter_states(cursor)
113113- .map(|r| r.map(|(did, s, m)| repo_state_to_info(did, s, m.tracked)))
114114- }
115115-116116- #[allow(dead_code)]
1177 /// iterates through pending repositories, returning their state.
118118- fn iter_pending(&self, cursor: Option<u64>) -> impl Iterator<Item = Result<(u64, RepoInfo)>> {
88+ #[allow(dead_code)]
99+ pub(crate) fn iter_pending(
1010+ &self,
1111+ cursor: Option<u64>,
1212+ ) -> impl Iterator<Item = Result<(u64, RepoInfo)>> {
11913 let start_bound = if let Some(cursor) = cursor {
12014 std::ops::Bound::Excluded(cursor.to_be_bytes().to_vec())
12115 } else {
···15145 .get(&metadata_key)
15246 .into_diagnostic()?
15347 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
154154- let metadata = crate::db::deser_repo_metadata(metadata.as_ref())?;
4848+ let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
15549 Ok(Some((
15650 id,
15751 repo_state_to_info(did, repo_state.into_static(), metadata.tracked),
···16256 }
1635716458 #[allow(dead_code)]
165165- fn iter_resync(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> {
5959+ pub(crate) fn iter_resync(
6060+ &self,
6161+ cursor: Option<&Did<'_>>,
6262+ ) -> impl Iterator<Item = Result<RepoInfo>> {
16663 let start_bound = if let Some(cursor) = cursor {
16764 let did_key = keys::repo_key(cursor);
16865 std::ops::Bound::Excluded(did_key)
···19289 .get(&metadata_key)
19390 .into_diagnostic()?
19491 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
195195- let metadata = crate::db::deser_repo_metadata(metadata.as_ref())?;
9292+ let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
19693 Ok(Some(repo_state_to_info(
19794 did,
19895 repo_state.into_static(),
···203100 .flatten()
204101 }
205102206206- /// gets a handle for a repository to read from it.
207207- pub fn get<'i>(&self, did: &Did<'i>) -> RepoHandle<'i> {
208208- RepoHandle {
209209- state: self.0.clone(),
210210- did: did.clone(),
211211- }
212212- }
213213-214214- /// same as [`ReposControl::get`] but allows you to pass in an identifier that can be
215215- /// either a handle or a DID.
216216- pub async fn resolve(&self, repo: &AtIdentifier<'_>) -> Result<RepoHandle<'static>> {
217217- let did = self.0.resolver.resolve_did(repo).await?;
218218- Ok(RepoHandle {
219219- state: self.0.clone(),
220220- did,
221221- })
222222- }
223223-224224- /// fetch the current state of a repository.
225225- /// returns `None` if hydrant has never seen this repository.
226226- pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> {
227227- self.get(did).info().await
228228- }
229229-230230- fn _resync(
103103+ pub(crate) fn _resync(
231104 db: &Db,
232105 did: &Did<'_>,
233233- batch: &mut OwnedWriteBatch,
106106+ batch: &mut fjall::OwnedWriteBatch,
234107 transitions: &mut Vec<(GaugeState, GaugeState)>,
235108 ) -> Result<bool> {
236109 let did_key = keys::repo_key(did);
···248121 .get(&metadata_key)
249122 .into_diagnostic()?
250123 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
251251- let mut metadata = crate::db::deser_repo_metadata(&metadata_bytes)?;
124124+ let mut metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
252125253126 // skip if already in pending queue
254127 let is_pending = db
···269142 batch.insert(
270143 &db.repo_metadata,
271144 &metadata_key,
272272- crate::db::ser_repo_metadata(&metadata)?,
145145+ crate::db::ser_repo_meta(&metadata)?,
273146 );
274147 transitions.push((old, GaugeState::Pending));
275148 return Ok(true);
···341214 let mut added = 0i64;
342215 let mut queued: Vec<Did<'static>> = Vec::new();
343216 let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
344344- let mut rng = rand::rng();
345217346218 for did in dids {
347219 let did_key = keys::repo_key(&did);
···349221350222 let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
351223 let existing_metadata = metadata_bytes
352352- .map(|b| crate::db::deser_repo_metadata(&b))
224224+ .map(|b| crate::db::deser_repo_meta(&b))
353225 .transpose()?;
354226355227 if let Some(metadata) = existing_metadata {
···358230 }
359231 } else {
360232 let repo_state = RepoState::backfilling();
361361- let metadata = RepoMetadata::backfilling(rng.next_u64());
233233+ let metadata = RepoMetadata::backfilling(rand::random());
362234 batch.insert(&db.repos, &did_key, crate::db::ser_repo_state(&repo_state)?);
363235 batch.insert(
364236 &db.repo_metadata,
365237 &metadata_key,
366366- crate::db::ser_repo_metadata(&metadata)?,
238238+ crate::db::ser_repo_meta(&metadata)?,
367239 );
368240 batch.insert(&db.pending, keys::pending_key(metadata.index_id), &did_key);
369241 added += 1;
···418290 if let Some(repo_state) = existing {
419291 let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
420292 let existing_metadata = metadata_bytes
421421- .map(|b| crate::db::deser_repo_metadata(&b))
293293+ .map(|b| crate::db::deser_repo_meta(&b))
422294 .transpose()?;
423295424296 if let Some(mut metadata) = existing_metadata {
···429301 batch.insert(
430302 &db.repo_metadata,
431303 &metadata_key,
432432- crate::db::ser_repo_metadata(&metadata)?,
304304+ crate::db::ser_repo_meta(&metadata)?,
433305 );
434306 batch.remove(&db.pending, keys::pending_key(metadata.index_id));
435307 batch.remove(&db.resync, &did_key);
···459331 }
460332}
461333462462-pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>, tracked: bool) -> RepoInfo {
463463- let (rev, data) = s
464464- .root
465465- .map(|c| (Some(c.rev.to_tid()), Some(c.data)))
466466- .unwrap_or_default();
467467- RepoInfo {
468468- did,
469469- status: s.status,
470470- tracked,
471471- rev,
472472- data,
473473- handle: s.handle.map(|h| h.into_static()),
474474- pds: s.pds.and_then(|p| p.parse().ok()),
475475- signing_key: s.signing_key.map(|k| k.into_static()),
476476- last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
477477- last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
478478- }
479479-}
480480-481481-pub struct Record {
482482- pub did: Did<'static>,
483483- pub cid: Cid<'static>,
484484- pub value: Data<'static>,
485485-}
486486-487487-pub struct ListedRecord {
488488- pub rkey: Rkey<'static>,
489489- pub cid: Cid<'static>,
490490- pub value: Data<'static>,
491491-}
492492-493493-pub struct RecordList {
494494- pub records: Vec<ListedRecord>,
495495- pub cursor: Option<Rkey<'static>>,
496496-}
497497-498498-#[derive(Debug, thiserror::Error)]
499499-pub enum MiniDocError {
500500- #[error("repo is not synced yet")]
501501- NotSynced,
502502- #[error("repo not found")]
503503- RepoNotFound,
504504- #[error("could not resolve identity")]
505505- CouldNotResolveIdentity,
506506- #[error("{0}")]
507507- Other(miette::Error),
508508-}
509509-510510-/// a mini doc with a bi-directionally verified handle.
511511-pub struct MiniDoc<'i> {
512512- /// the did.
513513- pub did: Did<'i>,
514514- /// the handle. if verification fails or no handle is found,
515515- /// this will be "handle.invalid".
516516- pub handle: Handle<'i>,
517517- /// the url of the PDS of this repo.
518518- pub pds: Url,
519519- /// the atproto signing key of this repo.
520520- pub signing_key: DidKey<'i>,
521521-}
522522-523523-/// handle to access data related to this repository.
524524-#[derive(Clone)]
525525-pub struct RepoHandle<'i> {
526526- state: Arc<AppState>,
527527- pub did: Did<'i>,
528528-}
529529-530334impl<'i> RepoHandle<'i> {
531531- pub(crate) async fn state(&self) -> Result<Option<RepoState<'static>>> {
532532- let did_key = keys::repo_key(&self.did);
533533- let app_state = self.state.clone();
534534-535535- tokio::task::spawn_blocking(move || {
536536- let bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
537537- bytes
538538- .as_deref()
539539- .map(db::deser_repo_state)
540540- .transpose()
541541- .map(|opt| opt.map(IntoStatic::into_static))
542542- })
543543- .await
544544- .into_diagnostic()?
545545- }
546546-547547- /// fetch the current state of this repository.
548548- /// returns `None` if hydrant has never seen this repository.
549549- pub async fn info(&self) -> Result<Option<RepoInfo>> {
550550- let did = self.did.clone().into_static();
551551- let did_key = keys::repo_key(&did);
552552- let metadata_key = keys::repo_metadata_key(&did);
553553- let app_state = self.state.clone();
554554-555555- tokio::task::spawn_blocking(move || {
556556- let state_bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
557557- let Some(state_bytes) = state_bytes else {
558558- return Ok(None);
559559- };
560560- let repo_state = crate::db::deser_repo_state(&state_bytes)?;
561561-562562- let metadata_bytes = app_state
563563- .db
564564- .repo_metadata
565565- .get(&metadata_key)
566566- .into_diagnostic()?
567567- .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
568568- let metadata = crate::db::deser_repo_metadata(&metadata_bytes)?;
569569-570570- Ok(Some(repo_state_to_info(did, repo_state, metadata.tracked)))
571571- })
572572- .await
573573- .into_diagnostic()?
574574- }
575575-576576- /// returns the collections of this repository and the number of records it has in each.
577577- pub async fn collections(&self) -> Result<HashMap<Nsid<'static>, u64>> {
578578- let did = self.did.clone().into_static();
579579- let state = self.state.clone();
580580-581581- tokio::task::spawn_blocking(move || {
582582- let prefix = keys::did_collection_prefix(&did);
583583- let mut res = HashMap::new();
584584- for item in state.db.counts.prefix(&prefix) {
585585- let (k, v) = item.into_inner().into_diagnostic()?;
586586- let col = k
587587- .strip_prefix(prefix.as_slice())
588588- .ok_or_else(|| miette::miette!("invalid collection count key: {k:?}"))
589589- .and_then(|r| std::str::from_utf8(r).into_diagnostic())
590590- .and_then(|n| Nsid::new(n).into_diagnostic())?
591591- .into_static();
592592- let count = u64::from_be_bytes(
593593- v.as_ref()
594594- .try_into()
595595- .into_diagnostic()
596596- .wrap_err("expected to be count (8 bytes)")?,
597597- );
598598- res.insert(col, count);
599599- }
600600- Ok(res)
601601- })
602602- .await
603603- .into_diagnostic()?
604604- }
605605-606606- /// returns a bi-directionally validated mini doc.
607607- pub async fn mini_doc(&self) -> Result<MiniDoc<'static>, MiniDocError> {
608608- let Some(info) = self.info().await.map_err(MiniDocError::Other)? else {
609609- return Err(MiniDocError::RepoNotFound);
610610- };
611611-612612- // check if repo is still backfilling (in pending)
613613- let metadata_key = keys::repo_metadata_key(&self.did);
614614- let app_state = self.state.clone();
615615-616616- let is_pending = tokio::task::spawn_blocking(move || {
617617- let metadata_bytes = app_state
618618- .db
619619- .repo_metadata
620620- .get(&metadata_key)
621621- .into_diagnostic()?;
622622- let Some(metadata_bytes) = metadata_bytes else {
623623- return Ok::<_, miette::Report>(false);
624624- };
625625- let metadata = crate::db::deser_repo_metadata(metadata_bytes.as_ref())?;
626626- Ok(app_state
627627- .db
628628- .pending
629629- .get(crate::db::keys::pending_key(metadata.index_id))
630630- .into_diagnostic()?
631631- .is_some())
632632- })
633633- .await
634634- .map_err(|e| MiniDocError::Other(miette::miette!(e)))?
635635- .map_err(MiniDocError::Other)?;
636636-637637- if is_pending {
638638- return Err(MiniDocError::NotSynced);
639639- }
640640-641641- let pds = info
642642- .pds
643643- .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?;
644644- let signing_key = info
645645- .signing_key
646646- .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?
647647- .into_static();
648648-649649- let handle = if let Some(h) = info.handle {
650650- let is_valid = self
651651- .state
652652- .resolver
653653- .verify_handle(&self.did, &h)
654654- .await
655655- .into_diagnostic()
656656- .map_err(MiniDocError::Other)?;
657657- is_valid.then_some(h).unwrap_or_else(invalid_handle)
658658- } else {
659659- invalid_handle()
660660- };
661661-662662- Ok(MiniDoc {
663663- did: self.did.clone().into_static(),
664664- handle,
665665- pds,
666666- signing_key,
667667- })
668668- }
669669-670335 /// gets a record from this repository.
671336 pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> {
672337 let did = self.did.clone().into_static();
+358
src/control/repos/mod.rs
···11+use std::collections::HashMap;
22+use std::sync::Arc;
33+44+use chrono::{DateTime, Utc};
55+use jacquard_common::cowstr::ToCowStr;
66+use jacquard_common::types::cid::{Cid, IpldCid};
77+use jacquard_common::types::ident::AtIdentifier;
88+use jacquard_common::types::nsid::Nsid;
99+use jacquard_common::types::string::{Did, Handle, Rkey};
1010+use jacquard_common::types::tid::Tid;
1111+use jacquard_common::{CowStr, Data, IntoStatic};
1212+use miette::{Context, IntoDiagnostic, Result, WrapErr};
1313+use smol_str::ToSmolStr;
1414+use url::Url;
1515+1616+use crate::db::types::{DbRkey, DidKey, TrimmedDid};
1717+use crate::db::{self, Db, keys};
1818+use crate::state::AppState;
1919+#[cfg(feature = "indexer")]
2020+use crate::types::GaugeState;
2121+use crate::types::{RepoMetadata, RepoState, RepoStatus};
2222+use crate::util::invalid_handle;
2323+2424+#[cfg(feature = "indexer")]
2525+mod indexer;
2626+2727+#[cfg(feature = "indexer")]
2828+pub use indexer::*;
2929+3030+/// information about a tracked or known repository. returned by [`ReposControl`] methods.
3131+#[derive(Debug, Clone, serde::Serialize)]
3232+pub struct RepoInfo {
3333+ /// the DID of the repository.
3434+ pub did: Did<'static>,
3535+ /// the status of the repository.
3636+ #[serde(serialize_with = "crate::util::repo_status_serialize_str")]
3737+ pub status: RepoStatus,
3838+ /// whether this repository is tracked or not.
3939+ /// untracked repositories are not updated and they stay frozen.
4040+ pub tracked: bool,
4141+ /// the revision of the root commit of this repository.
4242+ #[serde(skip_serializing_if = "Option::is_none")]
4343+ pub rev: Option<Tid>,
4444+ /// the CID of the MST root of this repository.
4545+ #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
4646+ #[serde(skip_serializing_if = "Option::is_none")]
4747+ pub data: Option<IpldCid>,
4848+ /// the handle for the DID of this repository.
4949+ ///
5050+ /// note that this handle is not bi-directionally verified.
5151+ #[serde(skip_serializing_if = "Option::is_none")]
5252+ pub handle: Option<Handle<'static>>,
5353+ /// the URL for the PDS in which this repository is hosted on.
5454+ #[serde(skip_serializing_if = "Option::is_none")]
5555+ pub pds: Option<Url>,
5656+ /// ATProto signing key of this repository.
5757+ #[serde(serialize_with = "crate::util::opt_did_key_serialize_str")]
5858+ #[serde(skip_serializing_if = "Option::is_none")]
5959+ pub signing_key: Option<DidKey<'static>>,
6060+ /// when this repository was last touched (status update, commit ingested, etc.).
6161+ #[serde(skip_serializing_if = "Option::is_none")]
6262+ pub last_updated_at: Option<DateTime<Utc>>,
6363+ /// the time of the last message gotten from the firehose for this repository.
6464+ /// this is equal to the `time` field.
6565+ #[serde(skip_serializing_if = "Option::is_none")]
6666+ pub last_message_at: Option<DateTime<Utc>>,
6767+}
6868+6969+/// control over which repositories are tracked and access to their state.
7070+///
7171+/// in `filter` mode, a repo is only indexed if it either matches a signal or is
7272+/// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are
7373+/// indexed and tracking is implicit.
7474+///
7575+/// tracking a DID that hydrant has never seen enqueues an immediate backfill.
7676+/// tracking a DID that hydrant already knows about (but has marked untracked)
7777+/// re-enqueues it for backfill.
7878+#[derive(Clone)]
7979+pub struct ReposControl(pub(super) Arc<AppState>);
8080+8181+impl ReposControl {
8282+ pub(crate) fn iter_states(
8383+ &self,
8484+ cursor: Option<&Did<'_>>,
8585+ ) -> impl Iterator<Item = Result<(Did<'static>, RepoState<'static>, crate::types::RepoMetadata)>>
8686+ {
8787+ let start_bound = if let Some(cursor) = cursor {
8888+ let did_key = keys::repo_key(cursor);
8989+ std::ops::Bound::Excluded(did_key)
9090+ } else {
9191+ std::ops::Bound::Unbounded
9292+ };
9393+9494+ let state = self.0.clone();
9595+ self.0
9696+ .db
9797+ .repos
9898+ .range((start_bound, std::ops::Bound::Unbounded))
9999+ .map(move |g| {
100100+ let (k, v) = g.into_inner().into_diagnostic()?;
101101+ let repo_state = crate::db::deser_repo_state(&v)?.into_static();
102102+ let did = TrimmedDid::try_from(k.as_ref())?.to_did();
103103+ let metadata_key = keys::repo_metadata_key(&did);
104104+ let metadata = state
105105+ .db
106106+ .repo_metadata
107107+ .get(&metadata_key)
108108+ .into_diagnostic()?
109109+ .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
110110+ let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
111111+ Ok((did, repo_state, metadata))
112112+ })
113113+ }
114114+115115+ /// iterates through all repositories, returning their state.
116116+ pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> {
117117+ self.iter_states(cursor)
118118+ .map(|r| r.map(|(did, s, m)| repo_state_to_info(did, s, m.tracked)))
119119+ }
120120+121121+ /// gets a handle for a repository to read from it.
122122+ pub fn get<'i>(&self, did: &Did<'i>) -> RepoHandle<'i> {
123123+ RepoHandle {
124124+ state: self.0.clone(),
125125+ did: did.clone(),
126126+ }
127127+ }
128128+129129+ /// same as [`ReposControl::get`] but allows you to pass in an identifier that can be
130130+ /// either a handle or a DID.
131131+ pub async fn resolve(&self, repo: &AtIdentifier<'_>) -> Result<RepoHandle<'static>> {
132132+ let did = self.0.resolver.resolve_did(repo).await?;
133133+ Ok(RepoHandle {
134134+ state: self.0.clone(),
135135+ did,
136136+ })
137137+ }
138138+139139+ /// fetch the current state of a repository.
140140+ /// returns `None` if hydrant has never seen this repository.
141141+ pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> {
142142+ self.get(did).info().await
143143+ }
144144+}
145145+146146+pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>, tracked: bool) -> RepoInfo {
147147+ let (rev, data) = s
148148+ .root
149149+ .map(|c| (Some(c.rev.to_tid()), Some(c.data)))
150150+ .unwrap_or_default();
151151+ RepoInfo {
152152+ did,
153153+ status: s.status,
154154+ tracked,
155155+ rev,
156156+ data,
157157+ handle: s.handle.map(|h| h.into_static()),
158158+ pds: s.pds.and_then(|p| p.parse().ok()),
159159+ signing_key: s.signing_key.map(|k| k.into_static()),
160160+ last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
161161+ last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
162162+ }
163163+}
164164+165165+pub struct Record {
166166+ pub did: Did<'static>,
167167+ pub cid: Cid<'static>,
168168+ pub value: Data<'static>,
169169+}
170170+171171+pub struct ListedRecord {
172172+ pub rkey: Rkey<'static>,
173173+ pub cid: Cid<'static>,
174174+ pub value: Data<'static>,
175175+}
176176+177177+pub struct RecordList {
178178+ pub records: Vec<ListedRecord>,
179179+ pub cursor: Option<Rkey<'static>>,
180180+}
181181+182182+#[derive(Debug, thiserror::Error)]
183183+pub enum MiniDocError {
184184+ #[error("repo is not synced yet")]
185185+ NotSynced,
186186+ #[error("repo not found")]
187187+ RepoNotFound,
188188+ #[error("could not resolve identity")]
189189+ CouldNotResolveIdentity,
190190+ #[error("{0}")]
191191+ Other(miette::Error),
192192+}
193193+194194+/// a mini doc with a bi-directionally verified handle.
195195+pub struct MiniDoc<'i> {
196196+ /// the did.
197197+ pub did: Did<'i>,
198198+ /// the handle. if verification fails or no handle is found,
199199+ /// this will be "handle.invalid".
200200+ pub handle: Handle<'i>,
201201+ /// the url of the PDS of this repo.
202202+ pub pds: Url,
203203+ /// the atproto signing key of this repo.
204204+ pub signing_key: DidKey<'i>,
205205+}
206206+207207+/// handle to access data related to this repository.
208208+#[derive(Clone)]
209209+pub struct RepoHandle<'i> {
210210+ state: Arc<AppState>,
211211+ pub did: Did<'i>,
212212+}
213213+214214+impl<'i> RepoHandle<'i> {
215215+ pub(crate) async fn state(&self) -> Result<Option<RepoState<'static>>> {
216216+ let did_key = keys::repo_key(&self.did);
217217+ let app_state = self.state.clone();
218218+219219+ tokio::task::spawn_blocking(move || {
220220+ let bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
221221+ bytes
222222+ .as_deref()
223223+ .map(db::deser_repo_state)
224224+ .transpose()
225225+ .map(|opt| opt.map(IntoStatic::into_static))
226226+ })
227227+ .await
228228+ .into_diagnostic()?
229229+ }
230230+231231+ /// fetch the current state of this repository.
232232+ /// returns `None` if hydrant has never seen this repository.
233233+ pub async fn info(&self) -> Result<Option<RepoInfo>> {
234234+ let did = self.did.clone().into_static();
235235+ let did_key = keys::repo_key(&did);
236236+ let metadata_key = keys::repo_metadata_key(&did);
237237+ let app_state = self.state.clone();
238238+239239+ tokio::task::spawn_blocking(move || {
240240+ let state_bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
241241+ let Some(state_bytes) = state_bytes else {
242242+ return Ok(None);
243243+ };
244244+ let repo_state = crate::db::deser_repo_state(&state_bytes)?;
245245+246246+ let metadata_bytes = app_state
247247+ .db
248248+ .repo_metadata
249249+ .get(&metadata_key)
250250+ .into_diagnostic()?
251251+ .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
252252+ let metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
253253+254254+ Ok(Some(repo_state_to_info(did, repo_state, metadata.tracked)))
255255+ })
256256+ .await
257257+ .into_diagnostic()?
258258+ }
259259+260260+ /// returns the collections of this repository and the number of records it has in each.
261261+ pub async fn collections(&self) -> Result<HashMap<Nsid<'static>, u64>> {
262262+ let did = self.did.clone().into_static();
263263+ let state = self.state.clone();
264264+265265+ tokio::task::spawn_blocking(move || {
266266+ let prefix = keys::did_collection_prefix(&did);
267267+ let mut res = HashMap::new();
268268+ for item in state.db.counts.prefix(&prefix) {
269269+ let (k, v) = item.into_inner().into_diagnostic()?;
270270+ let col = k
271271+ .strip_prefix(prefix.as_slice())
272272+ .ok_or_else(|| miette::miette!("invalid collection count key: {k:?}"))
273273+ .and_then(|r| std::str::from_utf8(r).into_diagnostic())
274274+ .and_then(|n| Nsid::new(n).into_diagnostic())?
275275+ .into_static();
276276+ let count = u64::from_be_bytes(
277277+ v.as_ref()
278278+ .try_into()
279279+ .into_diagnostic()
280280+ .wrap_err("expected to be count (8 bytes)")?,
281281+ );
282282+ res.insert(col, count);
283283+ }
284284+ Ok(res)
285285+ })
286286+ .await
287287+ .into_diagnostic()?
288288+ }
289289+290290+ /// returns a bi-directionally validated mini doc.
291291+ pub async fn mini_doc(&self) -> Result<MiniDoc<'static>, MiniDocError> {
292292+ let Some(info) = self.info().await.map_err(MiniDocError::Other)? else {
293293+ return Err(MiniDocError::RepoNotFound);
294294+ };
295295+296296+ // check if repo is still backfilling (in pending)
297297+ #[cfg(feature = "indexer")]
298298+ let is_pending = {
299299+ let metadata_key = keys::repo_metadata_key(&self.did);
300300+ let app_state = self.state.clone();
301301+ tokio::task::spawn_blocking(move || {
302302+ let metadata_bytes = app_state
303303+ .db
304304+ .repo_metadata
305305+ .get(&metadata_key)
306306+ .into_diagnostic()?;
307307+ let Some(metadata_bytes) = metadata_bytes else {
308308+ return Ok::<_, miette::Report>(false);
309309+ };
310310+311311+ let metadata = crate::db::deser_repo_meta(metadata_bytes.as_ref())?;
312312+ return Ok(app_state
313313+ .db
314314+ .pending
315315+ .get(crate::db::keys::pending_key(metadata.index_id))
316316+ .into_diagnostic()?
317317+ .is_some());
318318+ })
319319+ .await
320320+ .map_err(|e| MiniDocError::Other(miette::miette!(e)))?
321321+ .map_err(MiniDocError::Other)?
322322+ };
323323+ #[cfg(feature = "relay")]
324324+ let is_pending = false;
325325+326326+ if is_pending {
327327+ return Err(MiniDocError::NotSynced);
328328+ }
329329+330330+ let pds = info
331331+ .pds
332332+ .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?;
333333+ let signing_key = info
334334+ .signing_key
335335+ .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?
336336+ .into_static();
337337+338338+ let handle = if let Some(h) = info.handle {
339339+ let is_valid = self
340340+ .state
341341+ .resolver
342342+ .verify_handle(&self.did, &h)
343343+ .await
344344+ .into_diagnostic()
345345+ .map_err(MiniDocError::Other)?;
346346+ is_valid.then_some(h).unwrap_or_else(invalid_handle)
347347+ } else {
348348+ invalid_handle()
349349+ };
350350+351351+ Ok(MiniDoc {
352352+ did: self.did.clone().into_static(),
353353+ handle,
354354+ pds,
355355+ signing_key,
356356+ })
357357+ }
358358+}
+1-1
src/control/seed.rs
···5959 let ks = state.db.cursors.clone();
6060 let key = cursor_key.clone();
6161 match db::Db::get(ks, key).await {
6262- Ok(Some(b)) => rmp_serde::from_slice::<String>(&b).ok(),
6262+ Ok(Some(b)) => rmp_serde::from_slice::<String>(b.as_ref()).ok(),
6363 Ok(None) => None,
6464 Err(e) => {
6565 warn!(err = %e, "failed to load seed cursor, starting from scratch");
···304304 ..
305305 } = validated;
306306307307+ #[cfg(not(feature = "indexer"))]
308308+ let _ = parsed_blocks;
309309+307310 if chain_break.is_broken() {
308311 // chain breaks are not grounds for blocking when acting as a relay
309312 debug!(broken = ?chain_break, "chain break, forwarding anyway");
···782785 .repo_metadata
783786 .get(&metadata_key)
784787 .into_diagnostic()?
785785- .map(|bytes| db::deser_repo_metadata(&bytes))
788788+ .map(|bytes| db::deser_repo_meta(&bytes))
786789 .transpose()?;
787790788791 if metadata.map_or(false, |m| !m.tracked) {
+1
src/lib.rs
···1313pub(crate) mod backfill;
1414#[cfg(feature = "backlinks")]
1515pub(crate) mod backlinks;
1616+#[cfg(feature = "indexer")]
1617pub(crate) mod crawler;
1718pub(crate) mod db;
1819pub(crate) mod ingest;
+2-2
src/ops.rs
···72727373 let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
7474 if let Some(metadata_bytes) = metadata_bytes {
7575- let metadata = db::deser_repo_metadata(&metadata_bytes)?;
7575+ let metadata = db::deser_repo_meta(&metadata_bytes)?;
7676 batch.remove(&db.pending, keys::pending_key(metadata.index_id));
7777 }
7878···131131132132 let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
133133 if let Some(metadata_bytes) = metadata_bytes {
134134- let metadata = db::deser_repo_metadata(&metadata_bytes)?;
134134+ let metadata = db::deser_repo_meta(&metadata_bytes)?;
135135 let pending_key = keys::pending_key(metadata.index_id);
136136137137 // manage queues