···
 `hydrant` is an AT Protocol indexer built on the `fjall` database. it's designed to
 be flexible, supporting both full-network indexing and filtered indexing (e.g.,
 by DID), allowing querying with XRPCs (not only `com.atproto.*`!), providing an
-ordered event stream, etc.
+ordered event stream, etc. oh and it can also act as a relay!
 
 you can see
 [random.wisp.place](https://tangled.org/did:plc:dfl62fgb7wtjj3fcbb72naae/random.wisp.place)
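as a sketch of that query surface: `com.atproto.sync.listRepos` and `com.atproto.sync.getRepoStatus` are both served by handlers touched in this diff. a minimal client, assuming a hydrant instance at a hypothetical `localhost:3000` (host and port are illustrative, not from this repo):

```rust
// hypothetical host/port; the endpoint itself is implemented in
// src/api/xrpc/list_repos.rs below
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let body: serde_json::Value = reqwest::blocking::get(
        "http://localhost:3000/xrpc/com.atproto.sync.listRepos?limit=10",
    )?
    .json()?;
    for repo in body["repos"].as_array().into_iter().flatten() {
        // each entry carries did, head, rev, active, and an optional status
        println!("{} active={}", repo["did"], repo["active"]);
    }
    Ok(())
}
```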
+12 -7
src/api/debug.rs
···
             "invalid_u64".to_string()
         }
     } else if partition == "blocks" {
-        // key is col|cid_bytes — show as "col|<cid_str>"
+        // key is col|cid_bytes, show as "col|<cid_str>"
         if let Some(sep) = k.iter().position(|&b| b == keys::SEP) {
             let col = String::from_utf8_lossy(&k[..sep]);
             match cid::Cid::read_bytes(&k[sep + 1..]) {
···
     State(state): State<Arc<AppState>>,
 ) -> Result<StatusCode, StatusCode> {
     tokio::task::spawn_blocking(move || {
-        crate::db::ephemeral::ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl)
+        #[cfg(feature = "indexer")]
+        let res = crate::db::ephemeral::ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl);
+        #[cfg(feature = "relay")]
+        let res = crate::db::ephemeral::relay_events_ttl_tick(&state.db, &state.ephemeral_ttl);
+        res
     })
     .await
     .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
···
 pub struct DebugSeedWatermarkRequest {
     /// unix timestamp (seconds) to write the watermark at
     pub ts: u64,
-    /// event_id the watermark points to — all events before this id will be pruned
+    /// event_id the watermark points to, all events before this id will be pruned
     pub event_id: u64,
 }
 
···
     Query(req): Query<DebugSeedWatermarkRequest>,
 ) -> Result<StatusCode, StatusCode> {
     tokio::task::spawn_blocking(move || {
+        #[cfg(feature = "indexer")]
+        let key = crate::db::keys::event_watermark_key(req.ts);
+        #[cfg(feature = "relay")]
+        let key = crate::db::keys::relay_event_watermark_key(req.ts);
         state
             .db
             .cursors
-            .insert(
-                crate::db::keys::event_watermark_key(req.ts),
-                req.event_id.to_be_bytes(),
-            )
+            .insert(key, req.event_id.to_be_bytes())
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
     })
     .await
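one detail worth calling out: the watermark value is written with `to_be_bytes()`, and presumably the watermark keys encode the timestamp the same way. for unsigned integers, big-endian bytes compare lexicographically in the same order as the numbers themselves, which is the property an ordered keyspace like fjall's needs for range scans. a self-contained check of that property (not hydrant code):

```rust
// big-endian sorts like the numbers; little-endian does not
fn main() {
    let (a, b): (u64, u64) = (1, 256);
    assert!(a < b && a.to_be_bytes() < b.to_be_bytes());
    // little-endian breaks the ordering: 256 -> [0, 1, ..] sorts before 1 -> [1, 0, ..]
    assert!(b.to_le_bytes() < a.to_le_bytes());
}
```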
···
         });
     };
 
-    let (active, status) = repo_status_to_api(state.status);
+    let status = repo_status_to_api(state.status);
 
     // rev is only meaningful when the repo is active and has been synced at least once
-    let rev = active.then(|| state.root.map(|c| c.rev.to_tid())).flatten();
+    let rev = state
+        .active
+        .then(|| state.root.map(|c| c.rev.to_tid()))
+        .flatten();
 
     Ok(Json(GetRepoStatusOutput {
-        active,
+        active: state.active,
         did: req.did,
         rev,
         status: status.map(|s| match s {
···
     }))
 }
 
-pub(super) fn repo_status_to_api(status: RepoStatus) -> (bool, Option<ApiRepoStatus<'static>>) {
+pub(super) fn repo_status_to_api(status: RepoStatus) -> Option<ApiRepoStatus<'static>> {
     match status {
-        RepoStatus::Synced => (true, None),
-        RepoStatus::Deactivated => (false, Some(ApiRepoStatus::Deactivated)),
-        RepoStatus::Takendown => (false, Some(ApiRepoStatus::Takendown)),
-        RepoStatus::Suspended => (false, Some(ApiRepoStatus::Suspended)),
-        // we lost sync with this repo! report desynchronized
-        // technicalllyyyy backfilling can mean the repo is active
-        // because we are syncing it from the pds, but like also it is currently
-        // desync'ed so...
-        RepoStatus::Backfilling | RepoStatus::Error(_) => {
-            (false, Some(ApiRepoStatus::Desynchronized))
-        }
+        RepoStatus::Synced => None,
+        RepoStatus::Deactivated => Some(ApiRepoStatus::Deactivated),
+        RepoStatus::Takendown => Some(ApiRepoStatus::Takendown),
+        RepoStatus::Suspended => Some(ApiRepoStatus::Suspended),
+        RepoStatus::Deleted => Some(ApiRepoStatus::Deleted),
+        // per spec, desynchronized and throttled have active=may-be-true
+        RepoStatus::Desynchronized => Some(ApiRepoStatus::Desynchronized),
+        RepoStatus::Throttled => Some(ApiRepoStatus::Throttled),
+        RepoStatus::Error(_) => Some(ApiRepoStatus::Desynchronized),
     }
 }
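the shape change here is worth spelling out: `active` used to fall out of the status mapping, but as the comment in the new arm notes, the sync spec allows desynchronized and throttled repos to still be active, so the diff moves `active` onto the repo state and lets the mapping return only the optional status. a self-contained sketch with stand-in types (not the crate's own):

```rust
// stand-in types for illustration; hydrant's real RepoStatus/RepoState
// live in src/types.rs below
#[derive(Debug, PartialEq)]
enum Status {
    Synced,
    Deactivated,
    Desynchronized,
    Throttled,
}

struct State {
    active: bool,
    status: Status,
}

// maps the internal status to the wire-level `status` field. None means
// "nothing abnormal to report"; it no longer implies anything about `active`
fn status_to_api(s: &Status) -> Option<&'static str> {
    match s {
        Status::Synced => None,
        Status::Deactivated => Some("deactivated"),
        Status::Desynchronized => Some("desynchronized"),
        Status::Throttled => Some("throttled"),
    }
}

fn main() {
    // a throttled repo may still be active, which the old
    // (bool, status) return type could not express
    let st = State { active: true, status: Status::Throttled };
    assert_eq!(status_to_api(&st.status), Some("throttled"));
    assert!(st.active);
}
```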
+3 -3
src/api/xrpc/list_repos.rs
···
     let mut next_cursor: Option<Did<'static>> = None;
 
     for item in hydrant.repos.iter_states(cursor.as_ref()) {
-        let (did, state) = item?;
+        let (did, state, _metadata) = item?;
 
         // skip repos that haven't been synced at least once
         let Some(commit) = state.root else {
···
             continue;
         };
 
-        let (active, status) = repo_status_to_api(state.status);
+        let status = repo_status_to_api(state.status);
         repos.push(Repo {
-            active: Some(active),
+            active: Some(state.active),
             did: did.clone(),
             head: Cid::from(commit_cid),
             rev: atp_commit.rev,
···
-use super::*;
-use crate::db::{self, keys};
-use crate::filter::FilterMode;
-use crate::ingest::stream::{Account, Commit, Identity, SubscribeReposMessage, Sync};
-use crate::ingest::validation::{
-    CommitValidationError, SyncValidationError, ValidatedCommit, ValidatedSync, ValidationContext,
-    ValidationOptions,
-};
-use crate::ops;
-use crate::resolver::{NoSigningKeyError, ResolverError};
-use crate::state::AppState;
-use crate::types::{AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoState, RepoStatus};
-
-use fjall::OwnedWriteBatch;
-
-use jacquard_common::IntoStatic;
-use jacquard_common::cowstr::ToCowStr;
-use jacquard_common::types::did::Did;
-use jacquard_repo::error::CommitError;
-use miette::{Diagnostic, IntoDiagnostic, Result};
-use rand::Rng;
-use std::collections::hash_map::DefaultHasher;
-use std::hash::{Hash, Hasher};
-use std::sync::Arc;
-use std::sync::atomic::Ordering::SeqCst;
-use thiserror::Error;
-use tokio::runtime::Handle;
-use tokio::sync::mpsc;
-use tracing::{debug, error, info, trace, warn};
-
-#[derive(Debug, Diagnostic, Error)]
-enum IngestError {
-    #[error("{0}")]
-    Generic(miette::Report),
-
-    #[error(transparent)]
-    #[diagnostic(transparent)]
-    Resolver(#[from] ResolverError),
-
-    #[error(transparent)]
-    #[diagnostic(transparent)]
-    Commit(#[from] CommitError),
-
-    #[error(transparent)]
-    #[diagnostic(transparent)]
-    NoSigningKey(#[from] NoSigningKeyError),
-}
-
-impl From<miette::Report> for IngestError {
-    fn from(report: miette::Report) -> Self {
-        IngestError::Generic(report)
-    }
-}
-
-// gate returned by check_repo_state, tells the shard loop what to do with the message
-enum ProcessGate<'s, 'c> {
-    // did not exist in db, newly queued for backfill, drop
-    NewRepo,
-    // explicitly untracked, backfilling, or in error, drop
-    Drop,
-    // inactive repo receiving a non-account message, buffer the commit if present, drop otherwise
-    Buffer(Option<&'c Commit<'c>>),
-    // ready to process with the latest state
-    Ready(RepoState<'s>),
-}
-
-// result returned by a message handler after the gate has been resolved
-#[derive(Debug)]
-enum RepoProcessResult<'s, 'c> {
-    // message processed successfully, here is the (possibly updated) state
-    Ok(RepoState<'s>),
-    // repo was deleted as part of processing
-    Deleted,
-    // needs backfill; carries the triggering commit to buffer (None when already in the buffer)
-    NeedsBackfill(Option<&'c Commit<'c>>),
-}
-
-pub struct FirehoseWorker {
-    state: Arc<AppState>,
-    rx: BufferRx,
-    verify_signatures: bool,
-    ephemeral: bool,
-    num_shards: usize,
-    validation_opts: Arc<ValidationOptions>,
-}
-
-struct WorkerContext<'a> {
-    verify_signatures: bool,
-    ephemeral: bool,
-    state: &'a AppState,
-    batch: OwnedWriteBatch,
-    added_blocks: &'a mut i64,
-    records_delta: &'a mut i64,
-    broadcast_events: &'a mut Vec<BroadcastEvent>,
-    vctx: ValidationContext<'a>,
-}
-
-impl FirehoseWorker {
-    pub fn new(
-        state: Arc<AppState>,
-        rx: BufferRx,
-        verify_signatures: bool,
-        ephemeral: bool,
-        num_shards: usize,
-        validation_opts: ValidationOptions,
-    ) -> Self {
-        Self {
-            state,
-            rx,
-            verify_signatures,
-            ephemeral,
-            num_shards,
-            validation_opts: Arc::new(validation_opts),
-        }
-    }
-
-    // starts the worker threads and the main dispatch loop
-    // the dispatch loop reads from the firehose channel and
-    // distributes messages to shards based on the hash of the DID
-    pub fn run(mut self, handle: Handle) -> Result<()> {
-        let mut shards = Vec::with_capacity(self.num_shards);
-
-        for i in 0..self.num_shards {
-            // unbounded here so we dont block other shards potentially
-            // if one has a small lag or something
-            let (tx, rx) = mpsc::unbounded_channel();
-            shards.push(tx);
-
-            let state = self.state.clone();
-            let verify = self.verify_signatures;
-            let ephemeral = self.ephemeral;
-            let handle = handle.clone();
-            let validation_opts = self.validation_opts.clone();
-
-            std::thread::Builder::new()
-                .name(format!("ingest-shard-{i}"))
-                .spawn(move || {
-                    Self::shard(i, rx, state, verify, ephemeral, handle, validation_opts);
-                })
-                .into_diagnostic()?;
-        }
-
-        info!(num = self.num_shards, "started shards");
-
-        let _g = handle.enter();
-
-        // dispatch loop
-        while let Some(msg) = self.rx.blocking_recv() {
-            let did = match &msg {
-                IngestMessage::Firehose { msg: m, .. } => match m {
-                    SubscribeReposMessage::Commit(c) => &c.repo,
-                    SubscribeReposMessage::Identity(i) => &i.did,
-                    SubscribeReposMessage::Account(a) => &a.did,
-                    SubscribeReposMessage::Sync(s) => &s.did,
-                    _ => continue,
-                },
-                IngestMessage::BackfillFinished(did) => did,
-            };
-
-            // todo: consider using a different hasher?
-            let mut hasher = DefaultHasher::new();
-            did.hash(&mut hasher);
-            let hash = hasher.finish();
-            let shard_idx = (hash as usize) % self.num_shards;
-
-            if let Err(e) = shards[shard_idx].send(msg) {
-                error!(shard = shard_idx, err = %e, "failed to send message to shard, shard panicked?");
-                break;
-            }
-        }
-
-        Err(miette::miette!(
-            "firehose worker dispatcher shutting down, shard died?"
-        ))
-    }
-
-    #[inline(always)]
-    fn shard(
-        id: usize,
-        mut rx: mpsc::UnboundedReceiver<IngestMessage>,
-        state: Arc<AppState>,
-        verify_signatures: bool,
-        ephemeral: bool,
-        handle: Handle,
-        validation_opts: Arc<ValidationOptions>,
-    ) {
-        let _guard = handle.enter();
-        debug!(shard = id, "shard started");
-
-        let mut broadcast_events = Vec::new();
-
-        while let Some(msg) = rx.blocking_recv() {
-            let batch = state.db.inner.batch();
-            broadcast_events.clear();
-
-            let mut added_blocks = 0;
-            let mut records_delta = 0;
-
-            let mut ctx = WorkerContext {
-                state: &state,
-                batch,
-                added_blocks: &mut added_blocks,
-                records_delta: &mut records_delta,
-                broadcast_events: &mut broadcast_events,
-                vctx: ValidationContext {
-                    opts: &validation_opts,
-                },
-                verify_signatures,
-                ephemeral,
-            };
-
-            match msg {
-                IngestMessage::BackfillFinished(did) => {
-                    debug!(did = %did, "backfill finished, verifying state and draining buffer");
-
-                    let repo_key = keys::repo_key(&did);
-                    if let Ok(Some(state_bytes)) = state.db.repos.get(&repo_key).into_diagnostic() {
-                        match crate::db::deser_repo_state(&state_bytes) {
-                            Ok(repo_state) => {
-                                let repo_state = repo_state.into_static();
-
-                                match Self::drain_resync_buffer(&mut ctx, &did, repo_state) {
-                                    Ok(RepoProcessResult::Ok(s)) => {
-                                        // TODO: there might be a race condition here where we get a new commit
-                                        // while the resync buffer is being drained, we should handle that probably
-                                        // but also it should still be fine since we'll sync eventually anyway
-                                        let res = ops::update_repo_status(
-                                            &mut ctx.batch,
-                                            &state.db,
-                                            &did,
-                                            s,
-                                            RepoStatus::Synced,
-                                        );
-                                        if let Err(e) = res {
-                                            // this can only fail if serde retry fails which would be really weird
-                                            error!(did = %did, err = %e, "failed to transition to synced");
-                                        }
-                                    }
-                                    // we don't have to handle this since drain_resync_buffer doesn't delete
-                                    // the commits from the resync buffer so they will get retried later
-                                    Ok(RepoProcessResult::NeedsBackfill(_)) => {}
-                                    Ok(RepoProcessResult::Deleted) => {}
-                                    Err(e) => {
-                                        error!(did = %did, err = %e, "failed to drain resync buffer")
-                                    }
-                                };
-                            }
-                            Err(e) => error!(did = %did, err = %e, "failed to deser repo state"),
-                        }
-                    }
-                }
-                IngestMessage::Firehose {
-                    relay: firehose,
-                    is_pds,
-                    msg,
-                } => {
-                    let _span = tracing::info_span!("firehose", relay = %firehose).entered();
-                    let (did, seq) = match &msg {
-                        SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
-                        SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
-                        SubscribeReposMessage::Account(a) => (&a.did, a.seq),
-                        SubscribeReposMessage::Sync(s) => (&s.did, s.seq),
-                        _ => continue,
-                    };
-
-                    let gate = match Self::check_repo_state(&mut ctx, did, &msg) {
-                        Ok(g) => g,
-                        Err(e) => {
-                            if let IngestError::Generic(ref r) = e {
-                                db::check_poisoned_report(r);
-                            }
-                            error!(did = %did, err = %e, "error in check_repo_state");
-                            state
-                                .firehose_cursors
-                                .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
-                            continue;
-                        }
-                    };
-
-                    match gate {
-                        ProcessGate::NewRepo | ProcessGate::Drop => {}
-                        ProcessGate::Buffer(commit) => {
-                            if let Some(commit) = commit {
-                                if let Err(e) =
-                                    ops::persist_to_resync_buffer(&state.db, did, commit)
-                                {
-                                    error!(
-                                        did = %did, err = %e,
-                                        "failed to persist commit to resync_buffer"
-                                    );
-                                }
-                            }
-                        }
-                        ProcessGate::Ready(mut repo_state) => {
-                            // first validate the pds host
-                            if let Some(host) = firehose.host_str()
-                                && is_pds
-                            {
-                                let authority = match Self::check_host_authority(
-                                    &mut ctx,
-                                    did,
-                                    &mut repo_state,
-                                    host,
-                                ) {
-                                    Ok(a) => a,
-                                    Err(e) => {
-                                        error!(did = %did, err = %e, "failed to check host authority");
-                                        state
-                                            .firehose_cursors
-                                            .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
-                                        continue;
-                                    }
-                                };
-                                match authority {
-                                    AuthorityOutcome::Authorized => {}
-                                    AuthorityOutcome::WasStale => {
-                                        // pds migrated: our data may be stale, backfill from the new host
-                                        warn!(did = %did, source_host = host, "pds migration detected, triggering backfill");
-                                        if let Err(e) =
-                                            Self::trigger_backfill(&mut ctx, did, repo_state)
-                                        {
-                                            error!(did = %did, err = %e, "failed to trigger backfill");
-                                        } else if let SubscribeReposMessage::Commit(commit) = &msg {
-                                            if let Err(e) = ops::persist_to_resync_buffer(
-                                                &state.db, did, commit,
-                                            ) {
-                                                error!(
-                                                    did = %did, err = %e,
-                                                    "failed to persist commit to resync_buffer"
-                                                );
-                                            }
-                                        }
-                                        state
-                                            .firehose_cursors
-                                            .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
-                                        continue;
-                                    }
-                                    // todo: ideally ban pds
-                                    AuthorityOutcome::WrongHost { expected } => {
-                                        warn!(did = %did, got = host, expected = %expected, "commit rejected: wrong host");
-                                        state
-                                            .firehose_cursors
-                                            .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
-                                        continue;
-                                    }
-                                }
-                            }
-
-                            let pre_status = repo_state.status.clone();
-
-                            // if it was in deactivated/takendown/suspended state, we can mark it
-                            // as synced because we are receiving an active=true account event now.
-                            // we do this before dispatching so handle_account sees pre_status correctly
-                            if matches!(
-                                pre_status,
-                                RepoStatus::Deactivated
-                                    | RepoStatus::Suspended
-                                    | RepoStatus::Takendown
-                            ) {
-                                if let SubscribeReposMessage::Account(acc) = &msg {
-                                    if acc.active {
-                                        match ops::update_repo_status(
-                                            &mut ctx.batch,
-                                            &ctx.state.db,
-                                            did,
-                                            repo_state,
-                                            RepoStatus::Synced,
-                                        ) {
-                                            Ok(rs) => {
-                                                repo_state = rs;
-                                                ctx.state.db.update_gauge_diff(
-                                                    &GaugeState::Resync(None),
-                                                    &GaugeState::Synced,
-                                                );
-                                            }
-                                            Err(e) => {
-                                                error!(
-                                                    did = %did, err = %e,
-                                                    "failed to transition inactive repo to synced"
-                                                );
-                                                state
-                                                    .firehose_cursors
-                                                    .peek_with(&firehose, |_, c| {
-                                                        c.store(seq, SeqCst)
-                                                    });
-                                                continue;
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-
-                            match Self::process_message(&mut ctx, &msg, did, repo_state, pre_status)
-                            {
-                                Ok(RepoProcessResult::Ok(_)) => {}
-                                Ok(RepoProcessResult::Deleted) => {
-                                    state.db.update_count("repos", -1);
-                                }
-                                Ok(RepoProcessResult::NeedsBackfill(Some(commit))) => {
-                                    if let Err(e) =
-                                        ops::persist_to_resync_buffer(&state.db, did, commit)
-                                    {
-                                        error!(
-                                            did = %did, err = %e,
-                                            "failed to persist commit to resync_buffer"
-                                        );
-                                    }
-                                }
-                                Ok(RepoProcessResult::NeedsBackfill(None)) => {}
-                                Err(e) => {
-                                    if let IngestError::Generic(ref r) = e {
-                                        db::check_poisoned_report(r);
-                                    }
-                                    error!(did = %did, err = %e, "error processing message");
-                                    if Self::check_if_retriable_failure(&e) {
-                                        if let SubscribeReposMessage::Commit(commit) = &msg {
-                                            if let Err(e) = ops::persist_to_resync_buffer(
-                                                &state.db, did, commit,
-                                            ) {
-                                                error!(
-                                                    did = %did, err = %e,
-                                                    "failed to persist commit to resync_buffer"
-                                                );
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-
-                    state
-                        .firehose_cursors
-                        .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
-                }
-            }
-
-            if let Err(e) = ctx.batch.commit() {
-                error!(shard = id, err = %e, "failed to commit batch");
-            }
-
-            if added_blocks > 0 {
-                state.db.update_count("blocks", added_blocks);
-            }
-            if records_delta != 0 {
-                state.db.update_count("records", records_delta);
-            }
-            for evt in broadcast_events.drain(..) {
-                let _ = state.db.event_tx.send(evt);
-            }
-
-            // state.db.inner.persist(fjall::PersistMode::Buffer).ok();
-        }
-    }
-
-    // don't retry commit or sync on key fetch errors
-    // since we'll just try again later if we get commit or sync again
-    fn check_if_retriable_failure(e: &IngestError) -> bool {
-        matches!(
-            e,
-            IngestError::Generic(_)
-                | IngestError::Resolver(ResolverError::Ratelimited)
-                | IngestError::Resolver(ResolverError::Transport(_))
-        )
-    }
-
-    fn process_message<'s, 'c>(
-        ctx: &mut WorkerContext,
-        msg: &'c SubscribeReposMessage<'static>,
-        did: &Did,
-        repo_state: RepoState<'s>,
-        pre_status: RepoStatus,
-    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
-        match msg {
-            SubscribeReposMessage::Commit(commit) => {
-                trace!(did = %did, "processing commit");
-                Self::handle_commit(ctx, did, repo_state, commit)
-            }
-            SubscribeReposMessage::Sync(sync) => {
-                debug!(did = %did, "processing sync");
-                Self::handle_sync(ctx, did, repo_state, sync)
-            }
-            SubscribeReposMessage::Identity(identity) => {
-                debug!(did = %did, "processing identity");
-                Self::handle_identity(ctx, did, repo_state, identity)
-            }
-            SubscribeReposMessage::Account(account) => {
-                debug!(did = %did, "processing account");
-                Self::handle_account(ctx, did, repo_state, pre_status, account)
-            }
-            _ => {
-                warn!(did = %did, "unknown message type in buffer");
-                Ok(RepoProcessResult::Ok(repo_state))
-            }
-        }
-    }
-
-    fn handle_commit<'s, 'c>(
-        ctx: &mut WorkerContext,
-        did: &Did,
-        mut repo_state: RepoState<'s>,
-        commit: &'c Commit<'c>,
-    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
-        repo_state.advance_message_time(commit.time.0.timestamp_millis());
-
-        let Some(validated) = ctx.validate_commit(did, &mut repo_state, commit)? else {
-            return Ok(RepoProcessResult::Ok(repo_state));
-        };
-
-        if validated.chain_break.is_broken() {
-            warn!(
-                did = %did,
-                broken = ?validated.chain_break,
-                "chain break detected, triggering backfill"
-            );
-            Self::trigger_backfill(ctx, did, repo_state)?;
-            // not updating repo state root commit since we are backfilling anyway
-            return Ok(RepoProcessResult::NeedsBackfill(Some(commit)));
-        }
-
-        let res = ops::apply_commit(
-            &mut ctx.batch,
-            &ctx.state.db,
-            repo_state,
-            validated,
-            &ctx.state.filter.load(),
-            ctx.ephemeral,
-        )?;
-        let repo_state = res.repo_state;
-        *ctx.added_blocks += res.blocks_count;
-        *ctx.records_delta += res.records_delta;
-        ctx.broadcast_events.push(BroadcastEvent::Persisted(
-            ctx.state.db.next_event_id.load(SeqCst) - 1,
-        ));
-
-        Ok(RepoProcessResult::Ok(repo_state))
-    }
-
-    fn handle_sync<'s, 'c>(
-        ctx: &mut WorkerContext,
-        did: &Did,
-        mut repo_state: RepoState<'s>,
-        sync: &'c Sync<'c>,
-    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
-        repo_state.advance_message_time(sync.time.0.timestamp_millis());
-
-        let Some(validated) = ctx.validate_sync(did, &mut repo_state, sync)? else {
-            return Ok(RepoProcessResult::Ok(repo_state));
-        };
-
-        // skip noop syncs (data CID unchanged)
-        if let Some(current_commit) = &repo_state.root {
-            if current_commit.data == validated.commit_obj.data {
-                debug!(did = %did, "skipping noop sync");
-                return Ok(RepoProcessResult::Ok(repo_state));
-            }
-
-            if validated.commit_obj.rev.as_str() <= current_commit.rev.to_tid().as_str() {
-                debug!(did = %did, "skipping replayed sync");
-                return Ok(RepoProcessResult::Ok(repo_state));
-            }
-        }
-        // not updating repo state root commit since we are backfilling anyway
-
-        warn!(did = %did, "sync event, triggering backfill");
-        let repo_state = Self::trigger_backfill(ctx, did, repo_state)?;
-        Ok(RepoProcessResult::Ok(repo_state))
-    }
-
-    fn handle_identity<'s>(
-        ctx: &mut WorkerContext,
-        did: &Did,
-        mut repo_state: RepoState<'s>,
-        identity: &Identity<'_>,
-    ) -> Result<RepoProcessResult<'s, 'static>, IngestError> {
-        let event_ms = identity.time.0.timestamp_millis();
-        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
-            debug!(did = %did, "skipping stale/duplicate identity event");
-            return Ok(RepoProcessResult::Ok(repo_state));
-        }
-        repo_state.advance_message_time(event_ms);
-
-        // todo: make this match relay sync behaviour
-        let changed = if identity.handle.is_none() {
-            // no handle sent is basically "invalidate your caches"
-            ctx.state.resolver.invalidate_sync(did);
-            let doc = Handle::current().block_on(ctx.state.resolver.resolve_doc(did))?;
-            repo_state.update_from_doc(doc)
-        } else {
-            let old_handle = repo_state.handle.clone();
-            repo_state.handle = identity
-                .handle
-                .clone()
-                .map(IntoStatic::into_static)
-                .or(repo_state.handle);
-            repo_state.handle != old_handle
-        };
-
-        repo_state.touch();
-        ctx.batch.insert(
-            &ctx.state.db.repos,
-            keys::repo_key(did),
-            crate::db::ser_repo_state(&repo_state)?,
-        );
-
-        if changed {
-            let evt = IdentityEvt {
-                did: did.clone().into_static(),
-                handle: repo_state.handle.clone().map(IntoStatic::into_static),
-            };
-            ctx.broadcast_events
-                .push(ops::make_identity_event(&ctx.state.db, evt));
-        }
-
-        Ok(RepoProcessResult::Ok(repo_state))
-    }
-
-    fn handle_account<'s, 'c>(
-        ctx: &mut WorkerContext,
-        did: &Did,
-        mut repo_state: RepoState<'s>,
-        pre_status: RepoStatus,
-        account: &'c Account<'c>,
-    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
-        let event_ms = account.time.0.timestamp_millis();
-        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
-            debug!(did = %did, "skipping stale/duplicate account event");
-            return Ok(RepoProcessResult::Ok(repo_state));
-        }
-        repo_state.advance_message_time(event_ms);
-
-        // get active before we do any mutations
-        let was_inactive = matches!(
-            pre_status,
-            RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended
-        );
-        let is_inactive = !account.active;
-        let evt = AccountEvt {
-            did: did.clone().into_static(),
-            active: account.active,
-            status: account.status.as_ref().map(|s| s.to_cowstr().into_static()),
-        };
-
-        ctx.refresh_doc(&mut repo_state, did)?;
-
-        if !account.active {
-            use crate::ingest::stream::AccountStatus;
-            match &account.status {
-                Some(AccountStatus::Deleted) => {
-                    debug!(did = %did, "account deleted, wiping data");
-                    crate::ops::delete_repo(&mut ctx.batch, &ctx.state.db, did, &repo_state)?;
-                    return Ok(RepoProcessResult::Deleted);
-                }
-                status => {
-                    let target_status = inactive_account_repo_status(did, status);
-
-                    if repo_state.status == target_status {
-                        debug!(did = %did, ?target_status, "account status unchanged");
-                        ctx.batch.insert(
-                            &ctx.state.db.repos,
-                            keys::repo_key(did),
-                            crate::db::ser_repo_state(&repo_state)?,
-                        );
-                        return Ok(RepoProcessResult::Ok(repo_state));
-                    }
-
-                    repo_state = ops::update_repo_status(
-                        &mut ctx.batch,
-                        &ctx.state.db,
-                        did,
-                        repo_state,
-                        target_status,
-                    )?;
-                    ctx.state
-                        .db
-                        .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None));
-                }
-            }
-        } else {
-            // active=true: transition to synced is handled in the shard dispatch before calling this
-        }
-
-        if was_inactive != is_inactive || repo_state.status != pre_status {
-            ctx.broadcast_events
-                .push(ops::make_account_event(&ctx.state.db, evt));
-        }
-
-        // persist last_message_time for paths that don't go through update_repo_status
-        // (active=true and already synced). harmless double-write for the status-changed path
-        ctx.batch.insert(
-            &ctx.state.db.repos,
-            keys::repo_key(did),
-            crate::db::ser_repo_state(&repo_state)?,
-        );
-
-        Ok(RepoProcessResult::Ok(repo_state))
-    }
-
-    // checks the current state of the repo in the database and returns a gate
-    // indicating what the shard loop should do with the message.
-    // if the repo is new, creates initial state and triggers backfill
-    // for synced repos with buffered commits, drains the buffer first
-    // so events are applied in order
-    fn check_repo_state<'s, 'c>(
-        ctx: &mut WorkerContext,
-        did: &Did<'_>,
-        msg: &'c SubscribeReposMessage<'static>,
-    ) -> Result<ProcessGate<'s, 'c>, IngestError> {
-        let repo_key = keys::repo_key(&did);
-        let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else {
-            let filter = ctx.state.filter.load();
-
-            if filter.mode == FilterMode::Filter && !filter.signals.is_empty() {
-                let commit = match msg {
-                    SubscribeReposMessage::Commit(c) => c,
-                    _ => return Ok(ProcessGate::NewRepo),
-                };
-                let touches_signal = commit.ops.iter().any(|op| {
-                    op.path
-                        .split_once('/')
-                        .map(|(col, _)| {
-                            let m = filter.matches_signal(col);
-                            debug!(
-                                did = %did, path = %op.path, col = %col, signals = ?filter.signals, matched = m,
-                                "signal check"
-                            );
-                            m
-                        })
-                        .unwrap_or(false)
-                });
-                if !touches_signal {
-                    trace!(did = %did, "dropping commit, no signal-matching ops");
-                    return Ok(ProcessGate::NewRepo);
-                }
-            }
-
-            debug!(did = %did, "discovered new account from firehose, queueing backfill");
-
-            let repo_state = RepoState::untracked(rand::rng().next_u64());
-            let mut batch = ctx.state.db.inner.batch();
-            batch.insert(
-                &ctx.state.db.repos,
-                &repo_key,
-                crate::db::ser_repo_state(&repo_state)?,
-            );
-            batch.insert(
-                &ctx.state.db.pending,
-                keys::pending_key(repo_state.index_id),
-                &repo_key,
-            );
-            batch.commit().into_diagnostic()?;
-
-            ctx.state.db.update_count("repos", 1);
-            ctx.state
-                .db
-                .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
-
-            ctx.state.notify_backfill();
-
-            return Ok(ProcessGate::NewRepo);
-        };
-
-        let repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();
-
-        if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling {
-            trace!(did = %did, "ignoring message, repo is explicitly untracked");
-            return Ok(ProcessGate::Drop);
-        }
-
-        match &repo_state.status {
-            RepoStatus::Synced => {
-                // lazy drain: if there are buffered commits, drain them now before
-                // applying the live message so events are applied in order
-                if ops::has_buffered_commits(&ctx.state.db, did) {
-                    return match Self::drain_resync_buffer(ctx, did, repo_state)? {
-                        RepoProcessResult::Ok(rs) => Ok(ProcessGate::Ready(rs)),
-                        // gap triggered during drain, so drop the live message
-                        RepoProcessResult::NeedsBackfill(_) => Ok(ProcessGate::Drop),
-                        RepoProcessResult::Deleted => Ok(ProcessGate::Drop),
-                    };
-                }
-                Ok(ProcessGate::Ready(repo_state))
-            }
-            RepoStatus::Backfilling | RepoStatus::Error(_) => {
-                debug!(
-                    did = %did, status = ?repo_state.status,
-                    "ignoring message, repo is backfilling or in error state"
-                );
-                Ok(ProcessGate::Drop)
-            }
-            RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => {
-                // account events always pass through because the
-                // shard dispatch handles the active=true transition
-                if let SubscribeReposMessage::Account(_) = msg {
-                    return Ok(ProcessGate::Ready(repo_state));
-                }
-                // buffer commits and drop everything else until we get an active=true message
-                let commit = match msg {
-                    SubscribeReposMessage::Commit(c) => Some(c.as_ref()),
-                    _ => None,
-                };
-                Ok(ProcessGate::Buffer(commit))
-            }
-        }
-    }
-
-    fn drain_resync_buffer<'s>(
-        ctx: &mut WorkerContext,
-        did: &Did,
-        mut repo_state: RepoState<'s>,
-    ) -> Result<RepoProcessResult<'s, 'static>, IngestError> {
-        let prefix = keys::resync_buffer_prefix(did);
-
-        for guard in ctx.state.db.resync_buffer.prefix(&prefix) {
-            let (key, value) = guard.into_inner().into_diagnostic()?;
-            let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?;
-
-            // buffered commits have already been source-checked on arrival; skip host check
-            let res = Self::handle_commit(ctx, did, repo_state, &commit);
-            let res = match res {
-                Ok(r) => r,
-                Err(e) => {
-                    if !Self::check_if_retriable_failure(&e) {
-                        ctx.batch.remove(&ctx.state.db.resync_buffer, key);
-                    }
-                    return Err(e);
-                }
-            };
-            match res {
-                RepoProcessResult::Ok(rs) => {
-                    ctx.batch.remove(&ctx.state.db.resync_buffer, key);
-                    repo_state = rs;
-                }
-                RepoProcessResult::NeedsBackfill(_) => {
-                    // commit is already in the buffer, leave it there for the next backfill
-                    return Ok(RepoProcessResult::NeedsBackfill(None));
-                }
-                RepoProcessResult::Deleted => {
-                    ctx.batch.remove(&ctx.state.db.resync_buffer, key);
-                    return Ok(RepoProcessResult::Deleted);
-                }
-            }
-        }
-
-        Ok(RepoProcessResult::Ok(repo_state))
-    }
-
-    // transitions repo to Backfilling, commits the status change immediately (separate from
-    // ctx.batch), updates the gauge, and pings the backfill worker. returns the updated state.
-    fn trigger_backfill<'s>(
-        ctx: &mut WorkerContext,
-        did: &Did,
-        repo_state: RepoState<'s>,
-    ) -> Result<RepoState<'s>, IngestError> {
-        let mut batch = ctx.state.db.inner.batch();
-        let repo_state = ops::update_repo_status(
-            &mut batch,
-            &ctx.state.db,
-            did,
-            repo_state,
-            RepoStatus::Backfilling,
-        )?;
-        batch.commit().into_diagnostic()?;
-        ctx.state
-            .db
-            .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
-        ctx.state.notify_backfill();
-        Ok(repo_state)
-    }
-
-    fn check_host_authority(
-        ctx: &mut WorkerContext,
-        did: &Did,
-        repo_state: &mut RepoState,
-        source_host: &str,
-    ) -> Result<AuthorityOutcome, IngestError> {
-        let outcome =
-            super::check_host_authority(&ctx.state.resolver, did, repo_state, source_host)?;
-        if !matches!(outcome, AuthorityOutcome::Authorized) {
-            ctx.batch.insert(
-                &ctx.state.db.repos,
-                keys::repo_key(did),
-                crate::db::ser_repo_state(repo_state)?,
-            );
-        }
-        Ok(outcome)
-    }
-}
-
-impl WorkerContext<'_> {
-    fn refresh_doc(&mut self, repo_state: &mut RepoState, did: &Did) -> Result<(), IngestError> {
-        super::refresh_doc(&self.state.resolver, did, repo_state)?;
-        self.batch.insert(
-            &self.state.db.repos,
-            keys::repo_key(did),
-            crate::db::ser_repo_state(repo_state)?,
-        );
-        Ok(())
-    }
-
-    fn fetch_key(&self, did: &Did) -> Result<Option<PublicKey<'static>>> {
-        super::fetch_key(&self.state.resolver, self.verify_signatures, did)
-    }
-
-    fn validate_commit<'s, 'c>(
-        &mut self,
-        did: &Did,
-        repo_state: &mut RepoState<'s>,
-        commit: &'c Commit<'c>,
-    ) -> Result<Option<ValidatedCommit<'c>>, IngestError> {
-        let key = self.fetch_key(did)?;
-        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
-            Ok(v) => return Ok(Some(v)),
-            Err(CommitValidationError::StaleRev) => {
-                debug!(did = %did, commit_rev = %commit.rev, "skipping replayed commit");
-                return Ok(None);
-            }
-            Err(CommitValidationError::SigFailure) => {}
-            Err(e) => {
-                warn!(did = %did, err = %e, "commit rejected");
-                return Ok(None);
-            }
-        }
-
-        self.refresh_doc(repo_state, did)?;
-        let key = self.fetch_key(did)?;
-        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
-            Ok(v) => Ok(Some(v)),
-            Err(e) => {
-                warn!(did = %did, err = %e, "commit rejected after key refresh");
-                Ok(None)
-            }
-        }
-    }
-
-    fn validate_sync<'s>(
-        &mut self,
-        did: &Did,
-        repo_state: &mut RepoState<'s>,
-        sync: &Sync<'_>,
-    ) -> Result<Option<ValidatedSync>, IngestError> {
-        let key = self.fetch_key(did)?;
-        match self.vctx.validate_sync(sync, key.as_ref()) {
-            Ok(v) => return Ok(Some(v)),
-            Err(SyncValidationError::SigFailure) => {}
-            Err(e) => {
-                warn!(did = %did, err = %e, "sync rejected");
-                return Ok(None);
-            }
-        }
-
-        self.refresh_doc(repo_state, did)?;
-        let key = self.fetch_key(did)?;
-        match self.vctx.validate_sync(sync, key.as_ref()) {
-            Ok(v) => Ok(Some(v)),
-            Err(e) => {
-                warn!(did = %did, err = %e, "sync rejected after key refresh");
-                Ok(None)
-            }
-        }
-    }
-}
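for context on what the removed worker file above was doing: its dispatch loop routed every firehose message by hashing the repo DID, which pins each repo to exactly one shard thread and so preserves per-repo ordering without any cross-shard coordination. the same routing in a runnable, self-contained form (illustrative only, not the crate's code):

```rust
// hash the DID, modulo the shard count: all messages for one repo land on
// the same shard, so commits for that repo are applied in arrival order
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn shard_for(did: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    did.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

fn main() {
    let n = 8;
    let a = shard_for("did:plc:alice", n);
    // routing is deterministic: the same DID always maps to the same shard
    assert_eq!(a, shard_for("did:plc:alice", n));
    println!("did:plc:alice -> shard {a}");
}
```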
+6 -7
src/lib.rs
···
 pub mod filter;
 pub mod types;
 
-#[cfg(all(feature = "relay", feature = "events", not(debug_assertions)))]
-compile_error!("`relay` and `events` features are mutually exclusive");
-
-#[cfg(all(feature = "relay", feature = "backlinks", not(debug_assertions)))]
-compile_error!("`relay` and `backlinks` features are mutually exclusive");
+#[cfg(all(feature = "relay", feature = "indexer"))]
+compile_error!("can't be relay and indexer at the same time");
+#[cfg(all(feature = "relay", feature = "backlinks"))]
+compile_error!("can't index backlinks while running as a relay");
 
 pub(crate) mod api;
-#[cfg(feature = "events")]
+#[cfg(feature = "indexer")]
 pub(crate) mod backfill;
 #[cfg(feature = "backlinks")]
 pub(crate) mod backlinks;
 pub(crate) mod crawler;
 pub(crate) mod db;
 pub(crate) mod ingest;
-#[cfg(feature = "events")]
+#[cfg(feature = "indexer")]
 pub(crate) mod ops;
 pub(crate) mod resolver;
 pub(crate) mod state;
+72 -70
src/ops.rs
···
 use jacquard_common::Data;
 use jacquard_common::types::did::Did;
 use miette::{Context, IntoDiagnostic, Result};
-use rand::{Rng, rng};
 use std::collections::HashMap;
 use std::sync::atomic::Ordering;
 use tracing::debug;
 
 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
-use crate::db::{self, Db, keys, ser_repo_state};
+use crate::db::{self, Db, keys};
 use crate::filter::FilterConfig;
 use crate::ingest::stream::Commit;
 use crate::ingest::validation::ValidatedCommit;
···
         "buffered commit to resync_buffer"
     );
     Ok(())
-}
-
-pub fn has_buffered_commits(db: &Db, did: &Did) -> bool {
-    let prefix = keys::resync_buffer_prefix(did);
-    db.resync_buffer.prefix(&prefix).next().is_some()
 }
 
 // emitting identity is ephemeral
···
     batch: &mut OwnedWriteBatch,
     db: &Db,
     did: &Did,
-    repo_state: &RepoState,
+    _repo_state: &RepoState,
 ) -> Result<()> {
     debug!(did = %did, "deleting repo");
 
     let repo_key = keys::repo_key(did);
-    let pending_key = keys::pending_key(repo_state.index_id);
+    let metadata_key = keys::repo_metadata_key(did);
 
-    // 1. delete from repos, pending, resync
-    batch.remove(&db.repos, &repo_key);
-    match repo_state.status {
-        RepoStatus::Synced => {}
-        RepoStatus::Backfilling => {
-            batch.remove(&db.pending, &pending_key);
-        }
-        _ => {
-            batch.remove(&db.resync, &repo_key);
-        }
+    let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
+    if let Some(metadata_bytes) = metadata_bytes {
+        let metadata = db::deser_repo_metadata(&metadata_bytes)?;
+        batch.remove(&db.pending, keys::pending_key(metadata.index_id));
     }
+
+    // 1. delete from resync, and metadata
+    // we don't delete from repos, relay uses it as a tombstone
+    // todo: we should still delete it after some time
+    batch.remove(&db.resync, &repo_key);
+    batch.remove(&db.repo_metadata, &metadata_key);
 
     // 2. delete from resync buffer
     let resync_prefix = keys::resync_buffer_prefix(did);
···
     Ok(())
 }
 
-pub fn update_repo_status<'batch, 's>(
+pub fn transition_repo<'batch, 's>(
     batch: &'batch mut OwnedWriteBatch,
     db: &Db,
     did: &Did,
···
     debug!(did = %did, status = ?new_status, "updating repo status");
 
     let repo_key = keys::repo_key(did);
-    let pending_key = keys::pending_key(repo_state.index_id);
+    let metadata_key = keys::repo_metadata_key(did);
+
+    let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
+    if let Some(metadata_bytes) = metadata_bytes {
+        let metadata = db::deser_repo_metadata(&metadata_bytes)?;
+        let pending_key = keys::pending_key(metadata.index_id);
 
-    // manage queues
-    match &new_status {
-        RepoStatus::Synced => {
-            batch.remove(&db.pending, &pending_key);
-            // we dont have to remove from resync here because it has to transition resync -> pending first
-        }
-        RepoStatus::Backfilling => {
-            // if we are coming from an error state, remove from resync
-            if !matches!(repo_state.status, RepoStatus::Synced) {
+        // manage queues
+        match &new_status {
+            RepoStatus::Synced => {
+                batch.remove(&db.pending, &pending_key);
+                // we dont have to remove from resync here because it has to transition resync -> pending first
+            }
+            RepoStatus::Error(msg) => {
+                tracing::warn!("transitioning to error: {msg}");
+                batch.remove(&db.pending, &pending_key);
+                // TODO: we need to make errors have kind instead of "message" in repo status
+                // and then pass it to resync error kind
+                let resync_state = crate::types::ResyncState::Error {
+                    kind: crate::types::ResyncErrorKind::Generic,
+                    retry_count: 0,
+                    next_retry: chrono::Utc::now().timestamp(),
+                };
+                batch.insert(
+                    &db.resync,
+                    &repo_key,
+                    rmp_serde::to_vec(&resync_state).into_diagnostic()?,
+                );
+            }
+            RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
+                // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states
+                // batch.remove(&db.pending, &pending_key);
+                let resync_state = ResyncState::Gone {
+                    status: new_status.clone(),
+                };
+                batch.insert(
+                    &db.resync,
+                    &repo_key,
+                    rmp_serde::to_vec(&resync_state).into_diagnostic()?,
+                );
+            }
+            RepoStatus::Deleted => {
+                // terminal state: remove from queues, no resync entry needed
+                batch.remove(&db.pending, &pending_key);
                 batch.remove(&db.resync, &repo_key);
             }
-            // remove the old entry
-            batch.remove(&db.pending, &pending_key);
-            // add as new entry
-            repo_state.index_id = rng().next_u64();
-            batch.insert(
-                &db.pending,
-                keys::pending_key(repo_state.index_id),
-                &repo_key,
-            );
-        }
-        RepoStatus::Error(_msg) => {
-            batch.remove(&db.pending, &pending_key);
-            // TODO: we need to make errors have kind instead of "message" in repo status
-            // and then pass it to resync error kind
-            let resync_state = crate::types::ResyncState::Error {
-                kind: crate::types::ResyncErrorKind::Generic,
-                retry_count: 0,
-                next_retry: chrono::Utc::now().timestamp(),
-            };
-            batch.insert(
-                &db.resync,
-                &repo_key,
-                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
-            );
-        }
-        RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
-            // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states
-            // batch.remove(&db.pending, &pending_key);
-            let resync_state = ResyncState::Gone {
-                status: new_status.clone(),
-            };
-            batch.insert(
-                &db.resync,
-                &repo_key,
-                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
-            );
+            RepoStatus::Desynchronized | RepoStatus::Throttled => {
+                // like an error: remove from pending and schedule a resync attempt
+                batch.remove(&db.pending, &pending_key);
+                let resync_state = crate::types::ResyncState::Error {
+                    kind: crate::types::ResyncErrorKind::Generic,
+                    retry_count: 0,
+                    next_retry: chrono::Utc::now().timestamp(),
+                };
+                batch.insert(
+                    &db.resync,
+                    &repo_key,
+                    rmp_serde::to_vec(&resync_state).into_diagnostic()?,
+                );
+            }
         }
     }
 
+    repo_state.active = matches!(new_status, RepoStatus::Synced | RepoStatus::Error(_));
     repo_state.status = new_status;
     repo_state.touch();
-
-    batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?);
 
     Ok(repo_state)
 }
···
     repo_state.root = Some(validated.commit_obj.into());
     repo_state.touch();
-
-    batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);
 
     // 2. iterate ops and update records index
     let mut records_delta = 0;
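the queue bookkeeping in `transition_repo` is easier to see as a table. a self-contained sketch with stand-in types (hydrant's real `ResyncState` carries retry bookkeeping, elided here):

```rust
// stand-in sketch of transition_repo's queue effects per target status:
// (remove from pending backfill queue?, write a resync/tombstone entry?)
#[derive(Debug)]
enum Status { Synced, Error, Gone, Deleted, Desynchronized, Throttled }

fn queue_effects(s: &Status) -> (bool, bool) {
    match s {
        // synced repos leave the pending queue; resync cleanup happens on
        // the resync -> pending transition instead
        Status::Synced => (true, false),
        // error-ish states get scheduled for a resync attempt
        Status::Error | Status::Desynchronized | Status::Throttled => (true, true),
        // deactivated/takendown/suspended park a "gone" tombstone entry
        Status::Gone => (false, true),
        // terminal: dropped from pending, and the resync entry is removed too
        Status::Deleted => (true, false),
    }
}

fn main() {
    println!("{:?} -> {:?}", Status::Throttled, queue_effects(&Status::Throttled));
}
```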
+97 -39
src/types.rs
···
 use std::fmt::{Debug, Display};
 
+use bytes::Bytes;
 use jacquard_common::types::cid::IpldCid;
 use jacquard_common::types::nsid::Nsid;
 use jacquard_common::types::string::{Did, Rkey};
···
 pub(crate) mod v2 {
     use super::*;
 
-    // todo: add desynchronized and throttled fields
     #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
     pub enum RepoStatus {
         Backfilling,
···
     #[derive(Debug, Clone, Serialize, Deserialize)]
     #[serde(bound(deserialize = "'i: 'de"))]
     pub(crate) struct RepoState<'i> {
-        // todo: add active field
         pub status: RepoStatus,
         pub root: Option<Commit>,
-        // todo: is this actually valid? the spec says this is informal and intermadiate
-        // services may change it. we should probably document it. if we cant use this
-        // then how do we dedup account / identity ops?
+        pub last_message_time: Option<i64>,
+        pub last_updated_at: i64,
+        pub tracked: bool,
+        pub index_id: u64,
+        #[serde(borrow)]
+        pub signing_key: Option<DidKey<'i>>,
+        #[serde(borrow)]
+        pub pds: Option<CowStr<'i>>,
+        #[serde(borrow)]
+        pub handle: Option<Handle<'i>>,
+    }
+}
+
+pub(crate) mod v4 {
+    use super::*;
+    pub(crate) use v2::Commit;
+
+    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+    pub enum RepoStatus {
+        /// repo is synced to latest commit from what we know of
+        Synced,
+        /// some unclassified fatal error
+        Error(SmolStr),
+        /// user has temporarily paused their overall account. content should
+        /// not be displayed or redistributed, but does not need to be deleted
+        /// from infrastructure. implied time-limited. also the initial state
+        /// for an account after migrating to another pds instance.
+        Deactivated,
+        /// host or service has takendown the account. implied permanent or
+        /// long-term, though may be reverted.
+        Takendown,
+        /// host or service has temporarily paused the account. implied
+        /// time-limited.
+        Suspended,
+        /// user or host has deleted the account, and content should be removed
+        /// from the network. implied permanent or long-term, though may be
+        /// reverted (deleted accounts may reactivate on the same or another
+        /// host).
+        ///
+        /// account is deleted; kept as a tombstone so stale commits arriving from the upstream
+        /// backfill window are not forwarded. active=false per spec.
+        Deleted,
+        /// host detected a repo sync problem. active may be true or false per spec;
+        /// the `active` field on `RepoState` is authoritative.
+        Desynchronized,
+        /// resource rate-limit exceeded. active may be true or false per spec;
+        /// the `active` field on `RepoState` is authoritative.
+        Throttled,
+    }
+
+    #[derive(Debug, Clone, Serialize, Deserialize)]
+    #[serde(bound(deserialize = "'i: 'de"))]
+    pub(crate) struct RepoState<'i> {
+        /// whether the upstream considers this account active.
+        /// services should use the `active` flag to control overall account visibility
+        pub active: bool,
+        pub status: RepoStatus,
+        pub root: Option<Commit>,
         /// ms since epoch of the last firehose message we processed for this repo.
         /// used to deduplicate identity / account events that can arrive from multiple relays at
         /// different wall-clock times but represent the same underlying PDS event.
         pub last_message_time: Option<i64>,
         /// this is when we *ingested* any last updates
         pub last_updated_at: i64, // unix timestamp
-        /// whether we are ingesting events for this repo
-        pub tracked: bool,
-        /// index id in pending keyspace
-        pub index_id: u64,
         #[serde(borrow)]
         pub signing_key: Option<DidKey<'i>>,
         #[serde(borrow)]
···
         #[serde(borrow)]
         pub handle: Option<Handle<'i>>,
     }
-}
-pub(crate) use v2::*;
 
-impl Display for RepoStatus {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            RepoStatus::Backfilling => write!(f, "backfilling"),
-            RepoStatus::Synced => write!(f, "synced"),
-            RepoStatus::Error(e) => write!(f, "error({e})"),
-            RepoStatus::Deactivated => write!(f, "deactivated"),
-            RepoStatus::Takendown => write!(f, "takendown"),
-            RepoStatus::Suspended => write!(f, "suspended"),
-        }
+    #[derive(Debug, Clone, Serialize, Deserialize)]
+    pub(crate) struct RepoMetadata {
+        /// whether we are ingesting events for this repo
+        pub tracked: bool,
+        /// index id in pending keyspace (if backfilling)
+        pub index_id: u64,
     }
 }
+
+pub(crate) use v4::*;
 
 impl<'c> From<AtpCommit<'c>> for Commit {
     fn from(value: AtpCommit<'c>) -> Self {
···
 impl Commit {
     pub(crate) fn into_atp_commit<'i>(self, did: Did<'i>) -> Option<AtpCommit<'i>> {
-        // from a migration
+        // version < 0 is a sentinel used in v2 migration for repos with no commit data
         if self.version < 0 {
             return None;
         }
···
     }
 }
 
-impl<'i> RepoState<'i> {
+impl Display for RepoStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            RepoStatus::Synced => write!(f, "synced"),
+            RepoStatus::Error(e) => write!(f, "error({e})"),
+            RepoStatus::Deactivated => write!(f, "deactivated"),
+            RepoStatus::Takendown => write!(f, "takendown"),
+            RepoStatus::Suspended => write!(f, "suspended"),
+            RepoStatus::Deleted => write!(f, "deleted"),
+            RepoStatus::Desynchronized => write!(f, "desynchronized"),
+            RepoStatus::Throttled => write!(f, "throttled"),
+        }
+    }
+}
+
+impl RepoMetadata {
     pub fn backfilling(index_id: u64) -> Self {
         Self {
-            status: RepoStatus::Backfilling,
-            root: None,
-            last_updated_at: chrono::Utc::now().timestamp(),
             index_id,
             tracked: true,
-            handle: None,
-            pds: None,
-            signing_key: None,
-            last_message_time: None,
         }
     }
+}
 
-    /// backfilling, but not tracked yet
-    pub fn untracked(index_id: u64) -> Self {
+impl<'i> RepoState<'i> {
+    pub fn backfilling() -> Self {
         Self {
-            tracked: false,
-            ..Self::backfilling(index_id)
+            active: true,
+            status: RepoStatus::Desynchronized,
+            root: None,
+            last_updated_at: chrono::Utc::now().timestamp(),
+            handle: None,
+            pds: None,
+            signing_key: None,
+            last_message_time: None,
         }
     }
 
···
     fn into_static(self) -> Self::Output {
         RepoState {
+            active: self.active,
             status: self.status,
             root: self.root,
             last_updated_at: self.last_updated_at,
-            index_id: self.index_id,
-            tracked: self.tracked,
             handle: self.handle.map(IntoStatic::into_static),
             pds: self.pds.map(IntoStatic::into_static),
             signing_key: self.signing_key.map(IntoStatic::into_static),
···
     pub account: Option<AccountEvt<'i>>,
 }
 
-#[cfg(feature = "events")]
+#[cfg(feature = "indexer")]
 #[derive(Clone, Debug)]
 pub(crate) enum BroadcastEvent {
     #[allow(dead_code)]
···
     #[serde(skip_serializing_if = "Option::is_none")]
     pub status: Option<CowStr<'i>>,
 }
-
-use bytes::Bytes;
 
 #[derive(Serialize, Deserialize, Clone)]
 pub(crate) enum StoredData {
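the v2 -> v4 split separates wire-relevant sync state (`RepoState`) from indexer-only bookkeeping (`RepoMetadata`), which is what lets the relay build keep `RepoState` tombstones without carrying backfill fields. a minimal sketch of the resulting two-record layout, with stand-in structs and an assumed keying scheme (the real keys are `repo_key(did)` and `repo_metadata_key(did)`):

```rust
// stand-in structs, not the crate's own; field names follow this diff.
// both records are msgpack-encoded, matching the rmp_serde usage in ops.rs
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct RepoState {
    active: bool,
    last_updated_at: i64,
}

#[derive(Serialize, Deserialize)]
struct RepoMetadata {
    tracked: bool,
    index_id: u64,
}

fn main() -> Result<(), rmp_serde::encode::Error> {
    let state = rmp_serde::to_vec(&RepoState { active: true, last_updated_at: 0 })?;
    let meta = rmp_serde::to_vec(&RepoMetadata { tracked: true, index_id: 42 })?;
    // written under repo_key(did) and repo_metadata_key(did) respectively;
    // deleting the metadata record leaves the state record as a tombstone
    println!("state: {} bytes, metadata: {} bytes", state.len(), meta.len());
    Ok(())
}
```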
+1 -1
tests/authenticated_stream.nu
···
         $e | select id type | insert value $value
     })
     print $"captured ($events | length) events"
-    $display_events | table -e | print
+    $display_events | to text | print
 
     # filter live events for the relevant entities
     let relevant_events = ($events | where { |it|
+1 -1
tests/backlinks.nu
···
 # verify that reverse=true actually inverts the order using a subject with 2+ backlinks.
 # returns an error string on failure, or empty string on success.
 def check-reverse-ordering [url: string, subject: string, expected_count: int] {
-    print $"checking reverse ordering — subject has ($expected_count) backlinks..."
+    print $"checking reverse ordering... subject has ($expected_count) backlinks..."
     print $"  subject: ($subject)"
 
     let fwd = (http get $"($url)/xrpc/blue.microcosm.links.getBacklinks?subject=($subject | url encode)&limit=50")
+2 -2
tests/by_collection.nu
···
     let filter = (http get $"($url)/filter")
     print $"filter state: ($filter | to json)"
     if not ($filter.signals | any { |s| $s == $collection }) {
-        print $"FAILED: ($collection) not in signals — filter not configured"
+        print $"FAILED: ($collection) not in signals, filter not configured"
         try { kill -9 $instance.pid }
         rm -rf $db_path
         exit 1
···
         print $"FAILED: ($did) not found in repos API"
         $all_found = false
     } else {
-        print $"ok: ($did) — status: ($repo.status)"
+        print $"ok: ($did), status: ($repo.status)"
     }
 }
 
+6 -3
tests/run_all.nu
···
     }
 }
 
-def main [--only: list<string> = []] {
+def main [--only: list<string> = [], --skip-creds] {
     print "building hydrant..."
-    # build defaults features
+    # build default features
     cargo build
     # build backlinks
     cargo build --features backlinks
     print ""
 
     # discover all test scripts, excluding infrastructure files
-    let excluded = ["common", "mock_relay", "run_all"]
+    mut excluded = ["common", "mock_relay", "run_all"]
+    if $skip_creds {
+        $excluded = ($excluded | append ["authenticated_stream", "repo_sync_integrity"])
+    }
     let discovered = (
         ls tests/*.nu
         | get name
+1 -1
tests/signal_filter.nu
···
     let url = $"http://localhost:($port)"
     let db_path = (mktemp -d -t hydrant_signal_test.XXXXXX)
 
-    let random_str = (random chars -l 6)
+    let random_str = ("a" + (random chars -l 5))
     let collection = $"systems.hydrant.test.($random_str)"
 
     print $"database path: ($db_path)"
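presumably the reason for the `"a" +` prefix: `random chars` can yield a string starting with a digit, and an NSID name segment must begin with an ASCII letter, so a digit-leading collection name would be rejected by NSID validation. a stand-in check of that constraint (illustrative only, not hydrant's actual validator, which comes from `jacquard_common::types::nsid`):

```rust
// stand-in NSID name-segment check; only the leading-character rule matters here
fn valid_name_segment(s: &str) -> bool {
    let mut chars = s.chars();
    matches!(chars.next(), Some(c) if c.is_ascii_alphabetic())
        && chars.all(|c| c.is_ascii_alphanumeric())
}

fn main() {
    assert!(valid_name_segment("a3k9qz")); // shape the test generates now
    assert!(!valid_name_segment("3k9qzx")); // shape `random chars -l 6` could yield
}
```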