very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] host authority validation for PDSes

dawn 676f8f52 697ef37b

+325 -50
+21 -6
README.md
··· 64 64 events are de-duplicated using the `time` field. todo: decide what to do on 65 65 relay-side account takedowns or if relays set the `time` field. 66 66 67 + #### direct PDS connections 68 + 69 + a firehose source can also be a direct connection to a PDS rather than a relay. 70 + prefix the URL with `pds::` to mark it as such: 71 + 72 + ``` 73 + HYDRANT_RELAY_HOSTS=wss://bsky.network,pds::wss://pds.example.com 74 + ``` 75 + 76 + only when a source is marked as a direct PDS (`is_pds: true`), hydrant enforces 77 + host authority. relays (`is_pds: false`, the default) are exempt from this check, 78 + since they forward commits from many PDSes by design. this means you will trust 79 + the relay on this though. 80 + 67 81 ### crawler sources 68 82 69 83 <small>[<- back to toc](#table-of-contents)</small> ··· 104 118 | `DATABASE_PATH` | `./hydrant.db` | path to the database folder. | 105 119 | `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). [`tracing` env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html). | 106 120 | `RELAY_HOST` | `wss://relay.fire.hose.cam/` | URL of the relay (firehose only). | 107 - | `RELAY_HOSTS` | | comma-separated list of relay URLs (firehose only). if unset, falls back to `RELAY_HOST`. | 121 + | `RELAY_HOSTS` | | comma-separated list of firehose sources (firehose only). if unset, falls back to `RELAY_HOST`. prefix a URL with `pds::` to mark it as a direct PDS connection (e.g. `pds::wss://pds.example.com`). bare URLs are treated as relays. | 108 122 | `CRAWLER_URLS` | relay hosts in full-network mode, `https://lightrail.microcosm.blue` in filter mode | comma-separated list of `[mode::]url` crawler sources. mode is `relay` or `by_collection`; bare URLs use the default mode. set to empty string to disable crawling. 
| 109 123 | `PLC_URL` | `https://plc.wtf`, `https://plc.directory` if full network | base URL(s) of the PLC directory (comma-separated for multiple). | 110 124 | `EPHEMERAL` | `false` | if enabled, no records are stored. events are deleted after a certain duration (`EPHEMERAL_TTL`). | ··· 233 247 234 248 <small>[<- back to toc](#table-of-contents)</small> 235 249 236 - - `GET /firehose/sources`: list all currently active firehose relay sources. 237 - - returns a JSON array of `{ "url": string, "persisted": bool }`. 250 + - `GET /firehose/sources`: list all currently active firehose sources. 251 + - returns a JSON array of `{ "url": string, "persisted": bool, "is_pds": bool }`. 238 252 - `persisted: true` means the source was added via the API and is stored in the 239 253 database, it will survive a restart. `persisted: false` means the source 240 254 came from `RELAY_HOSTS` and is not written to the database. 241 - - `POST /firehose/sources`: add a firehose relay at runtime. 242 - - body: `{ "url": string }`. 255 + - `is_pds: true` means the source is a direct PDS connection with host authority enforcement enabled. 256 + - `POST /firehose/sources`: add a firehose source at runtime. 257 + - body: `{ "url": string, "is_pds": bool }`. `is_pds` defaults to `false`. 243 258 - the source is persisted to the database before the ingestor task is started. 244 - - if a relay with the same URL already exists, it is replaced: the running 259 + - if a source with the same URL already exists, it is replaced: the running 245 260 task is stopped and a new one is started. any existing cursor state for that 246 261 URL is preserved. 247 262 - returns `201 Created` on success.
+5 -1
src/api/firehose.rs
··· 24 24 #[derive(Deserialize)] 25 25 pub struct AddSourceRequest { 26 26 pub url: Url, 27 + /// true to treat this as a direct PDS connection; enables host authority enforcement. 28 + /// defaults to false (aggregating relay). 29 + #[serde(default)] 30 + pub is_pds: bool, 27 31 } 28 32 29 33 pub async fn add_source( ··· 32 36 ) -> Result<StatusCode, (StatusCode, String)> { 33 37 hydrant 34 38 .firehose 35 - .add_source(body.url) 39 + .add_source(body.url, body.is_pds) 36 40 .await 37 41 .map(|_| StatusCode::CREATED) 38 42 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+60 -11
src/config.rs
··· 122 122 pub mode: CrawlerMode, 123 123 } 124 124 125 + /// a single firehose source: a URL and whether it is a direct PDS connection. 126 + /// 127 + /// set via `HYDRANT_RELAY_HOSTS` as a comma-separated list of `[pds::]url` entries. 128 + /// e.g. `wss://bsky.network,pds::wss://pds.example.com`. 129 + /// a bare URL (no `pds::` prefix) is treated as an aggregating relay (`is_pds = false`). 130 + #[derive(Debug, Clone)] 131 + pub struct FirehoseSource { 132 + pub url: Url, 133 + /// true when this is a direct PDS connection; enables host authority enforcement. 134 + pub is_pds: bool, 135 + } 136 + 137 + impl FirehoseSource { 138 + /// parse `[pds::]url`. the `pds::` prefix marks the source as a direct PDS connection. 139 + pub fn parse(s: &str) -> Option<Self> { 140 + if let Some(url_str) = s.strip_prefix("pds::") { 141 + let url = Url::parse(url_str).ok()?; 142 + Some(Self { url, is_pds: true }) 143 + } else { 144 + let url = Url::parse(s).ok()?; 145 + Some(Self { url, is_pds: false }) 146 + } 147 + } 148 + } 149 + 125 150 impl CrawlerSource { 126 151 /// parse `[mode::]url`. mode prefix is optional, falls back to `default_mode`. 127 152 fn parse(s: &str, default_mode: CrawlerMode) -> Option<Self> { ··· 214 239 /// set via `HYDRANT_EPHEMERAL_TTL` (humantime duration, e.g. `60min`). 215 240 pub ephemeral_ttl: Duration, 216 241 217 - /// relay URLs used for firehose ingestion. set via `HYDRANT_RELAY_HOST` (single) 242 + /// firehose sources for ingestion. set via `HYDRANT_RELAY_HOST` (single) 218 243 /// or `HYDRANT_RELAY_HOSTS` (comma-separated; takes precedence). 219 - pub relays: Vec<Url>, 244 + /// prefix a URL with `pds::` to mark it as a direct PDS connection. 245 + pub relays: Vec<FirehoseSource>, 220 246 /// base URL(s) of the PLC directory (comma-separated for multiple). 221 247 /// defaults to `https://plc.wtf`, or `https://plc.directory` in full-network mode. 222 248 /// set via `HYDRANT_PLC_URL`. 
··· 337 363 full_network: false, 338 364 ephemeral: false, 339 365 ephemeral_ttl: Duration::from_secs(3600), 340 - relays: vec![Url::parse("wss://relay.fire.hose.cam/").unwrap()], 366 + relays: vec![FirehoseSource { 367 + url: Url::parse("wss://relay.fire.hose.cam/").unwrap(), 368 + is_pds: false, 369 + }], 341 370 plc_urls: vec![Url::parse("https://plc.wtf").unwrap()], 342 371 enable_firehose: true, 343 372 firehose_workers: 8, ··· 412 441 let s = s.trim(); 413 442 (!s.is_empty()) 414 443 .then(|| { 415 - Url::parse(s) 416 - .inspect_err(|e| tracing::warn!("invalid relay host URL: {e}")) 417 - .ok() 444 + FirehoseSource::parse(s).or_else(|| { 445 + tracing::warn!("invalid relay host URL: {s}"); 446 + None 447 + }) 418 448 }) 419 449 .flatten() 420 450 }) 421 451 .collect(), 422 452 // HYDRANT_RELAY_HOSTS explicitly set to "" 423 453 Ok(_) => vec![], 424 - // not set at all, fall back to RELAY_HOST 425 - Err(_) => vec![cfg!("RELAY_HOST", defaults.relays[0].clone())], 454 + // not set at all, fall back to RELAY_HOST (bare URL, no pds:: prefix support here) 455 + Err(_) => match std::env::var("HYDRANT_RELAY_HOST") { 456 + Ok(s) if !s.trim().is_empty() => { 457 + FirehoseSource::parse(s.trim()).into_iter().collect() 458 + } 459 + _ => defaults.relays.clone(), 460 + }, 426 461 }; 427 462 428 463 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") ··· 525 560 Err(_) => match default_mode { 526 561 CrawlerMode::ListRepos => relay_hosts 527 562 .iter() 528 - .map(|url| CrawlerSource { 529 - url: url.clone(), 563 + .map(|source| CrawlerSource { 564 + url: source.url.clone(), 530 565 mode: CrawlerMode::ListRepos, 531 566 }) 532 567 .collect(), ··· 582 617 const LABEL_WIDTH: usize = 27; 583 618 584 619 writeln!(f, "hydrant configuration:")?; 585 - config_line!(f, "relay hosts", format_args!("{:?}", self.relays))?; 620 + config_line!( 621 + f, 622 + "relay hosts", 623 + format_args!( 624 + "{:?}", 625 + self.relays 626 + .iter() 627 + .map(|s| if s.is_pds { 628 + 
format!("pds::{}", s.url) 629 + } else { 630 + s.url.to_string() 631 + }) 632 + .collect::<Vec<_>>() 633 + ) 634 + )?; 586 635 config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?; 587 636 config_line!(f, "full network indexing", self.full_network)?; 588 637 config_line!(f, "verify signatures", self.verify_signatures)?;
+14 -6
src/control/firehose.rs
··· 12 12 13 13 pub(super) struct FirehoseIngestorHandle { 14 14 abort: tokio::task::AbortHandle, 15 + pub(super) is_pds: bool, 15 16 } 16 17 17 18 impl Drop for FirehoseIngestorHandle { ··· 31 32 pub url: Url, 32 33 /// true if added via the API and persisted to the database; false for `RELAY_HOSTS` sources. 33 34 pub persisted: bool, 35 + /// true when this is a direct PDS connection; enables host authority enforcement. 36 + pub is_pds: bool, 34 37 } 35 38 36 39 pub(super) async fn spawn_firehose_ingestor( 37 40 relay_url: &Url, 41 + is_pds: bool, 38 42 state: &Arc<AppState>, 39 43 shared: &FirehoseShared, 40 44 enabled: watch::Receiver<bool>, ··· 48 52 .insert_async(relay_url.clone(), AtomicI64::new(start.unwrap_or(0))) 49 53 .await; 50 54 51 - info!(relay = %relay_url, cursor = ?start, "starting firehose ingestor"); 55 + info!(relay = %relay_url, is_pds, cursor = ?start, "starting firehose ingestor"); 52 56 53 57 let ingestor = FirehoseIngestor::new( 54 58 state.clone(), 55 59 shared.buffer_tx.clone(), 56 60 relay_url.clone(), 61 + is_pds, 57 62 state.filter.clone(), 58 63 enabled, 59 64 shared.verify_signatures, ··· 67 72 }) 68 73 .abort_handle(); 69 74 70 - Ok(FirehoseIngestorHandle { abort }) 75 + Ok(FirehoseIngestorHandle { abort, is_pds }) 71 76 } 72 77 73 78 /// runtime control over the firehose ingestor component. ··· 120 125 pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> { 121 126 let mut sources = Vec::new(); 122 127 self.tasks 123 - .iter_async(|url, _| { 128 + .iter_async(|url, handle| { 124 129 sources.push(FirehoseSourceInfo { 125 130 url: url.clone(), 126 131 persisted: self.persisted.contains_sync(url), 132 + is_pds: handle.is_pds, 127 133 }); 128 134 true 129 135 }) ··· 138 144 /// is started. any cursor state for that URL is preserved. 139 145 /// 140 146 /// returns an error if called before [`Hydrant::run`]. 
141 - pub async fn add_source(&self, url: Url) -> Result<()> { 147 + pub async fn add_source(&self, url: Url, is_pds: bool) -> Result<()> { 142 148 let Some(shared) = self.shared.get() else { 143 149 miette::bail!("firehose not yet started: call Hydrant::run() first"); 144 150 }; 145 151 146 152 let db = self.state.db.clone(); 147 153 let key = keys::firehose_source_key(url.as_str()); 148 - tokio::task::spawn_blocking(move || db.crawler.insert(key, b"").into_diagnostic()) 154 + let value = rmp_serde::to_vec(&crate::db::FirehoseSourceMeta { is_pds }) 155 + .map_err(|e| miette::miette!("failed to serialize firehose source meta: {e}"))?; 156 + tokio::task::spawn_blocking(move || db.crawler.insert(key, value).into_diagnostic()) 149 157 .await 150 158 .into_diagnostic()??; 151 159 152 160 let enabled_rx = self.state.firehose_enabled.subscribe(); 153 - let handle = spawn_firehose_ingestor(&url, &self.state, shared, enabled_rx).await?; 161 + let handle = spawn_firehose_ingestor(&url, is_pds, &self.state, shared, enabled_rx).await?; 154 162 155 163 let _ = self.persisted.insert_async(url.clone()).await; 156 164 match self.tasks.entry_async(url).await {
+30 -12
src/control/mod.rs
··· 350 350 relay_count = relay_hosts.len(), 351 351 hosts = relay_hosts 352 352 .iter() 353 - .map(|h| h.as_str()) 353 + .map(|h| h.url.as_str()) 354 354 .collect::<Vec<_>>() 355 355 .join(", "), 356 356 "starting firehose ingestor(s)" 357 357 ); 358 - for relay_url in &relay_hosts { 358 + for source in &relay_hosts { 359 359 let enabled_rx = state.firehose_enabled.subscribe(); 360 - let handle = 361 - spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?; 362 - let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await; 360 + let handle = spawn_firehose_ingestor( 361 + &source.url, 362 + source.is_pds, 363 + &state, 364 + fire_shared, 365 + enabled_rx, 366 + ) 367 + .await?; 368 + let _ = firehose 369 + .tasks 370 + .insert_async(source.url.clone(), handle) 371 + .await; 363 372 } 364 373 } 365 374 366 - let persisted_relay_urls = tokio::task::spawn_blocking({ 375 + let persisted_sources = tokio::task::spawn_blocking({ 367 376 let state = state.clone(); 368 377 move || load_persisted_firehose_sources(&state.db) 369 378 }) 370 379 .await 371 380 .into_diagnostic()??; 372 381 373 - for relay_url in &persisted_relay_urls { 374 - let _ = firehose.persisted.insert_async(relay_url.clone()).await; 375 - if firehose.tasks.contains_async(relay_url).await { 382 + for source in &persisted_sources { 383 + let _ = firehose.persisted.insert_async(source.url.clone()).await; 384 + if firehose.tasks.contains_async(&source.url).await { 376 385 continue; 377 386 } 378 387 let enabled_rx = state.firehose_enabled.subscribe(); 379 - let handle = 380 - spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?; 381 - let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await; 388 + let handle = spawn_firehose_ingestor( 389 + &source.url, 390 + source.is_pds, 391 + &state, 392 + fire_shared, 393 + enabled_rx, 394 + ) 395 + .await?; 396 + let _ = firehose 397 + .tasks 398 + .insert_async(source.url.clone(), handle) 399 + .await; 
382 400 } 383 401 384 402 // 11. spawn crawler infrastructure (always, to support dynamic source management)
+2
src/db/migration/mod.rs
··· 6 6 7 7 mod v1; 8 8 mod v2; 9 + mod v3; 9 10 10 11 type MigrationFn = fn(&Db, &mut OwnedWriteBatch) -> Result<()>; 11 12 ··· 13 14 const MIGRATIONS: &[(&str, MigrationFn)] = &[ 14 15 ("stable_firehose_cursors", v1::stable_firehose_cursors), 15 16 ("repo_state_root_commit", v2::repo_state_root_commit), 17 + ("firehose_source_is_pds", v3::firehose_source_is_pds), 16 18 ]; 17 19 18 20 fn read_version(db: &Db) -> Result<u64> {
+63
src/db/migration/v3.rs
··· 1 + use fjall::OwnedWriteBatch; 2 + use miette::{IntoDiagnostic, Result}; 3 + 4 + use crate::db::{Db, FirehoseSourceMeta, keys}; 5 + 6 + // for this migration, we default to everything being a PDS unless it matches 7 + // one of the hosts mentioned in this list. this is best effort but its 8 + // better than defaulting to false and entirely disabling validation, 9 + // the user can manage the firehose sources when they see the error anyway. 10 + // (and i like to be correct :P) 11 + const KNOWN_RELAY_HOSTS: &[&str] = &[ 12 + "atproto.africa", 13 + "bsky.network", 14 + "relay1.us-east.bsky.network", 15 + "relay1.us-west.bsky.network", 16 + "relay.fire.hose.cam", 17 + "relay3.fr.hose.cam", 18 + "relay.upcloud.world", 19 + "relay.hayescmd.net", 20 + "relay.xero.systems", 21 + "relay.feeds.blue", 22 + "zlay.waow.tech", 23 + "asia.firehose.network", 24 + "europe.firehose.network", 25 + "northamerica.firehose.network", 26 + "relay.bas.sh", 27 + "relay.t4tlabs.net", 28 + "relay.waow.tech", 29 + ]; 30 + 31 + fn is_known_relay(url_bytes: &[u8]) -> bool { 32 + let Ok(url_str) = std::str::from_utf8(url_bytes) else { 33 + return false; 34 + }; 35 + let Ok(url) = url::Url::parse(url_str) else { 36 + return false; 37 + }; 38 + url.host_str() 39 + .is_some_and(|h| KNOWN_RELAY_HOSTS.contains(&h)) 40 + } 41 + 42 + pub(super) fn firehose_source_is_pds(db: &Db, batch: &mut OwnedWriteBatch) -> Result<()> { 43 + let relay_bytes = rmp_serde::to_vec(&FirehoseSourceMeta { is_pds: false }) 44 + .map_err(|e| miette::miette!("failed to serialize meta: {e}"))?; 45 + let pds_bytes = rmp_serde::to_vec(&FirehoseSourceMeta { is_pds: true }) 46 + .map_err(|e| miette::miette!("failed to serialize meta: {e}"))?; 47 + 48 + for item in db.crawler.prefix(keys::FIREHOSE_SOURCE_PREFIX) { 49 + let (key, val) = item.into_inner().into_diagnostic()?; 50 + if !val.is_empty() { 51 + continue; 52 + } 53 + let url_bytes = &key[keys::FIREHOSE_SOURCE_PREFIX.len()..]; 54 + let value = if 
is_known_relay(url_bytes) { 55 + &relay_bytes 56 + } else { 57 + &pds_bytes 58 + }; 59 + batch.insert(&db.crawler, key, value); 60 + } 61 + 62 + Ok(()) 63 + }
+18 -5
src/db/mod.rs
··· 861 861 Ok(count.unwrap_or(0)) 862 862 } 863 863 864 - pub fn load_persisted_firehose_sources(db: &crate::db::Db) -> Result<Vec<Url>> { 864 + #[derive(serde::Serialize, serde::Deserialize, Default)] 865 + pub(crate) struct FirehoseSourceMeta { 866 + #[serde(default)] 867 + pub(crate) is_pds: bool, 868 + } 869 + 870 + pub fn load_persisted_firehose_sources( 871 + db: &crate::db::Db, 872 + ) -> Result<Vec<crate::config::FirehoseSource>> { 865 873 use crate::db::keys::FIREHOSE_SOURCE_PREFIX; 866 874 867 - let mut urls = Vec::new(); 875 + let mut sources = Vec::new(); 868 876 for entry in db.crawler.prefix(FIREHOSE_SOURCE_PREFIX) { 869 - let (key, _) = entry.into_inner().into_diagnostic()?; 877 + let (key, val) = entry.into_inner().into_diagnostic()?; 870 878 let url_bytes = &key[FIREHOSE_SOURCE_PREFIX.len()..]; 871 879 let url_str = std::str::from_utf8(url_bytes).into_diagnostic()?; 872 880 let url = Url::parse(url_str).into_diagnostic()?; 873 - urls.push(url); 881 + let meta: FirehoseSourceMeta = rmp_serde::from_slice(&val) 882 + .map_err(|e| miette::miette!("failed to deserialize firehose source meta: {e}"))?; 883 + sources.push(crate::config::FirehoseSource { 884 + url, 885 + is_pds: meta.is_pds, 886 + }); 874 887 } 875 - Ok(urls) 888 + Ok(sources) 876 889 } 877 890 878 891 pub fn load_persisted_crawler_sources(
+4
src/ingest/firehose.rs
··· 18 18 state: Arc<AppState>, 19 19 buffer_tx: BufferTx, 20 20 relay_host: Url, 21 + is_pds: bool, 21 22 filter: FilterHandle, 22 23 enabled: watch::Receiver<bool>, 23 24 _verify_signatures: bool, ··· 28 29 state: Arc<AppState>, 29 30 buffer_tx: BufferTx, 30 31 relay_host: Url, 32 + is_pds: bool, 31 33 filter: FilterHandle, 32 34 enabled: watch::Receiver<bool>, 33 35 verify_signatures: bool, ··· 36 38 state, 37 39 buffer_tx, 38 40 relay_host, 41 + is_pds, 39 42 filter, 40 43 enabled, 41 44 _verify_signatures: verify_signatures, ··· 130 133 131 134 if let Err(e) = self.buffer_tx.send(IngestMessage::Firehose { 132 135 relay: self.relay_host.clone(), 136 + is_pds: self.is_pds, 133 137 msg: msg.into_static(), 134 138 }) { 135 139 error!(err = %e, "failed to send message to buffer processor");
+3
src/ingest/mod.rs
··· 14 14 pub enum IngestMessage { 15 15 Firehose { 16 16 relay: Url, 17 + /// true when `relay` is a direct PDS connection (not an aggregating relay). 18 + /// enables host authority enforcement in the worker. 19 + is_pds: bool, 17 20 msg: SubscribeReposMessage<'static>, 18 21 }, 19 22 BackfillFinished(Did<'static>),
+104 -8
src/ingest/worker.rs
··· 52 52 } 53 53 } 54 54 55 + enum HostAuthorityOutcome { 56 + /// stored pds matched the source host immediately. 57 + Authorized, 58 + /// pds migrated: doc now points to this host, but our stored state was stale. trigger backfill. 59 + Migration, 60 + /// host did not match even after doc resolution. reject the message. 61 + WrongHost, 62 + } 63 + 55 64 // gate returned by check_repo_state, tells the shard loop what to do with the message 56 65 enum ProcessGate<'s, 'c> { 57 66 // did not exist in db, newly queued for backfill, drop ··· 249 258 } 250 259 } 251 260 } 252 - IngestMessage::Firehose { relay, msg } => { 261 + IngestMessage::Firehose { relay, is_pds, msg } => { 253 262 let _span = tracing::info_span!("firehose", relay = %relay).entered(); 263 + // only enforce host authority when the source is a direct PDS connection 264 + let source_host = is_pds.then(|| relay.host_str()).flatten(); 254 265 let (did, seq) = match &msg { 255 266 SubscribeReposMessage::Commit(c) => (&c.repo, c.seq), 256 267 SubscribeReposMessage::Identity(i) => (&i.did, i.seq), ··· 330 341 } 331 342 } 332 343 333 - match Self::process_message(&mut ctx, &msg, did, repo_state, pre_status) 334 - { 344 + match Self::process_message( 345 + &mut ctx, 346 + &msg, 347 + did, 348 + repo_state, 349 + pre_status, 350 + source_host, 351 + ) { 335 352 Ok(RepoProcessResult::Ok(_)) => {} 336 353 Ok(RepoProcessResult::Deleted) => { 337 354 state.db.update_count("repos", -1); ··· 411 428 did: &Did, 412 429 repo_state: RepoState<'s>, 413 430 pre_status: RepoStatus, 431 + source_host: Option<&str>, 414 432 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 415 433 match msg { 416 434 SubscribeReposMessage::Commit(commit) => { 417 435 trace!(did = %did, "processing commit"); 418 - Self::handle_commit(ctx, did, repo_state, commit) 436 + Self::handle_commit(ctx, did, repo_state, commit, source_host) 419 437 } 420 438 SubscribeReposMessage::Sync(sync) => { 421 439 debug!(did = %did, "processing sync"); 
422 - Self::handle_sync(ctx, did, repo_state, sync) 440 + Self::handle_sync(ctx, did, repo_state, sync, source_host) 423 441 } 424 442 SubscribeReposMessage::Identity(identity) => { 425 443 debug!(did = %did, "processing identity"); ··· 441 459 did: &Did, 442 460 mut repo_state: RepoState<'s>, 443 461 commit: &'c Commit<'c>, 462 + source_host: Option<&str>, 444 463 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 445 464 repo_state.advance_message_time(commit.time.0.timestamp_millis()); 446 465 447 - // TODO phase 2: host authority check (source_host not available in indexer mode) 466 + if let Some(host) = source_host { 467 + match Self::check_host_authority(ctx, did, &mut repo_state, host)? { 468 + HostAuthorityOutcome::Authorized => {} 469 + HostAuthorityOutcome::Migration => { 470 + // pds migrated: our data may be stale, backfill from the new host 471 + warn!(did = %did, source_host = host, "pds migration detected, triggering backfill"); 472 + let mut batch = ctx.state.db.inner.batch(); 473 + let _repo_state = ops::update_repo_status( 474 + &mut batch, 475 + &ctx.state.db, 476 + did, 477 + repo_state, 478 + RepoStatus::Backfilling, 479 + )?; 480 + batch.commit().into_diagnostic()?; 481 + ctx.state 482 + .db 483 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 484 + ctx.state.notify_backfill(); 485 + return Ok(RepoProcessResult::NeedsBackfill(Some(commit))); 486 + } 487 + // todo: ideally ban pds 488 + HostAuthorityOutcome::WrongHost => { 489 + warn!(did = %did, source_host = host, pds = ?repo_state.pds, "commit rejected: wrong host"); 490 + return Ok(RepoProcessResult::Ok(repo_state)); 491 + } 492 + } 493 + } 448 494 449 495 // validate the commit: stale rev, size limits, future rev, CAR parse, field 450 496 // consistency, signature, and chain-break detection ··· 536 582 did: &Did, 537 583 mut repo_state: RepoState<'s>, 538 584 sync: &'c Sync<'c>, 585 + source_host: Option<&str>, 539 586 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> 
{ 540 587 repo_state.advance_message_time(sync.time.0.timestamp_millis()); 541 588 542 - // TODO phase 2: host authority check 589 + if let Some(host) = source_host { 590 + match Self::check_host_authority(ctx, did, &mut repo_state, host)? { 591 + HostAuthorityOutcome::Authorized | HostAuthorityOutcome::Migration => { 592 + // migration is fine here — sync already triggers a backfill below 593 + } 594 + // todo: ideally ban pds 595 + HostAuthorityOutcome::WrongHost => { 596 + warn!(did = %did, source_host = host, pds = ?repo_state.pds, "sync rejected: wrong host"); 597 + return Ok(RepoProcessResult::Ok(repo_state)); 598 + } 599 + } 600 + } 543 601 544 602 // validate: size limit, CAR parse, field consistency, signature 545 603 let signing_key = Self::fetch_key(ctx, did)?; ··· 864 922 let (key, value) = guard.into_inner().into_diagnostic()?; 865 923 let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?; 866 924 867 - let res = Self::handle_commit(ctx, did, repo_state, &commit); 925 + // buffered commits have already been source-checked on arrival; skip host check 926 + let res = Self::handle_commit(ctx, did, repo_state, &commit, None); 868 927 let res = match res { 869 928 Ok(r) => r, 870 929 Err(e) => { ··· 891 950 } 892 951 893 952 Ok(RepoProcessResult::Ok(repo_state)) 953 + } 954 + 955 + /// check that `source_host` is the authoritative PDS for `did`. 956 + /// 957 + /// - `Authorized`: stored pds matched immediately (fast path). 958 + /// - `Migration`: stored pds was wrong but doc resolved to this host; caller should backfill. 959 + /// - `WrongHost`: host did not match even after doc resolution; caller should reject. 
960 + fn check_host_authority( 961 + ctx: &mut WorkerContext, 962 + did: &Did, 963 + repo_state: &mut RepoState, 964 + source_host: &str, 965 + ) -> Result<HostAuthorityOutcome, IngestError> { 966 + let pds_host = repo_state 967 + .pds 968 + .as_deref() 969 + .and_then(|pds| url::Url::parse(pds).ok()) 970 + .and_then(|u| u.host_str().map(str::to_owned)); 971 + 972 + if pds_host.as_deref() == Some(source_host) { 973 + return Ok(HostAuthorityOutcome::Authorized); 974 + } 975 + 976 + // unknown pds or host mismatch — resolve doc to verify or detect a migration 977 + Self::refresh_doc(ctx, repo_state, did)?; 978 + 979 + let updated_host = repo_state 980 + .pds 981 + .as_deref() 982 + .and_then(|pds| url::Url::parse(pds).ok()) 983 + .and_then(|u| u.host_str().map(str::to_owned)); 984 + 985 + if updated_host.as_deref() == Some(source_host) { 986 + Ok(HostAuthorityOutcome::Migration) 987 + } else { 988 + Ok(HostAuthorityOutcome::WrongHost) 989 + } 894 990 } 895 991 896 992 // refreshes the handle, pds url and signing key of a did
+1 -1
tests/common.nu
··· 56 56 } 57 57 58 58 export def resolve-pds [did: string] { 59 - let doc = (http get $"https://plc.wtf/($did)" | from json) 59 + let doc = (http get $"https://plc.gaze.systems/($did)" | from json) 60 60 ($doc.service | where type == "AtprotoPersonalDataServer" | first).serviceEndpoint 61 61 } 62 62