very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
use std::collections::HashMap;
use std::sync::Arc;
#[cfg(feature = "relay")]
use std::sync::atomic::Ordering;

use fjall::OwnedWriteBatch;

use jacquard_api::com_atproto::sync::get_repo_status::{
    GetRepoStatus, GetRepoStatusError, GetRepoStatusOutputStatus,
};
use jacquard_common::types::crypto::PublicKey;
use jacquard_common::types::did::Did;
use jacquard_common::xrpc::{XrpcError, XrpcExt};
use jacquard_common::{CowStr, IntoStatic};
use miette::{IntoDiagnostic, Result};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tracing::{debug, error, info, info_span, trace, warn};
use url::Url;

use crate::db::keys::pds_account_count_key;
use crate::db::{self, CountDeltas, keys};
use crate::ingest::stream::AccountStatus;
#[cfg(feature = "relay")]
use crate::ingest::stream::encode_frame;
use crate::ingest::stream::{Account, Commit, Identity, InfoName, SubscribeReposMessage, Sync};
use crate::ingest::validation::{
    CommitValidationError, SyncValidationError, ValidatedCommit, ValidatedSync, ValidationContext,
    ValidationOptions,
};
use crate::ingest::{BufferRx, IngestMessage};
use crate::state::AppState;
#[cfg(feature = "relay")]
use crate::types::RelayBroadcast;
use crate::types::{RepoState, RepoStatus};
use crate::util;
use smol_str::{SmolStr, ToSmolStr};

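/// per-shard working state: validation context, the staged write batch, pending
/// count deltas, and feature-gated queues for relay broadcasts and indexer hook messages.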
struct WorkerContext<'a> {
    verify_signatures: bool,
    state: &'a AppState,
    vctx: ValidationContext<'a>,
    batch: OwnedWriteBatch,
    count_deltas: CountDeltas,
    #[cfg(feature = "relay")]
    pending_broadcasts: Vec<RelayBroadcast>,
    #[cfg(feature = "indexer")]
    pending_hook_messages: Vec<crate::ingest::indexer::IndexerMessage>,
    #[cfg(feature = "indexer")]
    hook: crate::ingest::indexer::IndexerTx,
    http: reqwest::Client,
    error_counts: HashMap<u64, u32, nohash_hasher::BuildNoHashHasher<u64>>,
}

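/// a single firehose message routed to a shard, tagged with its source url and
/// whether that source is a pds.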
struct WorkerMessage {
    is_pds: bool,
    firehose: Url,
    msg: SubscribeReposMessage<'static>,
}

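/// consumes buffered firehose messages and fans them out to per-did worker shards.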
pub struct RelayWorker {
    state: Arc<AppState>,
    rx: BufferRx,
    #[cfg(feature = "indexer")]
    hook: crate::ingest::indexer::IndexerTx,
    verify_signatures: bool,
    num_shards: usize,
    validation_opts: Arc<ValidationOptions>,
    http: reqwest::Client,
}

impl RelayWorker {
    pub fn new(
        state: Arc<AppState>,
        rx: BufferRx,
        #[cfg(feature = "indexer")] hook: crate::ingest::indexer::IndexerTx,
        verify_signatures: bool,
        num_shards: usize,
        validation_opts: ValidationOptions,
    ) -> Self {
        Self {
            state,
            rx,
            #[cfg(feature = "indexer")]
            hook,
            verify_signatures,
            num_shards,
            validation_opts: Arc::new(validation_opts),
            http: reqwest::Client::new(),
        }
    }

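    /// spawns the shard threads, then routes each buffered firehose message to a
    /// shard by hashing its did, so all events for a repo land on the same shard.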
    pub fn run(mut self, handle: Handle) -> Result<()> {
        let mut shards = Vec::with_capacity(self.num_shards);

        for i in 0..self.num_shards {
            let (tx, rx) = mpsc::unbounded_channel();
            shards.push(tx);

            let state = self.state.clone();
            #[cfg(feature = "indexer")]
            let hook = self.hook.clone();
            let verify = self.verify_signatures;
            let h = handle.clone();
            let opts = self.validation_opts.clone();
            let http = self.http.clone();

            std::thread::Builder::new()
                .name(format!("relay-shard-{i}"))
                .spawn(move || {
                    Self::shard(
                        i,
                        rx,
                        state,
                        #[cfg(feature = "indexer")]
                        hook,
                        verify,
                        h,
                        opts,
                        http,
                    );
                })
                .into_diagnostic()?;
        }

        info!(num = self.num_shards, "relay worker: started shards");

        let _g = handle.enter();

        while let Some(msg) = self.rx.blocking_recv() {
            let IngestMessage::Firehose { url, is_pds, msg } = msg;

            // #info only pertains to us, the direct consumer
            if let SubscribeReposMessage::Info(inf) = msg {
                match inf.name {
                    InfoName::OutdatedCursor => {
                        // todo: handle
                    }
                    InfoName::Other(name) => {
                        let message = inf
                            .message
                            .unwrap_or_else(|| CowStr::Borrowed("<no message>"));
                        info!(name = %name, "relay sent info: {message}");
                    }
                }
                continue;
            }

            let shard_idx = {
                let did = match &msg {
                    SubscribeReposMessage::Commit(c) => &c.repo,
                    SubscribeReposMessage::Identity(i) => &i.did,
                    SubscribeReposMessage::Account(a) => &a.did,
                    SubscribeReposMessage::Sync(s) => &s.did,
                    _ => continue,
                };
                (util::hash(did) as usize) % self.num_shards
            };

            if let Err(e) = shards[shard_idx].send(WorkerMessage {
                firehose: url,
                is_pds,
                msg,
            }) {
                error!(shard = shard_idx, err = %e, "relay worker: failed to send to shard");
                break;
            }
        }

        Err(miette::miette!("relay worker dispatcher shutting down"))
    }

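    /// per-shard loop: processes one message at a time, commits the staged write
    /// batch plus count deltas, then flushes queued broadcasts / hook messages
    /// and advances the firehose cursor (relay mode only).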
    fn shard(
        id: usize,
        mut rx: mpsc::UnboundedReceiver<WorkerMessage>,
        state: Arc<AppState>,
        #[cfg(feature = "indexer")] hook: crate::ingest::indexer::IndexerTx,
        verify_signatures: bool,
        handle: Handle,
        validation_opts: Arc<ValidationOptions>,
        http: reqwest::Client,
    ) {
        let _guard = handle.enter();
        let span = info_span!("worker_shard", shard = id);
        let _entered = span.clone().entered();
        debug!("relay shard started");

        let mut ctx = WorkerContext {
            verify_signatures,
            state: &state,
            vctx: ValidationContext {
                opts: &validation_opts,
            },
            batch: state.db.inner.batch(),
            count_deltas: CountDeltas::default(),
            #[cfg(feature = "relay")]
            pending_broadcasts: Vec::with_capacity(2),
            #[cfg(feature = "indexer")]
            pending_hook_messages: Vec::with_capacity(2),
            #[cfg(feature = "indexer")]
            hook,
            http,
            error_counts: Default::default(),
        };

        while let Some(msg) = rx.blocking_recv() {
            ctx.count_deltas = CountDeltas::default();
            let (did, seq) = match &msg.msg {
                SubscribeReposMessage::Commit(c) => (c.repo.clone(), c.seq),
                SubscribeReposMessage::Identity(i) => (i.did.clone(), i.seq),
                SubscribeReposMessage::Account(a) => (a.did.clone(), a.seq),
                SubscribeReposMessage::Sync(s) => (s.did.clone(), s.seq),
                _ => continue,
            };

            let firehose = msg.firehose.clone();
            let _span = info_span!("relay", did = %did, firehose = %firehose, seq = %seq).entered();

            if let Err(e) = Self::process_message(&mut ctx, msg) {
                error!(did = %did, err = %e, "relay shard: error processing message");
            }

            let mut batch = std::mem::replace(&mut ctx.batch, ctx.state.db.inner.batch());
            let reservation = ctx
                .state
                .db
                .stage_count_deltas(&mut batch, &ctx.count_deltas);
            let res = batch.commit();
            if let Err(e) = res {
                error!(shard = id, err = %e, "relay shard: failed to commit batch");
                drop(reservation);
                continue;
            }
            ctx.state.db.apply_count_deltas(&ctx.count_deltas);
            drop(reservation);

            #[cfg(feature = "relay")]
            for broadcast in ctx.pending_broadcasts.drain(..) {
                let _ = state.db.relay_broadcast_tx.send(broadcast);
            }
            #[cfg(feature = "indexer")]
            for msg in ctx.pending_hook_messages.drain(..) {
                let _ = ctx.hook.blocking_send(msg);
            }

            // advance cursor for this firehose only if we are the terminal consumer (relay mode)
            // in events mode, FirehoseWorker will advance the cursor after processing
            #[cfg(feature = "relay")]
            {
                ctx.state
                    .firehose_cursors
                    .peek_with(&firehose, |_, c| c.store(seq, Ordering::SeqCst));
            }
        }
    }

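    /// loads (or bootstraps) the repo state, enforces pds host authority for pds
    /// sources, then dispatches to the handler for the message type.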
    fn process_message(ctx: &mut WorkerContext, msg: WorkerMessage) -> Result<()> {
        let Some(mut repo_state) = ctx.load_repo_state(&msg)? else {
            return Ok(());
        };
        let did = msg.msg.did().expect("already checked for did");

        if let Some(host) = msg.firehose.host_str()
            && msg.is_pds
        {
            let outcome = ctx.check_host_authority(did, &mut repo_state, host)?;
            if let AuthorityOutcome::WrongHost { expected } = outcome {
                if !ctx.inc_error(host) {
                    warn!(got = host, expected = %expected, "message rejected: wrong host");
                }
                return Ok(());
            }
            ctx.reset_error(host);
        }

        match msg.msg {
            SubscribeReposMessage::Commit(commit) => {
                trace!("processing commit");
                Self::handle_commit(ctx, &mut repo_state, &msg.firehose, *commit)
            }
            SubscribeReposMessage::Sync(sync) => {
                debug!("processing sync");
                Self::handle_sync(ctx, &mut repo_state, &msg.firehose, *sync)
            }
            SubscribeReposMessage::Identity(identity) => {
                debug!("processing identity");
                Self::handle_identity(ctx, &mut repo_state, &msg.firehose, *identity, msg.is_pds)
            }
            SubscribeReposMessage::Account(account) => {
                debug!("processing account");
                Self::handle_account(ctx, &mut repo_state, &msg.firehose, *account, msg.is_pds)
            }
            _ => Ok(()),
        }
    }

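    /// validates a #commit against the repo's current state and signing key,
    /// queues it for indexer/relay consumers, and persists the new repo root.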
    fn handle_commit(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        #[allow(unused_variables)] firehose: &Url,
        #[allow(unused_mut)] mut commit: Commit<'static>,
    ) -> Result<()> {
        if !repo_state.active {
            return Ok(());
        }

        repo_state.advance_message_time(commit.time.0.timestamp_millis());

        let Some(validated) = ctx.validate_commit(repo_state, &commit)? else {
            return Ok(());
        };
        let ValidatedCommit {
            chain_break,
            commit_obj,
            parsed_blocks,
            ..
        } = validated;

        #[cfg(not(feature = "indexer"))]
        let _ = parsed_blocks;

        if chain_break.is_broken() {
            // chain breaks are not grounds for blocking when acting as a relay
            debug!(broken = ?chain_break, "chain break, forwarding anyway");
        }

        let repo_key = keys::repo_key(&commit.repo);

        #[cfg(feature = "indexer")]
        {
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: commit.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Commit(
                            crate::ingest::indexer::IndexerCommitData {
                                commit,
                                chain_break: chain_break.is_broken(),
                                parsed_blocks,
                            },
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                commit.seq = seq;
                encode_frame("#commit", &commit)
            })?;
        }

        repo_state.root = Some(commit_obj.into());
        repo_state.touch();
        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }

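    /// validates a #sync snapshot, queues it for consumers, and persists the new repo root.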
    fn handle_sync(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        #[allow(unused_variables)] firehose: &Url,
        #[allow(unused_mut)] mut sync: Sync<'static>,
    ) -> Result<()> {
        if !repo_state.active {
            return Ok(());
        }

        repo_state.advance_message_time(sync.time.0.timestamp_millis());

        let Some(validated) = ctx.validate_sync(repo_state, &sync)? else {
            return Ok(());
        };

        let repo_key = keys::repo_key(&sync.did);

        #[cfg(feature = "indexer")]
        {
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: sync.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Sync(
                            sync.did.into_static(),
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                sync.seq = seq;
                encode_frame("#sync", &sync)
            })?;
        }

        repo_state.root = Some(validated.commit_obj.into());
        repo_state.touch();
        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }

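    /// handles #identity: refreshes the did doc when needed, drops handles that
    /// don't match our resolved state for pds events, and persists the update.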
    fn handle_identity(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        #[allow(unused_variables)] firehose: &Url,
        mut identity: Identity<'static>,
        is_pds: bool,
    ) -> Result<()> {
        let event_ms = identity.time.0.timestamp_millis();
        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
            debug!("skipping stale/duplicate identity event");
            return Ok(());
        }
        repo_state.advance_message_time(event_ms);

        #[cfg(feature = "indexer")]
        let (was_handle, was_signing_key) = (
            repo_state.handle.clone().map(IntoStatic::into_static),
            repo_state.signing_key.clone().map(IntoStatic::into_static),
        );

        // refresh did doc if a pds sent this event
        // or if there is no handle specified
        if is_pds || identity.handle.is_none() {
            ctx.state.resolver.invalidate_sync(&identity.did);
            let doc = Handle::current().block_on(ctx.state.resolver.resolve_doc(&identity.did));
            match doc {
                Ok(doc) => {
                    repo_state.update_from_doc(doc);
                }
                Err(err) => {
                    warn!(err = %err, "couldnt fetch identity");
                }
            }
        }

        // don't pass the handle through if it doesn't match ours for pds events
        if is_pds && repo_state.handle != identity.handle {
            identity.handle = None;
        }

        let repo_key = keys::repo_key(&identity.did);

        #[cfg(feature = "indexer")]
        {
            let changed =
                repo_state.handle != was_handle || repo_state.signing_key != was_signing_key;
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: identity.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Identity(
                            crate::ingest::indexer::IndexerIdentityData { identity, changed },
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                identity.seq = seq;
                encode_frame("#identity", &identity)
            })?;
        }

        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }

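    /// handles #account: updates active/status state, maintains the per-pds active
    /// account count on transitions, and persists the update.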
    fn handle_account(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        firehose: &Url,
        #[allow(unused_mut)] mut account: Account<'static>,
        is_pds: bool,
    ) -> Result<()> {
        let event_ms = account.time.0.timestamp_millis();
        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
            debug!("skipping stale/duplicate account event");
            return Ok(());
        }

        repo_state.advance_message_time(event_ms);

        // always capture was_active for count tracking, not just in indexer mode
        let was_active = repo_state.active;
        #[cfg(feature = "indexer")]
        let was_status = repo_state.status.clone();

        repo_state.active = account.active;
        if !account.active {
            use crate::ingest::stream::AccountStatus;
            match &account.status {
                Some(AccountStatus::Deleted) => {
                    // keep a Deleted tombstone so any stale commits that arrive later
                    // (e.g. from the upstream backfill window) are not forwarded.
                    // per spec: "if any further #commit messages are emitted for the repo,
                    // all downstream services should ignore the event and not pass it through."
                    repo_state.status = RepoStatus::Deleted;
                }
                status => {
                    repo_state.status = ctx.inactive_account_repo_status(&account.did, status);
                }
            }
        } else {
            // active=true: desynchronized/throttled may still carry active=true per spec.
            // anything else (including unknown statuses) is treated as synced.
            use crate::ingest::stream::AccountStatus;
            repo_state.status = match &account.status {
                Some(AccountStatus::Desynchronized) => RepoStatus::Desynchronized,
                Some(AccountStatus::Throttled) => RepoStatus::Throttled,
                _ => RepoStatus::Synced,
            };
        }

        // update per-PDS active account count on transitions
        if is_pds {
            if let Some(host) = firehose.host_str() {
                let count_key = pds_account_count_key(host);
                let delta = if !was_active && repo_state.active {
                    1
                } else if was_active && !repo_state.active {
                    -1
                } else {
                    0
                };

                if delta != 0 {
                    ctx.count_deltas.add(&count_key, delta);
                    let count = ctx.count_deltas.projected_count(&ctx.state.db, &count_key);
                    ctx.state
                        .apply_host_limit_status(&mut ctx.batch, host, count);
                }
            }
        }

        let repo_key = keys::repo_key(&account.did);

        #[cfg(feature = "indexer")]
        {
            let changed = repo_state.active != was_active || repo_state.status != was_status;
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: account.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Account(
                            crate::ingest::indexer::IndexerAccountData {
                                account,
                                was_active,
                                changed,
                            },
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                account.seq = seq;
                encode_frame("#account", &account)
            })?;
        }

        repo_state.touch();
        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }
}

impl WorkerContext<'_> {
    /// increments the host error counter; returns whether the host is currently suppressed
    fn inc_error(&mut self, host: &str) -> bool {
        let error_count = self.error_counts.entry(util::hash(&host)).or_default();
        let is_suppressed = *error_count > 50;
        *error_count += 1;
        is_suppressed
    }

    fn reset_error(&mut self, host: &str) {
        if let Some(count) = self.error_counts.get_mut(&util::hash(&host)) {
            *count = 0;
        }
    }

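    /// checks that the source host is the repo's authoritative pds, refreshing the
    /// did doc once before deciding it's the wrong host.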
    fn check_host_authority(
        &mut self,
        did: &Did,
        repo_state: &mut RepoState,
        source_host: &str,
    ) -> Result<AuthorityOutcome> {
        let pds_host = |pds: &str| {
            Url::parse(pds)
                .ok()
                .and_then(|u| u.host_str().map(SmolStr::new))
        };

        let expected = repo_state.pds.as_deref().and_then(pds_host);
        if expected.as_deref() == Some(source_host) {
            return Ok(AuthorityOutcome::Authorized);
        }

        // try again once
        self.refresh_doc(did, repo_state)?;
        let Some(expected) = repo_state.pds.as_deref().and_then(pds_host) else {
            miette::bail!("can't get pds host???");
        };

        Ok((expected.as_str() == source_host)
            .then_some(AuthorityOutcome::WasStale)
            .unwrap_or(AuthorityOutcome::WrongHost { expected }))
    }

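    /// re-resolves the did doc, applies it to the repo state, and stages the updated state.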
    fn refresh_doc(&mut self, did: &Did, repo_state: &mut RepoState) -> Result<()> {
        let db = &self.state.db;
        self.state.resolver.invalidate_sync(did);
        let doc = Handle::current()
            .block_on(self.state.resolver.resolve_doc(did))
            .map_err(|e| miette::miette!("{e}"))?;
        repo_state.update_from_doc(doc);
        repo_state.touch();

        self.batch.insert(
            &db.repos,
            keys::repo_key(did),
            db::ser_repo_state(repo_state)?,
        );
        Ok(())
    }

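    /// validates a commit, retrying once with a refreshed did doc if the signature check fails.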
    fn validate_commit<'c>(
        &mut self,
        repo_state: &mut RepoState,
        commit: &'c Commit<'c>,
    ) -> Result<Option<ValidatedCommit<'c>>> {
        let did = &commit.repo;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
            Ok(v) => return Ok(Some(v)),
            Err(CommitValidationError::StaleRev) => {
                trace!("skipping replayed commit");
                return Ok(None);
            }
            Err(CommitValidationError::SigFailure) => {}
            Err(e) => {
                debug!(err = %e, "commit rejected");
                return Ok(None);
            }
        }

        self.refresh_doc(did, repo_state)?;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
            Ok(v) => Ok(Some(v)),
            Err(e) => {
                debug!(err = %e, "commit rejected after key refresh");
                Ok(None)
            }
        }
    }

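    /// validates a sync message, retrying once with a refreshed did doc if the signature check fails.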
    fn validate_sync(
        &mut self,
        repo_state: &mut RepoState,
        sync: &Sync<'_>,
    ) -> Result<Option<ValidatedSync>> {
        let did = &sync.did;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_sync(sync, key.as_ref()) {
            Ok(v) => return Ok(Some(v)),
            Err(SyncValidationError::SigFailure) => {}
            Err(e) => {
                debug!(err = %e, "sync rejected");
                return Ok(None);
            }
        }

        self.refresh_doc(did, repo_state)?;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_sync(sync, key.as_ref()) {
            Ok(v) => Ok(Some(v)),
            Err(e) => {
                debug!(err = %e, "sync rejected after key refresh");
                Ok(None)
            }
        }
    }

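    /// resolves the repo's signing key, or returns `None` when signature verification is disabled.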
    fn fetch_key(&self, did: &Did) -> Result<Option<PublicKey<'static>>> {
        if self.verify_signatures {
            let key = Handle::current()
                .block_on(self.state.resolver.resolve_signing_key(did))
                .map_err(|e| miette::miette!("{e}"))?;
            Ok(Some(key))
        } else {
            Ok(None)
        }
    }

    /// maps an inactive account status to the corresponding `RepoStatus`.
    /// panics on `AccountStatus::Deleted`, caller must handle that
    fn inactive_account_repo_status(
        &self,
        did: &Did,
        status: &Option<AccountStatus<'_>>,
    ) -> RepoStatus {
        match status {
            Some(AccountStatus::Takendown) => RepoStatus::Takendown,
            Some(AccountStatus::Suspended) => RepoStatus::Suspended,
            Some(AccountStatus::Deactivated) => RepoStatus::Deactivated,
            Some(AccountStatus::Throttled) => RepoStatus::Throttled,
            Some(AccountStatus::Desynchronized) => RepoStatus::Desynchronized,
            Some(AccountStatus::Other(s)) => {
                warn!(did = %did, status = %s, "unknown account status");
                RepoStatus::Error(s.to_smolstr())
            }
            Some(AccountStatus::Deleted) => {
                unreachable!("deleted is handled before status mapping")
            }
            None => {
                warn!(did = %did, "account inactive but no status provided");
                RepoStatus::Error("unknown".into())
            }
        }
    }

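    /// queries the pds via com.atproto.sync.getRepoStatus to seed repo state for a
    /// newly discovered account; returns `None` if the request fails.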
    async fn check_repo_status(
        &self,
        did: &Did<'_>,
        pds: &Url,
    ) -> Result<Option<RepoState<'static>>> {
        let req = GetRepoStatus::new().did(did.clone().into_static()).build();
        let resp = self
            .http
            .xrpc(crate::util::url_to_fluent_uri(pds))
            .send(&req)
            .await;

        let output = match resp {
            Err(_) => return Ok(None),
            Ok(r) => match r.into_output() {
                Ok(o) => o,
                Err(XrpcError::Xrpc(GetRepoStatusError::RepoNotFound(_))) => {
                    // pds explicitly says it doesn't have this repo
                    // we shouldn't really get here unless the pds is buggy,
                    // or the repo somehow gets deleted right after we receive the event
                    let mut repo_state = RepoState::backfilling();
                    repo_state.active = false;
                    repo_state.status = RepoStatus::Error("not_found".into());
                    return Ok(Some(repo_state));
                }
                Err(_) => return Ok(None),
            },
        };

        let mut repo_state = RepoState::backfilling();
        repo_state.active = output.active;
        repo_state.status = match output.status {
            Some(GetRepoStatusOutputStatus::Takendown) => RepoStatus::Takendown,
            Some(GetRepoStatusOutputStatus::Suspended) => RepoStatus::Suspended,
            Some(GetRepoStatusOutputStatus::Deactivated) => RepoStatus::Deactivated,
            Some(GetRepoStatusOutputStatus::Deleted) => RepoStatus::Deleted,
            Some(GetRepoStatusOutputStatus::Desynchronized) => RepoStatus::Desynchronized,
            Some(GetRepoStatusOutputStatus::Throttled) => RepoStatus::Throttled,
            Some(GetRepoStatusOutputStatus::Other(s)) => RepoStatus::Error(s.into()),
            None => output
                .active
                .then_some(RepoStatus::Synced)
                .unwrap_or_else(|| RepoStatus::Error("unknown".into())),
        };

        Ok(Some(repo_state))
    }

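    /// loads repo state from the db, or bootstraps it for a newly discovered repo:
    /// resolves the did doc, checks host authority and account limits, applies the
    /// signal filter (indexer mode), and asks the pds for its current status.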
    fn load_repo_state(&mut self, msg: &WorkerMessage) -> Result<Option<RepoState<'static>>> {
        let db = &self.state.db;
        let did = msg.msg.did().expect("we checked if valid");
        let repo_key = keys::repo_key(did);
        let metadata_key = keys::repo_metadata_key(did);

        let metadata = db
            .repo_metadata
            .get(&metadata_key)
            .into_diagnostic()?
            .map(|bytes| db::deser_repo_meta(&bytes))
            .transpose()?;

        if metadata.map_or(false, |m| !m.tracked) {
            trace!(did = %did, "ignoring message, repo is explicitly untracked");
            return Ok(None);
        }

        let repo_state_opt = db
            .repos
            .get(&repo_key)
            .into_diagnostic()?
            .map(|bytes| db::deser_repo_state(bytes.as_ref()).map(|s| s.into_static()))
            .transpose()?;

        if let Some(repo_state) = repo_state_opt {
            return Ok(Some(repo_state));
        }

        #[cfg(feature = "indexer")]
        {
            let filter = self.state.filter.load();
            if filter.mode == crate::filter::FilterMode::Filter && !filter.signals.is_empty() {
                let commit = match &msg.msg {
                    SubscribeReposMessage::Commit(c) => c,
                    _ => return Ok(None),
                };
                let touches_signal = commit.ops.iter().any(|op| {
                    op.path
                        .split_once('/')
                        .map(|(col, _)| {
                            let m = filter.matches_signal(col);
                            debug!(
                                did = %did, path = %op.path, col = %col,
                                signals = ?filter.signals, matched = m,
                                "signal check"
                            );
                            m
                        })
                        .unwrap_or(false)
                });
                if !touches_signal {
                    trace!(did = %did, "dropping commit, no signal-matching ops");
                    return Ok(None);
                }
            }
        }

        debug!(did = %did, "discovered new account from firehose, queueing backfill");

        // resolve doc to initialize repo state
        self.state.resolver.invalidate_sync(did);
        let doc = tokio::runtime::Handle::current()
            .block_on(self.state.resolver.resolve_doc(did))
            .into_diagnostic()?;

        // if it's a PDS, verify it's the authoritative one
        if msg.is_pds {
            let pds_host = doc.pds.host_str().map(|h| h.to_string());
            if pds_host.as_deref() != msg.firehose.host_str() {
                warn!(did = %did, got = ?pds_host, expected = ?msg.firehose.host_str(), "message rejected: wrong host for new account");
                return Ok(None);
            }

            if let Some(host) = msg.firehose.host_str() {
                let count = self.state.db.get_count_sync(&pds_account_count_key(host));
                if self.state.is_over_account_limit(host, count) {
                    warn!(did = %did, host, count, "account limit reached for host, dropping new account");
                    return Ok(None);
                }
            }
        }

        // try to get upstream status
        let mut repo_state = tokio::runtime::Handle::current()
            .block_on(self.check_repo_status(did, &doc.pds))
            .ok()
            .flatten()
            .unwrap_or_else(RepoState::backfilling);

        repo_state.update_from_doc(doc);

        self.batch.insert(
            &db.repos,
            &repo_key,
            crate::db::ser_repo_state(&repo_state)?,
        );

        #[cfg(feature = "indexer")]
        {
            self.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::NewRepo(
                    did.clone().into_static(),
                ));
        }

        self.count_deltas.add("repos", 1);

        // track initial active state for per-PDS rate limiting
        if msg.is_pds && repo_state.active {
            if let Some(host) = msg.firehose.host_str() {
                self.count_deltas.add(&pds_account_count_key(host), 1);
            }
        }

        Ok(Some(repo_state))
    }

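    /// assigns the next relay sequence number, stages the encoded frame for
    /// persistence, and queues both ephemeral and persisted broadcasts.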
    #[cfg(feature = "relay")]
    fn queue_emit(&mut self, make_frame: impl FnOnce(i64) -> Result<bytes::Bytes>) -> Result<()> {
        let db = &self.state.db;
        let seq = db.next_relay_seq.fetch_add(1, Ordering::SeqCst);
        let frame = make_frame(seq as i64)?;
        self.batch
            .insert(&db.relay_events, keys::relay_event_key(seq), frame.as_ref());
        self.pending_broadcasts
            .push(RelayBroadcast::Ephemeral(seq, frame));
        self.pending_broadcasts.push(RelayBroadcast::Persisted(seq));
        Ok(())
    }
}

/// outcome of a host authority check.
enum AuthorityOutcome {
    /// stored pds matched the source host immediately.
    Authorized,
    /// pds migrated: doc now points to this host, but our stored state was stale.
    WasStale,
    /// host did not match even after doc resolution.
    WrongHost { expected: SmolStr },
}