very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer

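//! relay worker: fans firehose events out across a fixed set of shard
//! threads, validates commit and sync events (optionally checking signatures
//! against each repo's signing key), persists repo state to fjall, and,
//! depending on enabled features, re-emits sequenced relay frames and/or
//! forwards events to the indexer hook.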
use std::collections::HashMap;
use std::sync::Arc;
#[cfg(feature = "relay")]
use std::sync::atomic::Ordering;

use fjall::OwnedWriteBatch;

use jacquard_api::com_atproto::sync::get_repo_status::{
    GetRepoStatus, GetRepoStatusError, GetRepoStatusOutputStatus,
};
use jacquard_common::types::crypto::PublicKey;
use jacquard_common::types::did::Did;
use jacquard_common::xrpc::{XrpcError, XrpcExt};
use jacquard_common::{CowStr, IntoStatic};
use miette::{IntoDiagnostic, Result};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tracing::{debug, error, info, info_span, trace, warn};
use url::Url;

use crate::db::keys::pds_account_count_key;
use crate::db::{self, CountDeltas, keys};
use crate::ingest::stream::AccountStatus;
#[cfg(feature = "relay")]
use crate::ingest::stream::encode_frame;
use crate::ingest::stream::{Account, Commit, Identity, InfoName, SubscribeReposMessage, Sync};
use crate::ingest::validation::{
    CommitValidationError, SyncValidationError, ValidatedCommit, ValidatedSync, ValidationContext,
    ValidationOptions,
};
use crate::ingest::{BufferRx, IngestMessage};
use crate::state::AppState;
#[cfg(feature = "relay")]
use crate::types::RelayBroadcast;
use crate::types::{RepoState, RepoStatus};
use crate::util;
use smol_str::{SmolStr, ToSmolStr};

struct WorkerContext<'a> {
    verify_signatures: bool,
    state: &'a AppState,
    vctx: ValidationContext<'a>,
    batch: OwnedWriteBatch,
    count_deltas: CountDeltas,
    #[cfg(feature = "relay")]
    pending_broadcasts: Vec<RelayBroadcast>,
    #[cfg(feature = "indexer")]
    pending_hook_messages: Vec<crate::ingest::indexer::IndexerMessage>,
    #[cfg(feature = "indexer")]
    hook: crate::ingest::indexer::IndexerTx,
    http: reqwest::Client,
    error_counts: HashMap<u64, u32, nohash_hasher::BuildNoHashHasher<u64>>,
}

struct WorkerMessage {
    is_pds: bool,
    firehose: Url,
    msg: SubscribeReposMessage<'static>,
}

pub struct RelayWorker {
    state: Arc<AppState>,
    rx: BufferRx,
    #[cfg(feature = "indexer")]
    hook: crate::ingest::indexer::IndexerTx,
    verify_signatures: bool,
    num_shards: usize,
    validation_opts: Arc<ValidationOptions>,
    http: reqwest::Client,
}

impl RelayWorker {
    pub fn new(
        state: Arc<AppState>,
        rx: BufferRx,
        #[cfg(feature = "indexer")] hook: crate::ingest::indexer::IndexerTx,
        verify_signatures: bool,
        num_shards: usize,
        validation_opts: ValidationOptions,
    ) -> Self {
        Self {
            state,
            rx,
            #[cfg(feature = "indexer")]
            hook,
            verify_signatures,
            num_shards,
            validation_opts: Arc::new(validation_opts),
            http: reqwest::Client::new(),
        }
    }

    pub fn run(mut self, handle: Handle) -> Result<()> {
        let mut shards = Vec::with_capacity(self.num_shards);

        for i in 0..self.num_shards {
            let (tx, rx) = mpsc::unbounded_channel();
            shards.push(tx);

            let state = self.state.clone();
            #[cfg(feature = "indexer")]
            let hook = self.hook.clone();
            let verify = self.verify_signatures;
            let h = handle.clone();
            let opts = self.validation_opts.clone();
            let http = self.http.clone();

            std::thread::Builder::new()
                .name(format!("relay-shard-{i}"))
                .spawn(move || {
                    Self::shard(
                        i,
                        rx,
                        state,
                        #[cfg(feature = "indexer")]
                        hook,
                        verify,
                        h,
                        opts,
                        http,
                    );
                })
                .into_diagnostic()?;
        }

        info!(num = self.num_shards, "relay worker: started shards");

        let _g = handle.enter();

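        // dispatch loop: route every message for a given DID to the same shard
        // (hash(did) % num_shards) so per-repo ordering is preserved across the
        // independent shard threads.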
        while let Some(msg) = self.rx.blocking_recv() {
            let IngestMessage::Firehose { url, is_pds, msg } = msg;

            // #info only pertains to us, the direct consumer
            // (borrow here so `msg` can still be dispatched below)
            if let SubscribeReposMessage::Info(inf) = &msg {
                match &inf.name {
                    InfoName::OutdatedCursor => {
                        // todo: handle
                    }
                    InfoName::Other(name) => {
                        let message = inf
                            .message
                            .clone()
                            .unwrap_or_else(|| CowStr::Borrowed("<no message>"));
                        info!(name = %name, "relay sent info: {message}");
                    }
                }
                continue;
            }

            let shard_idx = {
                let did = match &msg {
                    SubscribeReposMessage::Commit(c) => &c.repo,
                    SubscribeReposMessage::Identity(i) => &i.did,
                    SubscribeReposMessage::Account(a) => &a.did,
                    SubscribeReposMessage::Sync(s) => &s.did,
                    _ => continue,
                };
                (util::hash(did) as usize) % self.num_shards
            };

            if let Err(e) = shards[shard_idx].send(WorkerMessage {
                firehose: url,
                is_pds,
                msg,
            }) {
                error!(shard = shard_idx, err = %e, "relay worker: failed to send to shard");
                break;
            }
        }

        Err(miette::miette!("relay worker dispatcher shutting down"))
    }

    fn shard(
        id: usize,
        mut rx: mpsc::UnboundedReceiver<WorkerMessage>,
        state: Arc<AppState>,
        #[cfg(feature = "indexer")] hook: crate::ingest::indexer::IndexerTx,
        verify_signatures: bool,
        handle: Handle,
        validation_opts: Arc<ValidationOptions>,
        http: reqwest::Client,
    ) {
        let _guard = handle.enter();
        let span = info_span!("worker_shard", shard = id);
        let _entered = span.clone().entered();
        debug!("relay shard started");

        let mut ctx = WorkerContext {
            verify_signatures,
            state: &state,
            vctx: ValidationContext {
                opts: &validation_opts,
            },
            batch: state.db.inner.batch(),
            count_deltas: CountDeltas::default(),
            #[cfg(feature = "relay")]
            pending_broadcasts: Vec::with_capacity(2),
            #[cfg(feature = "indexer")]
            pending_hook_messages: Vec::with_capacity(2),
            #[cfg(feature = "indexer")]
            hook,
            http,
            error_counts: Default::default(),
        };

        while let Some(msg) = rx.blocking_recv() {
            ctx.count_deltas = CountDeltas::default();
            let (did, seq) = match &msg.msg {
                SubscribeReposMessage::Commit(c) => (c.repo.clone(), c.seq),
                SubscribeReposMessage::Identity(i) => (i.did.clone(), i.seq),
                SubscribeReposMessage::Account(a) => (a.did.clone(), a.seq),
                SubscribeReposMessage::Sync(s) => (s.did.clone(), s.seq),
                _ => continue,
            };

            let firehose = msg.firehose.clone();
            let _span = info_span!("relay", did = %did, firehose = %firehose, seq = %seq).entered();

            if let Err(e) = Self::process_message(&mut ctx, msg) {
                error!(did = %did, err = %e, "relay shard: error processing message");
            }

            let mut batch = std::mem::replace(&mut ctx.batch, ctx.state.db.inner.batch());
            let reservation = ctx
                .state
                .db
                .stage_count_deltas(&mut batch, &ctx.count_deltas);
            if let Err(e) = batch.commit() {
                error!(shard = id, err = %e, "relay shard: failed to commit batch");
                drop(reservation);
                continue;
            }
            ctx.state.db.apply_count_deltas(&ctx.count_deltas);
            drop(reservation);

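            // side effects queued during processing are flushed only after a
            // successful batch commit, so downstream consumers never observe
            // an event whose repo state and counts weren't durably persisted.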
            #[cfg(feature = "relay")]
            for broadcast in ctx.pending_broadcasts.drain(..) {
                let _ = state.db.relay_broadcast_tx.send(broadcast);
            }
            #[cfg(feature = "indexer")]
            for msg in ctx.pending_hook_messages.drain(..) {
                let _ = ctx.hook.blocking_send(msg);
            }

            // advance cursor for this firehose only if we are the terminal consumer (relay mode);
            // in events mode, FirehoseWorker will advance the cursor after processing
            #[cfg(feature = "relay")]
            {
                ctx.state
                    .firehose_cursors
                    .peek_with(&firehose, |_, c| c.store(seq, Ordering::SeqCst));
            }
        }
    }

    fn process_message(ctx: &mut WorkerContext, msg: WorkerMessage) -> Result<()> {
        let Some(mut repo_state) = ctx.load_repo_state(&msg)? else {
            return Ok(());
        };
        let did = msg.msg.did().expect("already checked for did");

        if let Some(host) = msg.firehose.host_str()
            && msg.is_pds
        {
            let outcome = ctx.check_host_authority(did, &mut repo_state, host)?;
            if let AuthorityOutcome::WrongHost { expected } = outcome {
                if !ctx.inc_error(host) {
                    warn!(got = host, expected = %expected, "message rejected: wrong host");
                }
                return Ok(());
            }
            ctx.reset_error(host);
        }

        match msg.msg {
            SubscribeReposMessage::Commit(commit) => {
                trace!("processing commit");
                Self::handle_commit(ctx, &mut repo_state, &msg.firehose, *commit)
            }
            SubscribeReposMessage::Sync(sync) => {
                debug!("processing sync");
                Self::handle_sync(ctx, &mut repo_state, &msg.firehose, *sync)
            }
            SubscribeReposMessage::Identity(identity) => {
                debug!("processing identity");
                Self::handle_identity(ctx, &mut repo_state, &msg.firehose, *identity, msg.is_pds)
            }
            SubscribeReposMessage::Account(account) => {
                debug!("processing account");
                Self::handle_account(ctx, &mut repo_state, &msg.firehose, *account, msg.is_pds)
            }
            _ => Ok(()),
        }
    }

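    // the handlers below map the com.atproto.sync.subscribeRepos event types
    // (#commit, #sync, #identity, #account) onto stored repo state.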
    fn handle_commit(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        #[allow(unused_variables)] firehose: &Url,
        #[allow(unused_mut)] mut commit: Commit<'static>,
    ) -> Result<()> {
        if !repo_state.active {
            return Ok(());
        }

        repo_state.advance_message_time(commit.time.0.timestamp_millis());

        let Some(validated) = ctx.validate_commit(repo_state, &commit)? else {
            return Ok(());
        };
        let ValidatedCommit {
            chain_break,
            commit_obj,
            parsed_blocks,
            ..
        } = validated;

        #[cfg(not(feature = "indexer"))]
        let _ = parsed_blocks;

        if chain_break.is_broken() {
            // chain breaks are not grounds for blocking when acting as a relay
            debug!(broken = ?chain_break, "chain break, forwarding anyway");
        }

        let repo_key = keys::repo_key(&commit.repo);

        #[cfg(feature = "indexer")]
        {
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: commit.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Commit(
                            crate::ingest::indexer::IndexerCommitData {
                                commit,
                                chain_break: chain_break.is_broken(),
                                parsed_blocks,
                            },
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                commit.seq = seq;
                encode_frame("#commit", &commit)
            })?;
        }

        repo_state.root = Some(commit_obj.into());
        repo_state.touch();
        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }

    fn handle_sync(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        #[allow(unused_variables)] firehose: &Url,
        #[allow(unused_mut)] mut sync: Sync<'static>,
    ) -> Result<()> {
        if !repo_state.active {
            return Ok(());
        }

        repo_state.advance_message_time(sync.time.0.timestamp_millis());

        let Some(validated) = ctx.validate_sync(repo_state, &sync)? else {
            return Ok(());
        };

        let repo_key = keys::repo_key(&sync.did);

        #[cfg(feature = "indexer")]
        {
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: sync.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Sync(
                            sync.did.into_static(),
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                sync.seq = seq;
                encode_frame("#sync", &sync)
            })?;
        }

        repo_state.root = Some(validated.commit_obj.into());
        repo_state.touch();
        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }

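    /// handles #identity events: re-resolves the DID document when the event
    /// comes from a PDS (or carries no handle), and strips handles that don't
    /// match the resolved document.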
    fn handle_identity(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        #[allow(unused_variables)] firehose: &Url,
        mut identity: Identity<'static>,
        is_pds: bool,
    ) -> Result<()> {
        let event_ms = identity.time.0.timestamp_millis();
        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
            debug!("skipping stale/duplicate identity event");
            return Ok(());
        }
        repo_state.advance_message_time(event_ms);

        #[cfg(feature = "indexer")]
        let (was_handle, was_signing_key) = (
            repo_state.handle.clone().map(IntoStatic::into_static),
            repo_state.signing_key.clone().map(IntoStatic::into_static),
        );

        // refresh the did doc if a pds sent this event,
        // or if there is no handle specified
        if is_pds || identity.handle.is_none() {
            ctx.state.resolver.invalidate_sync(&identity.did);
            let doc = Handle::current().block_on(ctx.state.resolver.resolve_doc(&identity.did));
            match doc {
                Ok(doc) => {
                    repo_state.update_from_doc(doc);
                }
                Err(err) => {
                    warn!(err = %err, "couldn't fetch identity");
                }
            }
        }

        // don't pass the handle through if it doesn't match ours for pds events
        if is_pds && repo_state.handle != identity.handle {
            identity.handle = None;
        }

        let repo_key = keys::repo_key(&identity.did);

        #[cfg(feature = "indexer")]
        {
            let changed =
                repo_state.handle != was_handle || repo_state.signing_key != was_signing_key;
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: identity.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Identity(
                            crate::ingest::indexer::IndexerIdentityData { identity, changed },
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                identity.seq = seq;
                encode_frame("#identity", &identity)
            })?;
        }

        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }

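    /// handles #account events: tracks active/inactive transitions, maps the
    /// upstream account status onto `RepoStatus`, and maintains the per-PDS
    /// active-account count used for host limits.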
    fn handle_account(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        firehose: &Url,
        #[allow(unused_mut)] mut account: Account<'static>,
        is_pds: bool,
    ) -> Result<()> {
        let event_ms = account.time.0.timestamp_millis();
        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
            debug!("skipping stale/duplicate account event");
            return Ok(());
        }

        repo_state.advance_message_time(event_ms);

        // always capture was_active for count tracking, not just in indexer mode
        let was_active = repo_state.active;
        #[cfg(feature = "indexer")]
        let was_status = repo_state.status.clone();

        repo_state.active = account.active;
        if !account.active {
            match &account.status {
                Some(AccountStatus::Deleted) => {
                    // keep a Deleted tombstone so any stale commits that arrive later
                    // (e.g. from the upstream backfill window) are not forwarded.
                    // per spec: "if any further #commit messages are emitted for the repo,
                    // all downstream services should ignore the event and not pass it through."
                    repo_state.status = RepoStatus::Deleted;
                }
                status => {
                    repo_state.status = ctx.inactive_account_repo_status(&account.did, status);
                }
            }
        } else {
            // active=true: desynchronized/throttled may still carry active=true per spec.
            // anything else (including unknown statuses) is treated as synced.
            repo_state.status = match &account.status {
                Some(AccountStatus::Desynchronized) => RepoStatus::Desynchronized,
                Some(AccountStatus::Throttled) => RepoStatus::Throttled,
                _ => RepoStatus::Synced,
            };
        }

        // update per-PDS active account count on transitions
        if is_pds {
            if let Some(host) = firehose.host_str() {
                let count_key = pds_account_count_key(host);
                let delta = if !was_active && repo_state.active {
                    1
                } else if was_active && !repo_state.active {
                    -1
                } else {
                    0
                };

                if delta != 0 {
                    ctx.count_deltas.add(&count_key, delta);
                    let count = ctx.count_deltas.projected_count(&ctx.state.db, &count_key);
                    ctx.state
                        .apply_host_limit_status(&mut ctx.batch, host, count);
                }
            }
        }

        let repo_key = keys::repo_key(&account.did);

        #[cfg(feature = "indexer")]
        {
            let changed = repo_state.active != was_active || repo_state.status != was_status;
            ctx.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::Event(Box::new(
                    crate::ingest::indexer::IndexerEvent {
                        seq: account.seq,
                        firehose: firehose.clone(),
                        data: crate::ingest::indexer::IndexerEventData::Account(
                            crate::ingest::indexer::IndexerAccountData {
                                account,
                                was_active,
                                changed,
                            },
                        ),
                    },
                )));
        }
        #[cfg(feature = "relay")]
        {
            ctx.queue_emit(|seq| {
                account.seq = seq;
                encode_frame("#account", &account)
            })?;
        }

        repo_state.touch();
        ctx.batch.insert(
            &ctx.state.db.repos,
            repo_key,
            db::ser_repo_state(repo_state)?,
        );

        Ok(())
    }
}

impl WorkerContext<'_> {
    /// increments the host error counter; returns whether the host is already suppressed
    fn inc_error(&mut self, host: &str) -> bool {
        let error_count = self.error_counts.entry(util::hash(&host)).or_default();
        let is_suppressed = *error_count > 50;
        *error_count += 1;
        is_suppressed
    }

    fn reset_error(&mut self, host: &str) {
        if let Some(count) = self.error_counts.get_mut(&util::hash(&host)) {
            *count = 0;
        }
    }

    fn check_host_authority(
        &mut self,
        did: &Did,
        repo_state: &mut RepoState,
        source_host: &str,
    ) -> Result<AuthorityOutcome> {
        let pds_host = |pds: &str| {
            Url::parse(pds)
                .ok()
                .and_then(|u| u.host_str().map(SmolStr::new))
        };

        let expected = repo_state.pds.as_deref().and_then(pds_host);
        if expected.as_deref() == Some(source_host) {
            return Ok(AuthorityOutcome::Authorized);
        }

        // try again once
        self.refresh_doc(did, repo_state)?;
        let Some(expected) = repo_state.pds.as_deref().and_then(pds_host) else {
            miette::bail!("can't get pds host???");
        };

        Ok((expected.as_str() == source_host)
            .then_some(AuthorityOutcome::WasStale)
            .unwrap_or(AuthorityOutcome::WrongHost { expected }))
    }

    fn refresh_doc(&mut self, did: &Did, repo_state: &mut RepoState) -> Result<()> {
        let db = &self.state.db;
        self.state.resolver.invalidate_sync(did);
        let doc = Handle::current()
            .block_on(self.state.resolver.resolve_doc(did))
            .map_err(|e| miette::miette!("{e}"))?;
        repo_state.update_from_doc(doc);
        repo_state.touch();

        self.batch.insert(
            &db.repos,
            keys::repo_key(did),
            db::ser_repo_state(repo_state)?,
        );
        Ok(())
    }

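    // validation is two-pass: on a signature failure the DID document is
    // refreshed once (signing keys may have rotated) and validation is
    // retried before the event is dropped.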
    fn validate_commit<'c>(
        &mut self,
        repo_state: &mut RepoState,
        commit: &'c Commit<'c>,
    ) -> Result<Option<ValidatedCommit<'c>>> {
        let did = &commit.repo;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
            Ok(v) => return Ok(Some(v)),
            Err(CommitValidationError::StaleRev) => {
                trace!("skipping replayed commit");
                return Ok(None);
            }
            Err(CommitValidationError::SigFailure) => {}
            Err(e) => {
                debug!(err = %e, "commit rejected");
                return Ok(None);
            }
        }

        self.refresh_doc(did, repo_state)?;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
            Ok(v) => Ok(Some(v)),
            Err(e) => {
                debug!(err = %e, "commit rejected after key refresh");
                Ok(None)
            }
        }
    }

    fn validate_sync(
        &mut self,
        repo_state: &mut RepoState,
        sync: &Sync<'_>,
    ) -> Result<Option<ValidatedSync>> {
        let did = &sync.did;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_sync(sync, key.as_ref()) {
            Ok(v) => return Ok(Some(v)),
            Err(SyncValidationError::SigFailure) => {}
            Err(e) => {
                debug!(err = %e, "sync rejected");
                return Ok(None);
            }
        }

        self.refresh_doc(did, repo_state)?;
        let key = self.fetch_key(did)?;
        match self.vctx.validate_sync(sync, key.as_ref()) {
            Ok(v) => Ok(Some(v)),
            Err(e) => {
                debug!(err = %e, "sync rejected after key refresh");
                Ok(None)
            }
        }
    }

    fn fetch_key(&self, did: &Did) -> Result<Option<PublicKey<'static>>> {
        if self.verify_signatures {
            let key = Handle::current()
                .block_on(self.state.resolver.resolve_signing_key(did))
                .map_err(|e| miette::miette!("{e}"))?;
            Ok(Some(key))
        } else {
            Ok(None)
        }
    }

    /// maps an inactive account status to the corresponding `RepoStatus`.
    /// panics on `AccountStatus::Deleted`; the caller must handle that case
    fn inactive_account_repo_status(
        &self,
        did: &Did,
        status: &Option<AccountStatus<'_>>,
    ) -> RepoStatus {
        match status {
            Some(AccountStatus::Takendown) => RepoStatus::Takendown,
            Some(AccountStatus::Suspended) => RepoStatus::Suspended,
            Some(AccountStatus::Deactivated) => RepoStatus::Deactivated,
            Some(AccountStatus::Throttled) => RepoStatus::Throttled,
            Some(AccountStatus::Desynchronized) => RepoStatus::Desynchronized,
            Some(AccountStatus::Other(s)) => {
                warn!(did = %did, status = %s, "unknown account status");
                RepoStatus::Error(s.to_smolstr())
            }
            Some(AccountStatus::Deleted) => {
                unreachable!("deleted is handled before status mapping")
            }
            None => {
                warn!(did = %did, "account inactive but no status provided");
                RepoStatus::Error("unknown".into())
            }
        }
    }

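    /// asks the repo's PDS for its status via `com.atproto.sync.getRepoStatus`;
    /// used to seed repo state for accounts discovered on the firehose.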
    async fn check_repo_status(
        &self,
        did: &Did<'_>,
        pds: &Url,
    ) -> Result<Option<RepoState<'static>>> {
        let req = GetRepoStatus::new().did(did.clone().into_static()).build();
        let resp = self
            .http
            .xrpc(crate::util::url_to_fluent_uri(pds))
            .send(&req)
            .await;

        let output = match resp {
            Err(_) => return Ok(None),
            Ok(r) => match r.into_output() {
                Ok(o) => o,
                Err(XrpcError::Xrpc(GetRepoStatusError::RepoNotFound(_))) => {
                    // the pds explicitly says it doesn't have this repo.
                    // we shouldn't really get here unless the pds is buggy,
                    // or the repo somehow went away right after we received the event
                    let mut repo_state = RepoState::backfilling();
                    repo_state.active = false;
                    repo_state.status = RepoStatus::Error("not_found".into());
                    return Ok(Some(repo_state));
                }
                Err(_) => return Ok(None),
            },
        };

        let mut repo_state = RepoState::backfilling();
        repo_state.active = output.active;
        repo_state.status = match output.status {
            Some(GetRepoStatusOutputStatus::Takendown) => RepoStatus::Takendown,
            Some(GetRepoStatusOutputStatus::Suspended) => RepoStatus::Suspended,
            Some(GetRepoStatusOutputStatus::Deactivated) => RepoStatus::Deactivated,
            Some(GetRepoStatusOutputStatus::Deleted) => RepoStatus::Deleted,
            Some(GetRepoStatusOutputStatus::Desynchronized) => RepoStatus::Desynchronized,
            Some(GetRepoStatusOutputStatus::Throttled) => RepoStatus::Throttled,
            Some(GetRepoStatusOutputStatus::Other(s)) => RepoStatus::Error(s.into()),
            None => output
                .active
                .then_some(RepoStatus::Synced)
                .unwrap_or_else(|| RepoStatus::Error("unknown".into())),
        };

        Ok(Some(repo_state))
    }

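    /// loads stored state for the repo a message refers to. unknown repos take
    /// the discovery path: apply the signal filter (indexer mode), resolve the
    /// DID document, check host authority and per-PDS account limits for PDS
    /// sources, then seed state from the upstream repo status.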
    fn load_repo_state(&mut self, msg: &WorkerMessage) -> Result<Option<RepoState<'static>>> {
        let db = &self.state.db;
        let did = msg.msg.did().expect("we checked if valid");
        let repo_key = keys::repo_key(did);
        let metadata_key = keys::repo_metadata_key(did);

        let metadata = db
            .repo_metadata
            .get(&metadata_key)
            .into_diagnostic()?
            .map(|bytes| db::deser_repo_meta(&bytes))
            .transpose()?;

        if metadata.map_or(false, |m| !m.tracked) {
            trace!(did = %did, "ignoring message, repo is explicitly untracked");
            return Ok(None);
        }

        let repo_state_opt = db
            .repos
            .get(&repo_key)
            .into_diagnostic()?
            .map(|bytes| db::deser_repo_state(bytes.as_ref()).map(|s| s.into_static()))
            .transpose()?;

        if let Some(repo_state) = repo_state_opt {
            return Ok(Some(repo_state));
        }

        #[cfg(feature = "indexer")]
        {
            let filter = self.state.filter.load();
            if filter.mode == crate::filter::FilterMode::Filter && !filter.signals.is_empty() {
                let commit = match &msg.msg {
                    SubscribeReposMessage::Commit(c) => c,
                    _ => return Ok(None),
                };
                let touches_signal = commit.ops.iter().any(|op| {
                    op.path
                        .split_once('/')
                        .map(|(col, _)| {
                            let m = filter.matches_signal(col);
                            debug!(
                                did = %did, path = %op.path, col = %col,
                                signals = ?filter.signals, matched = m,
                                "signal check"
                            );
                            m
                        })
                        .unwrap_or(false)
                });
                if !touches_signal {
                    trace!(did = %did, "dropping commit, no signal-matching ops");
                    return Ok(None);
                }
            }
        }

        debug!(did = %did, "discovered new account from firehose, queueing backfill");

        // resolve doc to initialize repo state
        self.state.resolver.invalidate_sync(did);
        let doc = tokio::runtime::Handle::current()
            .block_on(self.state.resolver.resolve_doc(did))
            .into_diagnostic()?;

        // if it's a PDS, verify it's the authoritative one
        if msg.is_pds {
            let pds_host = doc.pds.host_str().map(|h| h.to_string());
            if pds_host.as_deref() != msg.firehose.host_str() {
                warn!(did = %did, got = ?pds_host, expected = ?msg.firehose.host_str(), "message rejected: wrong host for new account");
                return Ok(None);
            }

            if let Some(host) = msg.firehose.host_str() {
                let count = self.state.db.get_count_sync(&pds_account_count_key(host));
                if self.state.is_over_account_limit(host, count) {
                    warn!(did = %did, host, count, "account limit reached for host, dropping new account");
                    return Ok(None);
                }
            }
        }

        // try to get upstream status
        let mut repo_state = tokio::runtime::Handle::current()
            .block_on(self.check_repo_status(did, &doc.pds))
            .ok()
            .flatten()
            .unwrap_or_else(RepoState::backfilling);

        repo_state.update_from_doc(doc);

        self.batch.insert(
            &db.repos,
            &repo_key,
            crate::db::ser_repo_state(&repo_state)?,
        );

        #[cfg(feature = "indexer")]
        {
            self.pending_hook_messages
                .push(crate::ingest::indexer::IndexerMessage::NewRepo(
                    did.clone().into_static(),
                ));
        }

        self.count_deltas.add("repos", 1);

        // track initial active state for per-PDS rate limiting
        if msg.is_pds && repo_state.active {
            if let Some(host) = msg.firehose.host_str() {
                self.count_deltas.add(&pds_account_count_key(host), 1);
            }
        }

        Ok(Some(repo_state))
    }

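    /// allocates the next relay sequence number, rewrites the frame with it,
    /// stages the encoded frame in the current write batch, and queues both an
    /// ephemeral broadcast (the frame itself) and a persisted notification.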
    #[cfg(feature = "relay")]
    fn queue_emit(&mut self, make_frame: impl FnOnce(i64) -> Result<bytes::Bytes>) -> Result<()> {
        let db = &self.state.db;
        let seq = db.next_relay_seq.fetch_add(1, Ordering::SeqCst);
        let frame = make_frame(seq as i64)?;
        self.batch
            .insert(&db.relay_events, keys::relay_event_key(seq), frame.as_ref());
        self.pending_broadcasts
            .push(RelayBroadcast::Ephemeral(seq, frame));
        self.pending_broadcasts.push(RelayBroadcast::Persisted(seq));
        Ok(())
    }
}

/// outcome of a host authority check.
enum AuthorityOutcome {
    /// stored pds matched the source host immediately.
    Authorized,
    /// pds migrated: the doc now points to this host, but our stored state was stale.
    WasStale,
    /// host did not match even after doc resolution.
    WrongHost { expected: SmolStr },
}
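
// a minimal wiring sketch (hypothetical caller: `state`, `buffer_rx`, and
// `indexer_tx` are constructed elsewhere in the crate, and a `Default` impl
// for `ValidationOptions` is assumed):
//
//     let worker = RelayWorker::new(
//         state.clone(),
//         buffer_rx,
//         #[cfg(feature = "indexer")]
//         indexer_tx,
//         /* verify_signatures */ true,
//         /* num_shards */ 8,
//         ValidationOptions::default(),
//     );
//     // run the blocking dispatcher on a dedicated OS thread, handing it a
//     // tokio handle for the resolver/XRPC calls it needs to block on.
//     let handle = tokio::runtime::Handle::current();
//     std::thread::spawn(move || worker.run(handle));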