very fast AT Protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer


[ingest,lib,api] relay mode, sync fixes

dawn 701120fd 676f8f52

+1281 -368
+3 -1
Cargo.toml
```diff
···
 edition = "2024"
 
 [features]
-default = []
+default = ["events"]
 sync_all = []
 backlinks = []
+relay = []
+events = []
 
 [dependencies]
 tokio = { version = "1.0", features = ["full"] }
```
+6 -5
src/api/mod.rs
```diff
···
 mod ingestion;
 mod repos;
 mod stats;
+#[cfg(feature = "events")]
 mod stream;
 mod xrpc;
···
 #[allow(unused_mut)]
 let mut app = Router::new()
     .route("/health", get(|| async { "OK" }))
-    .route("/stats", get(stats::get_stats))
-    .nest("/stream", stream::router())
+    .route("/stats", get(stats::get_stats));
+#[cfg(feature = "events")]
+let app = app.nest("/stream", stream::router());
+let app = app
     .merge(xrpc::router())
     .merge(filter::router())
     .merge(repos::router())
···
     .merge(db::router());
 
 #[cfg(feature = "backlinks")]
-{
-    app = app.merge(crate::backlinks::api::router());
-}
+let app = app.merge(crate::backlinks::api::router());
 
 let app = app
     .with_state(hydrant)
```
+15 -2
src/api/xrpc/mod.rs
```diff
···
 mod list_hosts;
 mod list_records;
 mod list_repos;
+#[cfg(feature = "relay")]
+mod subscribe_repos;
 
 pub fn router() -> Router<Hydrant> {
-    Router::new()
+    #[allow(unused_mut)]
+    let mut r = Router::new()
         .route(GetRecordRequest::PATH, get(get_record::handle))
         .route(ListRecordsRequest::PATH, get(list_records::handle))
         .route(CountRecords::PATH, get(count_records::handle))
···
         .route(GetLatestCommitRequest::PATH, get(get_latest_commit::handle))
         .route(GetRepoRequest::PATH, get(get_repo::handle))
         .route(GetRepoStatusRequest::PATH, get(get_repo_status::handle))
-        .route(ListReposRequest::PATH, get(list_repos::handle))
+        .route(ListReposRequest::PATH, get(list_repos::handle));
+
+    #[cfg(feature = "relay")]
+    {
+        r = r.route(
+            "/xrpc/com.atproto.sync.subscribeRepos",
+            axum::routing::get(subscribe_repos::handle),
+        );
+    }
+
+    r
 }
 
 #[derive(Debug)]
```
+34
src/api/xrpc/subscribe_repos.rs
```diff
···
+use axum::{
+    extract::{
+        Query, State,
+        ws::{Message, WebSocket, WebSocketUpgrade},
+    },
+    response::IntoResponse,
+};
+use futures::StreamExt;
+use serde::Deserialize;
+
+use crate::control::Hydrant;
+
+#[derive(Deserialize)]
+pub struct SubscribeReposQuery {
+    pub cursor: Option<u64>,
+}
+
+pub async fn handle(
+    State(hydrant): State<Hydrant>,
+    Query(query): Query<SubscribeReposQuery>,
+    ws: WebSocketUpgrade,
+) -> impl IntoResponse {
+    ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query))
+}
+
+async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) {
+    let mut stream = hydrant.subscribe_repos(query.cursor);
+
+    while let Some(frame) = stream.next().await {
+        if socket.send(Message::Binary(frame)).await.is_err() {
+            break;
+        }
+    }
+}
```
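For orientation, a client for this endpoint only needs a plain WebSocket and a binary-frame loop. A hedged sketch, assuming the `tokio-tungstenite` and `futures-util` crates and a made-up local address (neither appears in this diff):

```rust
// hypothetical client: tail hydrant's subscribeRepos endpoint from seq 0.
// crate choices (tokio-tungstenite, futures-util) and the port are assumptions.
use futures_util::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "ws://127.0.0.1:3000/xrpc/com.atproto.sync.subscribeRepos?cursor=0";
    let (mut ws, _response) = connect_async(url).await?;

    while let Some(msg) = ws.next().await {
        match msg? {
            // each binary message is one pre-encoded DAG-CBOR frame (header + body)
            Message::Binary(frame) => println!("frame: {} bytes", frame.len()),
            Message::Close(_) => break,
            _ => {}
        }
    }
    Ok(())
}
```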
+3 -3
src/control/firehose.rs
```diff
···
 let start = db::get_firehose_cursor(&state.db, relay_url).await?;
 // insert into relay_cursors if not already present; existing in-memory cursor takes precedence
 let _ = state
-    .relay_cursors
+    .firehose_cursors
     .insert_async(relay_url.clone(), AtomicI64::new(start.unwrap_or(0)))
     .await;
···
     .await
     .into_diagnostic()??;
 
-self.state.relay_cursors.peek_with(&relay_url, |_, c| {
+self.state.firehose_cursors.peek_with(&relay_url, |_, c| {
     c.store(0, Ordering::SeqCst);
 });
 Ok(())
···
 }
 
 // remove from relay_cursors (persist thread will stop tracking it)
-self.state.relay_cursors.remove_async(url).await;
+self.state.firehose_cursors.remove_async(url).await;
 
 if self.persisted.remove_async(url).await.is_some() {
     let db = self.state.db.clone();
```
+86 -18
src/control/mod.rs
```diff
···
 use tokio::sync::{mpsc, watch};
 use tracing::{debug, error, info};
 
+#[cfg(feature = "events")]
 use crate::backfill::BackfillWorker;
 use crate::config::{Config, SignatureVerification};
 use crate::db::{
···
     load_persisted_firehose_sources,
 };
 use crate::filter::FilterMode;
+#[cfg(feature = "events")]
 use crate::ingest::worker::FirehoseWorker;
 use crate::state::AppState;
 use crate::types::MarshallableEvt;
 
 use crawler::{CrawlerShared, spawn_crawler_producer};
 use firehose::{FirehoseShared, spawn_firehose_ingestor};
+#[cfg(feature = "events")]
 use stream::event_stream_thread;
+#[cfg(feature = "relay")]
+use stream::relay_stream_thread;
 
 /// infromation about a host hydrant is consuming from.
 pub struct Host {
···
 // internal buffered channel between ingestors / backfill and the firehose worker
 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
 
-// 5. spawn the backfill worker
+// 5. spawn the backfill worker (not used in relay mode)
+#[cfg(feature = "events")]
 tokio::spawn({
     let state = state.clone();
     BackfillWorker::new(
···
 });
 
 // 6. re-queue any repos that lost their backfill state, then start the retry worker
-if let Err(e) = tokio::task::spawn_blocking({
-    let state = state.clone();
-    move || crate::backfill::manager::queue_gone_backfills(&state)
-})
-.await
-.into_diagnostic()?
+#[cfg(feature = "events")]
 {
-    error!(err = %e, "failed to queue gone backfills");
-    db::check_poisoned_report(&e);
-}
+    if let Err(e) = tokio::task::spawn_blocking({
+        let state = state.clone();
+        move || crate::backfill::manager::queue_gone_backfills(&state)
+    })
+    .await
+    .into_diagnostic()?
+    {
+        error!(err = %e, "failed to queue gone backfills");
+        db::check_poisoned_report(&e);
+    }
 
-std::thread::spawn({
-    let state = state.clone();
-    move || crate::backfill::manager::retry_worker(state)
-});
+    std::thread::spawn({
+        let state = state.clone();
+        move || crate::backfill::manager::retry_worker(state)
+    });
+}
 
-// 7. ephemeral GC thread
+// 7. ephemeral GC thread (not used in relay mode)
+#[cfg(feature = "events")]
 if config.ephemeral {
     let state = state.clone();
     std::thread::Builder::new()
···
     move || loop {
         std::thread::sleep(persist_interval);
 
-        state.relay_cursors.iter_sync(|relay, cursor| {
+        state.firehose_cursors.iter_sync(|relay, cursor| {
             let seq = cursor.load(Ordering::SeqCst);
             if seq > 0 {
                 if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
···
 });
 
 // 9. events/sec stats ticker
+#[cfg(feature = "events")]
 tokio::spawn({
     let state = state.clone();
     let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
···
 let handle = tokio::runtime::Handle::current();
 let firehose_worker = std::thread::spawn({
     let state = state.clone();
+    let handle = handle.clone();
     move || {
-        FirehoseWorker::new(
+        #[cfg(feature = "relay")]
+        return crate::ingest::relay_worker::RelayWorker::new(
+            state,
+            buffer_rx,
+            matches!(config.verify_signatures, SignatureVerification::Full),
+            config.firehose_workers,
+            crate::ingest::validation::ValidationOptions {
+                verify_mst: config.verify_mst,
+                rev_clock_skew_secs: config.rev_clock_skew_secs,
+            },
+        )
+        .run(handle);
+        #[cfg(feature = "events")]
+        return FirehoseWorker::new(
             state,
             buffer_rx,
             matches!(config.verify_signatures, SignatureVerification::Full),
···
                 rev_clock_skew_secs: config.rev_clock_skew_secs,
             },
         )
-        .run(handle)
+        .run(handle);
     }
 });
···
 ///
 /// multiple concurrent subscribers each receive a full independent copy of the stream.
 /// the stream ends when the `EventStream` is dropped.
+#[cfg(feature = "events")]
 pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
     let (tx, rx) = mpsc::channel(500);
     let state = self.state.clone();
···
         .expect("failed to spawn stream thread");
 
     EventStream(rx)
+}
+
+/// subscribe to the relay's ordered `subscribeRepos` event stream.
+///
+/// returns a [`RelayEventStream`] that yields pre-encoded CBOR binary frames
+/// ready to forward directly to ATProto clients via WebSocket.
+///
+/// - if `cursor` is `None`, streaming starts from the current head (live tail only).
+/// - if `cursor` is `Some(seq)`, all persisted events from that seq onward are replayed first.
+#[cfg(feature = "relay")]
+pub fn subscribe_repos(&self, cursor: Option<u64>) -> RelayEventStream {
+    let (tx, rx) = mpsc::channel(500);
+    let state = self.state.clone();
+    let runtime = tokio::runtime::Handle::current();
+
+    std::thread::Builder::new()
+        .name("hydrant-relay-stream".into())
+        .spawn(move || {
+            let _g = runtime.enter();
+            relay_stream_thread(state, tx, cursor);
+        })
+        .expect("failed to spawn relay stream thread");
+
+    RelayEventStream(rx)
 }
 
 /// return database counts and on-disk sizes for all keyspaces.
···
 /// implements [`futures::Stream`] and can be used with `StreamExt::next`,
 /// `while let Some(evt) = stream.next().await`, `forward`, etc.
 /// the stream terminates when the underlying channel closes (i.e. hydrant shuts down).
+#[cfg(feature = "events")]
 pub struct EventStream(mpsc::Receiver<Event>);
 
+#[cfg(feature = "events")]
 impl Stream for EventStream {
     type Item = Event;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.0.poll_recv(cx)
+    }
+}
+
+/// the relay event stream produced by [`Hydrant::subscribe_repos`].
+#[cfg(feature = "relay")]
+pub struct RelayEventStream(mpsc::Receiver<bytes::Bytes>);
+
+#[cfg(feature = "relay")]
+impl futures::Stream for RelayEventStream {
+    type Item = bytes::Bytes;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
         self.0.poll_recv(cx)
     }
 }
```
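Embedders get the same stream without the WebSocket hop. A minimal sketch of the new API, assuming a `Hydrant` built with the `relay` feature and a running tokio runtime:

```rust
// sketch: tail the relay stream in-process; `hydrant` is assumed to be
// constructed elsewhere with the `relay` feature enabled.
use futures::StreamExt;

async fn tail_relay(hydrant: &Hydrant) {
    // Some(0) replays every persisted frame before switching to live tail;
    // None would start at the current head instead.
    let mut stream = hydrant.subscribe_repos(Some(0));
    while let Some(frame) = stream.next().await {
        // `frame` is bytes::Bytes, already framed as header + body DAG-CBOR
        println!("relay frame: {} bytes", frame.len());
    }
}
```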
+80 -10
src/control/stream.rs
```diff
···
 use std::sync::Arc;
-use std::sync::atomic::Ordering;
 
-use jacquard_common::types::cid::{ATP_CID_HASH, IpldCid};
-use jacquard_common::types::nsid::Nsid;
-use jacquard_common::types::string::Rkey;
-use jacquard_common::{CowStr, IntoStatic, RawData};
-use jacquard_repo::DAG_CBOR_CID_CODEC;
-use sha2::{Digest, Sha256};
 use tokio::sync::mpsc;
 use tracing::error;
 
-use crate::db::{self, keys};
+use crate::db::keys;
 use crate::state::AppState;
-use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredData, StoredEvent};
+use std::sync::atomic::Ordering;
 
-use super::Event;
+#[cfg(feature = "events")]
+use {
+    super::Event,
+    crate::db,
+    crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredData, StoredEvent},
+    jacquard_common::types::cid::{ATP_CID_HASH, IpldCid},
+    jacquard_common::types::nsid::Nsid,
+    jacquard_common::types::string::Rkey,
+    jacquard_common::{CowStr, IntoStatic, RawData},
+    jacquard_repo::DAG_CBOR_CID_CODEC,
+    sha2::{Digest, Sha256},
+};
 
+#[cfg(feature = "events")]
 pub(super) fn event_stream_thread(
     state: Arc<AppState>,
     tx: mpsc::Sender<Event>,
···
 }
 
+#[cfg(feature = "relay")]
+pub(super) fn relay_stream_thread(
+    state: Arc<AppState>,
+    tx: mpsc::Sender<bytes::Bytes>,
+    cursor: Option<u64>,
+) {
+    use crate::types::RelayBroadcast;
+    use std::sync::atomic::Ordering;
+
+    let mut relay_rx = state.db.relay_broadcast_tx.subscribe();
+    let ks = state.db.relay_events.clone();
+    let mut current_seq = match cursor {
+        Some(c) => c.saturating_sub(1),
+        None => state
+            .db
+            .next_relay_seq
+            .load(Ordering::Relaxed)
+            .saturating_sub(1),
+    };
+
+    loop {
+        // catch up from db: send all stored frames from current_seq+1 onward
+        loop {
+            let mut found = false;
+            for item in ks.range(crate::db::keys::relay_event_key(current_seq + 1)..) {
+                let (k, v) = match item.into_inner() {
+                    Ok(kv) => kv,
+                    Err(e) => {
+                        error!(err = %e, "relay stream: failed to read relay_events");
+                        break;
+                    }
+                };
+                let seq = match k.as_ref().try_into().map(u64::from_be_bytes) {
+                    Ok(s) => s,
+                    Err(_) => {
+                        error!("relay stream: failed to parse relay event seq");
+                        continue;
+                    }
+                };
+                current_seq = seq;
+                if tx.blocking_send(bytes::Bytes::copy_from_slice(&v)).is_err() {
+                    return; // subscriber dropped
+                }
+                found = true;
+            }
+            if !found {
+                break;
+            }
+        }
+
+        // wait for live events
+        match relay_rx.blocking_recv() {
+            Ok(RelayBroadcast::Persisted(_)) => {} // re-run catch-up
+            Ok(RelayBroadcast::Ephemeral(frame)) => {
+                if tx.blocking_send(frame).is_err() {
+                    return;
+                }
+            }
+            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} // re-run catch-up
+            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+        }
+    }
+}
+
+#[cfg(feature = "events")]
 fn stored_to_event(state: &AppState, id: u64, stored: StoredEvent<'_>) -> Option<Event> {
     let StoredEvent {
         live,
```
+3
src/db/ephemeral.rs
```diff
···
 let cutoff_ts = now.saturating_sub(ttl.as_secs());
 
 // write current watermark
+#[cfg(feature = "events")]
 let current_event_id = db.next_event_id.load(Ordering::SeqCst);
+#[cfg(not(feature = "events"))]
+let current_event_id = 0u64;
 db.cursors
     .insert(
         keys::event_watermark_key(now),
```
+6
src/db/keys/mod.rs
```diff
···
     key
 }
 
+/// key format: {SEQ} (u64 big-endian), mirroring event_key
+#[cfg(feature = "relay")]
+pub fn relay_event_key(seq: u64) -> [u8; 8] {
+    seq.to_be_bytes()
+}
+
 // key format: {collection}|{cid_bytes}
 pub fn block_key(collection: &str, cid: &[u8]) -> Vec<u8> {
     let mut key = Vec::with_capacity(collection.len() + 1 + cid.len());
```
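The big-endian choice is what makes relay replay a single range scan: byte-wise key order then matches numeric seq order. A small self-contained check of that property (`relay_event_key` re-stated from the diff):

```rust
// u64 keys serialized big-endian compare byte-wise in numeric order,
// so `range(relay_event_key(cursor + 1)..)` walks seqs ascending.
fn relay_event_key(seq: u64) -> [u8; 8] {
    seq.to_be_bytes()
}

fn main() {
    assert!(relay_event_key(1) < relay_event_key(2));
    // little-endian would order 256 ([0x00, 0x01, ..]) before 255 ([0xFF, 0x00, ..])
    assert!(relay_event_key(255) < relay_event_key(256));
}
```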
+73 -13
src/db/mod.rs
```diff
···
 use crate::config::Compression;
 use crate::db::compaction::DropPrefixFilterFactory;
-use crate::types::{BroadcastEvent, RepoState};
+#[cfg(feature = "events")]
+use crate::types::BroadcastEvent;
+#[cfg(feature = "relay")]
+use crate::types::RelayBroadcast;
+use crate::types::RepoState;
 
 use fjall::config::{BlockSizePolicy, CompressionPolicy, RestartIntervalPolicy};
 use fjall::{
···
 pub crawler: Keyspace,
 #[cfg(feature = "backlinks")]
 pub backlinks: Keyspace,
+#[cfg(feature = "events")]
 pub(crate) event_tx: broadcast::Sender<BroadcastEvent>,
+#[cfg(feature = "events")]
 pub next_event_id: Arc<AtomicU64>,
+#[cfg(feature = "relay")]
+pub(crate) relay_events: Keyspace,
+#[cfg(feature = "relay")]
+pub(crate) next_relay_seq: Arc<AtomicU64>,
+#[cfg(feature = "relay")]
+pub(crate) relay_broadcast_tx: broadcast::Sender<RelayBroadcast>,
 pub counts_map: HashMap<SmolStr, u64>,
···
         .data_block_restart_interval_policy(RestartIntervalPolicy::all(2)),
 )?;
 
+#[cfg(feature = "relay")]
+let relay_events = open_ks(
+    "relay_events",
+    opts()
+        // only iterated for cursor replay
+        .expect_point_read_hits(true)
+        .max_memtable_size(mb(cfg.db_events_memtable_size_mb))
+        .data_block_size_policy(BlockSizePolicy::new([kb(64), kb(128)]))
+        .data_block_compression_policy(CompressionPolicy::new([
+            CompressionType::None,
+            get_compression("events", 3),
+        ]))
+        .data_block_restart_interval_policy(RestartIntervalPolicy::new([64, 128])),
+)?;
+
 #[cfg(feature = "backlinks")]
 let backlinks = open_ks(
     "backlinks",
···
 // when adding new keyspaces, make sure to add them to the /stats endpoint
 // and also update any relevant /debug/* endpoints
 
+#[cfg(feature = "events")]
 let (event_tx, _) = broadcast::channel(10000);
 
 let this = Self {
···
     crawler,
     #[cfg(feature = "backlinks")]
     backlinks,
+    #[cfg(feature = "events")]
     event_tx,
     counts_map: HashMap::new(),
+    #[cfg(feature = "events")]
     next_event_id: Arc::new(AtomicU64::new(0)),
+    #[cfg(feature = "relay")]
+    relay_events,
+    #[cfg(feature = "relay")]
+    next_relay_seq: Arc::new(AtomicU64::new(0)),
+    #[cfg(feature = "relay")]
+    relay_broadcast_tx: {
+        let (tx, _) = broadcast::channel(10000);
+        tx
+    },
 };
 
 migration::run(&this)?;
 
-let mut last_id = 0;
-if let Some(guard) = this.events.iter().next_back() {
-    let k = guard.key().into_diagnostic()?;
-    last_id = u64::from_be_bytes(
-        k.as_ref()
-            .try_into()
-            .into_diagnostic()
-            .wrap_err("expected to be id (8 bytes)")?,
-    );
+#[cfg(feature = "relay")]
+{
+    let mut last_relay_seq = 0u64;
+    if let Some(guard) = this.relay_events.iter().next_back() {
+        let k = guard.key().into_diagnostic()?;
+        last_relay_seq = u64::from_be_bytes(
+            k.as_ref()
+                .try_into()
+                .into_diagnostic()
+                .wrap_err("relay_events: invalid key length")?,
+        );
+    }
+    this.next_relay_seq
+        .store(last_relay_seq + 1, std::sync::atomic::Ordering::Relaxed);
+}
+
+#[cfg(feature = "events")]
+{
+    let mut last_id = 0;
+    if let Some(guard) = this.events.iter().next_back() {
+        let k = guard.key().into_diagnostic()?;
+        last_id = u64::from_be_bytes(
+            k.as_ref()
+                .try_into()
+                .into_diagnostic()
+                .wrap_err("expected to be id (8 bytes)")?,
+        );
+    }
+    // relaxed is fine since we are just initializing the db
+    this.next_event_id
+        .store(last_id + 1, std::sync::atomic::Ordering::Relaxed);
 }
-// relaxed is fine since we are just initializing the db
-this.next_event_id
-    .store(last_id + 1, std::sync::atomic::Ordering::Relaxed);
 
 // load counts into memory
 for guard in this.counts.prefix(keys::COUNT_KS_PREFIX) {
···
 )?;
 #[cfg(feature = "backlinks")]
 compact(self.backlinks.clone()).await?;
+#[cfg(feature = "relay")]
+compact(self.relay_events.clone()).await?;
 Ok(())
```
+20 -4
src/ingest/firehose.rs
```diff
···
 use crate::db::deser_repo_state;
 use crate::filter::{FilterHandle, FilterMode};
-use crate::ingest::stream::{FirehoseStream, SubscribeReposMessage, decode_frame};
+use crate::ingest::stream::{FirehoseError, FirehoseStream, SubscribeReposMessage, decode_frame};
 use crate::ingest::{BufferTx, IngestMessage};
 use crate::state::AppState;
 use crate::util::WatchEnabledExt;
···
 use std::sync::atomic::Ordering;
 use std::time::Duration;
 use tokio::sync::watch;
-use tracing::{Span, debug, error, info, trace};
+use tracing::{Span, debug, error, info, trace, warn};
 use url::Url;
 
 pub struct FirehoseIngestor {
···
 
 let start_cursor = self
     .state
-    .relay_cursors
+    .firehose_cursors
     .peek_with(&self.relay_host, |_, c| {
         let val = c.load(Ordering::SeqCst);
         (val > 0).then_some(val)
···
 match decode_frame(&bytes) {
     Ok(msg) => self.handle_message(msg).await,
     Err(e) => {
-        error!(err = %e, "firehose stream error");
+        match e {
+            // dont disconnect on unknown op or type
+            FirehoseError::UnknownOp(op) => {
+                warn!(op = %op, "unknown frame op");
+                continue;
+            },
+            FirehoseError::UnknownType(t) => {
+                warn!(ty = %t, "unknown frame type");
+                continue;
+            },
+            // everything else is a hard error
+            FirehoseError::RelayError { error, message } => {
+                let message = message.unwrap_or_else(|| "<no message>".to_owned());
+                error!(err = %error, "relay sent error: {message}");
+            },
+            e => error!(err = %e, "firehose stream error"),
+        }
         break true;
     }
 }
```
+112 -2
src/ingest/mod.rs
```diff
···
 use tokio::sync::mpsc;
+use tracing::warn;
 
 pub mod firehose;
+#[cfg(feature = "relay")]
+pub mod relay_worker;
 pub mod stream;
 pub mod validation;
+#[cfg(feature = "events")]
 pub mod worker;
 
+use jacquard_common::types::crypto::PublicKey;
 use jacquard_common::types::did::Did;
+use miette::Result;
+use smol_str::{SmolStr, ToSmolStr};
+use url::Url;
 
-use crate::ingest::stream::SubscribeReposMessage;
-use url::Url;
+use crate::ingest::stream::{AccountStatus, SubscribeReposMessage};
+use crate::resolver::Resolver;
+use crate::types::{RepoState, RepoStatus};
 
 #[derive(Debug)]
 pub enum IngestMessage {
···
 
 pub type BufferTx = mpsc::UnboundedSender<IngestMessage>;
 pub type BufferRx = mpsc::UnboundedReceiver<IngestMessage>;
+
+/// outcome of a host authority check.
+enum AuthorityOutcome {
+    /// stored pds matched the source host immediately.
+    Authorized,
+    /// pds migrated: doc now points to this host, but our stored state was stale.
+    WasStale,
+    /// host did not match even after doc resolution.
+    WrongHost { expected: SmolStr },
+}
+
+fn pds_host(pds: Option<&str>) -> Option<SmolStr> {
+    // todo: add faster host parsing since we only need that
+    pds.and_then(|pds| Url::parse(pds).ok()).map(|u| {
+        u.host_str()
+            .map(SmolStr::new)
+            .expect("that there is host in pds url")
+    })
+}
+
+/// invalidates the resolver cache for `did`, fetches a fresh document, and updates `repo_state`.
+///
+/// panics if called outside a tokio runtime context.
+fn refresh_doc(resolver: &Resolver, did: &Did, repo_state: &mut RepoState) -> Result<()> {
+    resolver.invalidate_sync(did);
+    let doc = tokio::runtime::Handle::current()
+        .block_on(resolver.resolve_doc(did))
+        .map_err(|e| miette::miette!("{e}"))?;
+    repo_state.update_from_doc(doc);
+    repo_state.touch();
+    Ok(())
+}
+
+/// checks that `source_host` is the authoritative PDS for `did`.
+///
+/// updates `repo_state` in place when a doc refresh is performed (i.e. on any outcome other than
+/// `Authorized`). callers that persist state (e.g. the indexer worker) should write `repo_state`
+/// to their batch after this call when the outcome is not `Authorized`.
+///
+/// panics if called outside a tokio runtime context.
+fn check_host_authority(
+    resolver: &Resolver,
+    did: &Did,
+    repo_state: &mut RepoState,
+    source_host: &str,
+) -> Result<AuthorityOutcome> {
+    let expected = pds_host(repo_state.pds.as_deref());
+    if expected.as_deref() == Some(source_host) {
+        return Ok(AuthorityOutcome::Authorized);
+    }
+
+    // try again once
+    refresh_doc(resolver, did, repo_state)?;
+    let Some(expected) = pds_host(repo_state.pds.as_deref()) else {
+        miette::bail!("can't get pds host???");
+    };
+    if expected.as_str() == source_host {
+        Ok(AuthorityOutcome::WasStale)
+    } else {
+        Ok(AuthorityOutcome::WrongHost { expected })
+    }
+}
+
+/// resolves the signing key for `did` if `verify_signatures` is true.
+///
+/// panics if called outside a tokio runtime context.
+fn fetch_key(
+    resolver: &Resolver,
+    verify_signatures: bool,
+    did: &Did,
+) -> Result<Option<PublicKey<'static>>> {
+    if verify_signatures {
+        let key = tokio::runtime::Handle::current()
+            .block_on(resolver.resolve_signing_key(did))
+            .map_err(|e| miette::miette!("{e}"))?;
+        Ok(Some(key))
+    } else {
+        Ok(None)
+    }
+}
+
+/// maps an inactive account status to the corresponding `RepoStatus`.
+/// panics on `AccountStatus::Deleted`, caller must handle that
+fn inactive_account_repo_status(did: &Did, status: &Option<AccountStatus<'_>>) -> RepoStatus {
+    match status {
+        Some(AccountStatus::Takendown) => RepoStatus::Takendown,
+        Some(AccountStatus::Suspended) => RepoStatus::Suspended,
+        Some(AccountStatus::Deactivated) => RepoStatus::Deactivated,
+        Some(AccountStatus::Throttled) => RepoStatus::Error("throttled".into()),
+        Some(AccountStatus::Desynchronized) => RepoStatus::Error("desynchronized".into()),
+        Some(AccountStatus::Other(s)) => {
+            warn!(did = %did, status = %s, "unknown account status");
+            RepoStatus::Error(s.to_smolstr())
+        }
+        Some(AccountStatus::Deleted) => unreachable!("deleted is handled before status mapping"),
+        None => {
+            warn!(did = %did, "account inactive but no status provided");
+            RepoStatus::Error("unknown".into())
+        }
+    }
+}
```
+529
src/ingest/relay_worker.rs
```diff
···
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+
+use fjall::OwnedWriteBatch;
+
+use jacquard_common::types::crypto::PublicKey;
+use jacquard_common::types::did::Did;
+use jacquard_common::{CowStr, IntoStatic};
+use miette::{IntoDiagnostic, Result};
+use tokio::runtime::Handle;
+use tokio::sync::mpsc;
+use tracing::{debug, error, info, info_span, trace, warn};
+use url::Url;
+
+use crate::db::{self, keys};
+use crate::ingest::stream::{
+    Account, Commit, Identity, InfoName, SubscribeReposMessage, Sync, encode_frame,
+};
+use crate::ingest::validation::{
+    CommitValidationError, SyncValidationError, ValidatedCommit, ValidatedSync, ValidationContext,
+    ValidationOptions,
+};
+use crate::ingest::{BufferRx, IngestMessage};
+use crate::state::AppState;
+use crate::types::{RelayBroadcast, RepoState, RepoStatus};
+
+struct WorkerContext<'a> {
+    verify_signatures: bool,
+    state: &'a AppState,
+    vctx: ValidationContext<'a>,
+    batch: OwnedWriteBatch,
+    pending_broadcasts: Vec<RelayBroadcast>,
+}
+
+struct WorkerMessage {
+    is_pds: bool,
+    firehose: Url,
+    msg: SubscribeReposMessage<'static>,
+}
+
+pub struct RelayWorker {
+    state: Arc<AppState>,
+    rx: BufferRx,
+    verify_signatures: bool,
+    num_shards: usize,
+    validation_opts: Arc<ValidationOptions>,
+}
+
+impl RelayWorker {
+    pub fn new(
+        state: Arc<AppState>,
+        rx: BufferRx,
+        verify_signatures: bool,
+        num_shards: usize,
+        validation_opts: ValidationOptions,
+    ) -> Self {
+        Self {
+            state,
+            rx,
+            verify_signatures,
+            num_shards,
+            validation_opts: Arc::new(validation_opts),
+        }
+    }
+
+    pub fn run(mut self, handle: Handle) -> Result<()> {
+        let mut shards = Vec::with_capacity(self.num_shards);
+
+        for i in 0..self.num_shards {
+            let (tx, rx) = mpsc::unbounded_channel();
+            shards.push(tx);
+
+            let state = self.state.clone();
+            let verify = self.verify_signatures;
+            let h = handle.clone();
+            let opts = self.validation_opts.clone();
+
+            std::thread::Builder::new()
+                .name(format!("relay-shard-{i}"))
+                .spawn(move || {
+                    Self::shard(i, rx, state, verify, h, opts);
+                })
+                .into_diagnostic()?;
+        }
+
+        info!(num = self.num_shards, "relay worker: started shards");
+
+        let _g = handle.enter();
+
+        while let Some(msg) = self.rx.blocking_recv() {
+            let IngestMessage::Firehose {
+                relay: firehose,
+                is_pds,
+                msg,
+            } = msg
+            else {
+                continue;
+            };
+
+            // #info only pertains to us, the direct consumer
+            if let SubscribeReposMessage::Info(inf) = msg {
+                match inf.name {
+                    InfoName::OutdatedCursor => {
+                        // todo: handle
+                    }
+                    InfoName::Other(name) => {
+                        let message = inf
+                            .message
+                            .unwrap_or_else(|| CowStr::Borrowed("<no message>"));
+                        info!(name = %name, "relay sent info: {message}");
+                    }
+                }
+                continue;
+            }
+
+            let shard_idx = {
+                let did = match &msg {
+                    SubscribeReposMessage::Commit(c) => &c.repo,
+                    SubscribeReposMessage::Identity(i) => &i.did,
+                    SubscribeReposMessage::Account(a) => &a.did,
+                    SubscribeReposMessage::Sync(s) => &s.did,
+                    _ => continue,
+                };
+                let mut hasher = DefaultHasher::new();
+                did.hash(&mut hasher);
+                let idx = (hasher.finish() as usize) % self.num_shards;
+                idx
+            };
+
+            if let Err(e) = shards[shard_idx].send(WorkerMessage {
+                firehose,
+                is_pds,
+                msg,
+            }) {
+                error!(shard = shard_idx, err = %e, "relay worker: failed to send to shard");
+                break;
+            }
+        }
+
+        Err(miette::miette!("relay worker dispatcher shutting down"))
+    }
+
+    fn shard(
+        id: usize,
+        mut rx: mpsc::UnboundedReceiver<WorkerMessage>,
+        state: Arc<AppState>,
+        verify_signatures: bool,
+        handle: Handle,
+        validation_opts: Arc<ValidationOptions>,
+    ) {
+        let _guard = handle.enter();
+        let span = info_span!("worker_shard", shard = id, did = tracing::field::Empty);
+        let _entered = span.clone().entered();
+        debug!("relay shard started");
+
+        let mut ctx = WorkerContext {
+            verify_signatures,
+            state: &state,
+            vctx: ValidationContext {
+                opts: &validation_opts,
+            },
+            batch: state.db.inner.batch(),
+            pending_broadcasts: Vec::with_capacity(1),
+        };
+
+        while let Some(msg) = rx.blocking_recv() {
+            let (did, seq) = match &msg.msg {
+                SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
+                SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
+                SubscribeReposMessage::Account(a) => (&a.did, a.seq),
+                SubscribeReposMessage::Sync(s) => (&s.did, s.seq),
+                _ => continue,
+            };
+
+            span.record("did", &**did);
+
+            let firehose = msg.firehose.clone();
+            if let Err(e) = Self::process_message(&mut ctx, msg) {
+                error!(err = %e, "relay shard: error processing message");
+            }
+
+            let res = std::mem::replace(&mut ctx.batch, ctx.state.db.inner.batch()).commit();
+            if let Err(e) = res {
+                error!(shard = id, err = %e, "relay shard: failed to commit batch");
+                continue;
+            }
+
+            for broadcast in ctx.pending_broadcasts.drain(..) {
+                let _ = state.db.relay_broadcast_tx.send(broadcast);
+            }
+
+            // advance cursor for this firehose
+            ctx.state
+                .firehose_cursors
+                .peek_with(&firehose, |_, c| c.store(seq, Ordering::SeqCst));
+        }
+    }
+
+    fn process_message(ctx: &mut WorkerContext, msg: WorkerMessage) -> Result<()> {
+        let did = msg
+            .msg
+            .did()
+            .expect("that we checked if we are in valid commit");
+        let mut repo_state = ctx.load_repo_state(did)?;
+
+        if let Some(host) = msg.firehose.host_str()
+            && msg.is_pds
+        {
+            let outcome = ctx.check_host_authority(did, &mut repo_state, host)?;
+            if let super::AuthorityOutcome::WrongHost { expected } = outcome {
+                warn!(got = host, expected = %expected, "message rejected: wrong host");
+                return Ok(());
+            }
+        }
+
+        match msg.msg {
+            SubscribeReposMessage::Commit(commit) => {
+                trace!("processing commit");
+                Self::handle_commit(ctx, &mut repo_state, *commit)
+            }
+            SubscribeReposMessage::Sync(sync) => {
+                debug!("processing sync");
+                Self::handle_sync(ctx, &mut repo_state, *sync)
+            }
+            SubscribeReposMessage::Identity(identity) => {
+                debug!("processing identity");
+                Self::handle_identity(ctx, &mut repo_state, *identity, msg.is_pds)
+            }
+            SubscribeReposMessage::Account(account) => {
+                debug!("processing account");
+                Self::handle_account(ctx, &mut repo_state, *account)
+            }
+            _ => Ok(()),
+        }
+    }
+
+    fn handle_commit(
+        ctx: &mut WorkerContext,
+        repo_state: &mut RepoState,
+        mut commit: Commit<'static>,
+    ) -> Result<()> {
+        if repo_state.status != RepoStatus::Synced {
+            return Ok(());
+        }
+
+        let Some(validated) = ctx.validate_commit(repo_state, &commit)? else {
+            return Ok(());
+        };
+        let ValidatedCommit {
+            chain_break,
+            commit_obj,
+            ..
+        } = validated;
+
+        if chain_break.is_broken() {
+            warn!(broken = ?chain_break, "out of sync");
+            // todo: we need Desynchronized on RepoStatus (and Throttled)
+            repo_state.status = RepoStatus::Error("desynchronized".into());
+        }
+
+        let repo_key = keys::repo_key(&commit.repo);
+        ctx.queue_emit(|seq| {
+            commit.seq = seq;
+            encode_frame("#commit", &commit)
+        })?;
+
+        repo_state.root = Some(commit_obj.into());
+        repo_state.touch();
+        ctx.batch.insert(
+            &ctx.state.db.repos,
+            repo_key,
+            db::ser_repo_state(repo_state)?,
+        );
+
+        Ok(())
+    }
+
+    fn handle_sync(
+        ctx: &mut WorkerContext,
+        repo_state: &mut RepoState,
+        mut sync: Sync<'static>,
+    ) -> Result<()> {
+        if repo_state.status != RepoStatus::Synced {
+            return Ok(());
+        }
+
+        let Some(validated) = ctx.validate_sync(repo_state, &sync)? else {
+            return Ok(());
+        };
+
+        let repo_key = keys::repo_key(&sync.did);
+        ctx.queue_emit(|seq| {
+            sync.seq = seq;
+            encode_frame("#sync", &sync)
+        })?;
+
+        repo_state.root = Some(validated.commit_obj.into());
+        repo_state.touch();
+        ctx.batch.insert(
+            &ctx.state.db.repos,
+            repo_key,
+            db::ser_repo_state(repo_state)?,
+        );
+
+        Ok(())
+    }
+
+    fn handle_identity(
+        ctx: &mut WorkerContext,
+        repo_state: &mut RepoState,
+        mut identity: Identity<'static>,
+        is_pds: bool,
+    ) -> Result<()> {
+        let event_ms = identity.time.0.timestamp_millis();
+        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
+            debug!("skipping stale/duplicate identity event");
+            return Ok(());
+        }
+        repo_state.advance_message_time(event_ms);
+
+        // refresh did doc if a pds sent this event
+        // or if there is no handle specified
+        if is_pds || identity.handle.is_none() {
+            ctx.state.resolver.invalidate_sync(&identity.did);
+            let doc = Handle::current().block_on(ctx.state.resolver.resolve_doc(&identity.did));
+            match doc {
+                Ok(doc) => {
+                    repo_state.update_from_doc(doc);
+                }
+                Err(err) => {
+                    warn!(err = %err, "couldnt fetch identity");
+                }
+            }
+        }
+
+        // don't pass handle through if it doesnt match ours for pds events
+        if is_pds && repo_state.handle != identity.handle {
+            identity.handle = None;
+        }
+
+        let repo_key = keys::repo_key(&identity.did);
+        ctx.queue_emit(|seq| {
+            identity.seq = seq;
+            encode_frame("#identity", &identity)
+        })?;
+
+        ctx.batch.insert(
+            &ctx.state.db.repos,
+            repo_key,
+            db::ser_repo_state(repo_state)?,
+        );
+
+        Ok(())
+    }
+
+    fn handle_account(
+        ctx: &mut WorkerContext,
+        repo_state: &mut RepoState,
+        mut account: Account<'static>,
+    ) -> Result<()> {
+        let event_ms = account.time.0.timestamp_millis();
+        if repo_state.last_message_time.is_some_and(|t| event_ms <= t) {
+            debug!("skipping stale/duplicate account event");
+            return Ok(());
+        }
+        repo_state.advance_message_time(event_ms);
+
+        if !account.active {
+            use crate::ingest::stream::AccountStatus;
+            match &account.status {
+                Some(AccountStatus::Deleted) => {
+                    // todo: dont remove repo state?
+                    // forward the event and remove repo state
+                    let repo_key = keys::repo_key(&account.did);
+                    ctx.queue_emit(|seq| {
+                        account.seq = seq;
+                        encode_frame("#account", &account)
+                    })?;
+                    ctx.batch.remove(&ctx.state.db.repos, repo_key);
+                    return Ok(());
+                }
+                status => {
+                    repo_state.status = super::inactive_account_repo_status(&account.did, status);
+                }
+            }
+        } else {
+            repo_state.status = RepoStatus::Synced;
+        }
+
+        let repo_key = keys::repo_key(&account.did);
+        ctx.queue_emit(|seq| {
+            account.seq = seq;
+            encode_frame("#account", &account)
+        })?;
+
+        repo_state.touch();
+        ctx.batch.insert(
+            &ctx.state.db.repos,
+            repo_key,
+            db::ser_repo_state(repo_state)?,
+        );
+
+        Ok(())
+    }
+}
+
+impl WorkerContext<'_> {
+    fn check_host_authority(
+        &mut self,
+        did: &Did,
+        repo_state: &mut RepoState,
+        source_host: &str,
+    ) -> Result<super::AuthorityOutcome> {
+        let outcome =
+            super::check_host_authority(&self.state.resolver, did, repo_state, source_host)?;
+        if !matches!(outcome, super::AuthorityOutcome::Authorized) {
+            self.batch.insert(
+                &self.state.db.repos,
+                keys::repo_key(did),
+                db::ser_repo_state(repo_state)?,
+            );
+        }
+        Ok(outcome)
+    }
+
+    fn refresh_doc(&mut self, did: &Did, repo_state: &mut RepoState) -> Result<()> {
+        super::refresh_doc(&self.state.resolver, did, repo_state)?;
+        self.batch.insert(
+            &self.state.db.repos,
+            keys::repo_key(did),
+            db::ser_repo_state(repo_state)?,
+        );
+        Ok(())
+    }
+
+    fn validate_commit<'c>(
+        &mut self,
+        repo_state: &mut RepoState,
+        commit: &'c Commit<'c>,
+    ) -> Result<Option<ValidatedCommit<'c>>> {
+        let did = &commit.repo;
+        let key = self.fetch_key(did)?;
+        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
+            Ok(v) => return Ok(Some(v)),
+            Err(CommitValidationError::StaleRev) => {
+                trace!("skipping replayed commit");
+                return Ok(None);
+            }
+            Err(CommitValidationError::SigFailure) => {}
+            Err(e) => {
+                warn!(err = %e, "commit rejected");
+                return Ok(None);
+            }
+        }
+
+        self.refresh_doc(did, repo_state)?;
+        let key = self.fetch_key(did)?;
+        match self.vctx.validate_commit(commit, repo_state, key.as_ref()) {
+            Ok(v) => Ok(Some(v)),
+            Err(e) => {
+                warn!(err = %e, "commit rejected after key refresh");
+                Ok(None)
+            }
+        }
+    }
+
+    fn validate_sync(
+        &mut self,
+        repo_state: &mut RepoState,
+        sync: &Sync<'_>,
+    ) -> Result<Option<ValidatedSync>> {
+        let did = &sync.did;
+        let key = self.fetch_key(did)?;
+        match self.vctx.validate_sync(sync, key.as_ref()) {
+            Ok(v) => return Ok(Some(v)),
+            Err(SyncValidationError::SigFailure) => {}
+            Err(e) => {
+                warn!(err = %e, "sync rejected");
+                return Ok(None);
+            }
+        }
+
+        self.refresh_doc(did, repo_state)?;
+        let key = self.fetch_key(did)?;
+        match self.vctx.validate_sync(sync, key.as_ref()) {
+            Ok(v) => Ok(Some(v)),
+            Err(e) => {
+                warn!(err = %e, "sync rejected after key refresh");
+                Ok(None)
+            }
+        }
+    }
+
+    fn fetch_key(&self, did: &Did) -> Result<Option<PublicKey<'static>>> {
+        super::fetch_key(&self.state.resolver, self.verify_signatures, did)
+    }
+
+    fn load_repo_state(&self, did: &Did) -> Result<RepoState<'static>> {
+        let key = keys::repo_key(did);
+        let Some(bytes) = self.state.db.repos.get(&key).into_diagnostic()? else {
+            return Ok(RepoState {
+                status: RepoStatus::Synced,
+                root: None,
+                last_updated_at: chrono::Utc::now().timestamp(),
+                index_id: 0,
+                tracked: true,
+                handle: None,
+                pds: None,
+                signing_key: None,
+                last_message_time: None,
+            });
+        };
+        Ok(db::deser_repo_state(&bytes)?.into_static())
+    }
+
+    fn queue_emit(&mut self, make_frame: impl FnOnce(i64) -> Result<bytes::Bytes>) -> Result<()> {
+        let seq = self.state.db.next_relay_seq.fetch_add(1, Ordering::SeqCst);
+        let frame = make_frame(seq as i64)?;
+        self.batch.insert(
+            &self.state.db.relay_events,
+            keys::relay_event_key(seq),
+            frame.as_ref(),
+        );
+        self.pending_broadcasts.push(RelayBroadcast::Persisted(seq));
+        Ok(())
+    }
+}
```
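One design note on the dispatcher above: sharding by a hash of the DID (rather than round-robin) pins every event for a repo to one shard thread, so per-repo ordering survives the fan-out. A self-contained illustration of the same idiom:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// same idiom as the dispatcher: a given key always lands on the same shard,
// so events for one DID are processed in arrival order by a single thread.
fn shard_for(did: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    did.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

fn main() {
    let a = shard_for("did:plc:example123", 8); // hypothetical DID
    let b = shard_for("did:plc:example123", 8);
    assert_eq!(a, b); // deterministic: per-repo ordering is preserved
}
```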
+34 -5
src/ingest/stream.rs
```diff
···
     pub too_big: bool,
 }
 
-#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
+#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)]
 #[serde(rename_all = "camelCase")]
 pub struct Identity<'a> {
     #[serde(borrow)]
···
 }
 
-#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
+#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)]
 #[serde(rename_all = "camelCase")]
 pub struct Account<'a> {
     pub active: bool,
···
     pub time: Datetime,
 }
 
-#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
+#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)]
 #[serde(rename_all = "camelCase")]
 pub struct Sync<'a> {
     #[serde(with = "jacquard_common::serde_bytes_helper")]
···
 }
 
-#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
+#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)]
 #[serde(rename_all = "camelCase")]
 pub struct Info<'a> {
     #[serde(skip_serializing_if = "Option::is_none")]
···
     Identity(Box<Identity<'i>>),
     Account(Box<Account<'i>>),
     Info(Box<Info<'i>>),
+}
+
+impl<'i> SubscribeReposMessage<'i> {
+    pub fn did<'s>(&'s self) -> Option<&'s Did<'i>> {
+        Some(match self {
+            SubscribeReposMessage::Commit(c) => &c.repo,
+            SubscribeReposMessage::Identity(i) => &i.did,
+            SubscribeReposMessage::Account(a) => &a.did,
+            SubscribeReposMessage::Sync(s) => &s.did,
+            _ => return None,
+        })
+    }
 }
 
 use serde::Deserialize;
···
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, serde::Serialize)]
 struct EventHeader {
     op: i64,
     t: Option<String>,
···
     };
 
     Ok(msg)
+}
+
+#[cfg(feature = "relay")]
+#[derive(serde::Serialize)]
+struct EncodeHeader<'a> {
+    op: i64,
+    t: &'a str,
+}
+
+#[cfg(feature = "relay")]
+pub fn encode_frame<T: serde::Serialize>(t: &str, body: &T) -> miette::Result<bytes::Bytes> {
+    let mut buf = serde_ipld_dagcbor::to_vec(&EncodeHeader { op: 1, t })
+        .map_err(|e| miette::miette!("encode_frame header: {e}"))?;
+    buf.extend_from_slice(
+        &serde_ipld_dagcbor::to_vec(body).map_err(|e| miette::miette!("encode_frame body: {e}"))?,
+    );
+    Ok(bytes::Bytes::from(buf))
 }
 
 #[cfg(test)]
```
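The resulting frame is two DAG-CBOR values back to back, header then body, matching the ATProto event-stream framing that `decode_frame` consumes on the ingest side. A hedged decode sketch; it assumes `serde_ipld_dagcbor::from_reader` consumes exactly one CBOR item from the cursor, and the `Header` type is illustrative rather than from the codebase:

```rust
// sketch: split a frame into header + body. CBOR items are self-delimiting,
// so decoding one value from a cursor should leave it at the body's start
// (assumption about from_reader noted in the text above).
use std::io::Cursor;

#[derive(serde::Deserialize)]
struct Header {
    op: i64,           // 1 = message, -1 = error
    t: Option<String>, // e.g. "#commit", "#identity"
}

fn split_frame(frame: &[u8]) -> Result<(Header, Vec<u8>), Box<dyn std::error::Error>> {
    let mut cur = Cursor::new(frame);
    let header: Header = serde_ipld_dagcbor::from_reader(&mut cur)?;
    let body = frame[cur.position() as usize..].to_vec();
    Ok((header, body))
}
```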
+58 -42
src/ingest/validation.rs
```diff
···
 use jacquard_common::IntoStatic;
-use jacquard_common::types::cid::Cid;
 use jacquard_common::types::crypto::PublicKey;
 use jacquard_repo::MemoryBlockStore;
 use jacquard_repo::Mst;
···
 #[error("field mismatch in {field}")]
 FieldMismatch { field: &'static str },
 /// signature verification failed.
-/// `refreshed: false` means the key has not been re-fetched yet;
-/// the caller should refresh and retry with the same commit.
 #[error("signature verification failed")]
-SigFailure { refreshed: bool },
+SigFailure,
 /// a block, op count, or record exceeds the ATProto size limits
 #[error("size limit exceeded: {0}")]
 SizeLimitExceeded(SizeLimitKind),
···
 FieldMismatch { field: &'static str },
 /// signature verification failed
 #[error("signature verification failed")]
-SigFailure { refreshed: bool },
+SigFailure,
 }
 
 /// indicates that the commit's chain pointers do not match the last known repo state.
 /// this is not a hard rejection so callers can decide whta they want to do
+#[derive(Default, Debug)]
 pub struct ChainBreak {
     /// msg.since is present and does not match the last known rev
     pub since_mismatch: bool,
···
     pub prev_data_mismatch: bool,
 }
 
+impl ChainBreak {
+    pub fn is_broken(&self) -> bool {
+        self.since_mismatch || self.prev_data_mismatch
+    }
+}
+
 /// a successfully validated `#commit` message, carrying pre-parsed data for apply_commit
 pub struct ValidatedCommit<'c> {
     pub commit: &'c Commit<'c>,
     /// result of parse_car_bytes, already done so apply_commit does not re-parse
     pub parsed_blocks: ParsedCar,
-    /// deserialized and signature-verified commit object
     pub commit_obj: AtpCommit<'static>,
-    /// Some if chain pointers are inconsistent with last known state
-    pub chain_break: Option<ChainBreak>,
+    pub chain_break: ChainBreak,
 }
 
 /// a successfully validated `#sync` message
 pub struct ValidatedSync {
-    /// MST root CID from the commit object, used to detect noop syncs
-    pub data_cid: Cid<'static>,
-    /// rev string from the commit object, used to detect stale syncs
-    pub rev: String,
+    pub commit_obj: AtpCommit<'static>,
 }
 
 pub struct ValidationOptions {
···
 }
 
+/// all methods panic if called outside a tokio runtime context.
+pub struct ValidationContext<'a> {
+    pub opts: &'a ValidationOptions,
+}
+
+impl ValidationContext<'_> {
+    pub fn validate_commit<'c>(
+        &self,
+        msg: &'c Commit<'c>,
+        repo_state: &RepoState,
+        signing_key: Option<&PublicKey>,
+    ) -> Result<ValidatedCommit<'c>, CommitValidationError> {
+        validate_commit(msg, repo_state, signing_key, self.opts)
+    }
+
+    pub fn validate_sync(
+        &self,
+        msg: &Sync<'_>,
+        signing_key: Option<&PublicKey>,
+    ) -> Result<ValidatedSync, SyncValidationError> {
+        validate_sync(msg, signing_key)
+    }
+}
+
 /// validate an incoming `#commit` message.
 ///
 /// on success, returns a `ValidatedCommit` carrying pre-parsed data so that
···
 /// chain-break (since/prevData mismatch) is NOT an error. callers check
 /// `validated.chain_break.is_some()` and decide how to respond.
 ///
-/// - `repo_state`: `None` for the first-ever commit for this DID.
 /// - `signing_key`: `None` when signature verification is disabled.
+///
+/// panics if called outside a tokio runtime context.
 pub fn validate_commit<'c>(
     msg: &'c Commit<'c>,
-    repo_state: Option<&RepoState>,
+    repo_state: &RepoState,
     signing_key: Option<&PublicKey>,
     opts: &ValidationOptions,
-    handle: &tokio::runtime::Handle,
 ) -> Result<ValidatedCommit<'c>, CommitValidationError> {
+    let handle = tokio::runtime::Handle::current();
     const MAX_BLOCKS_BYTES: usize = 2_097_152; // 2 MiB
     const MAX_OPS: usize = 200;
     const MAX_RECORD_BYTES: usize = 1_048_576; // 1 MiB
···
     }
 
     // 2. stale rev, skip if msg.rev <= last known rev (lexicographic order)
-    if let Some(state) = repo_state {
-        if let Some(root) = &state.root {
-            if msg.rev.as_str() <= root.rev.to_tid().as_str() {
-                return Err(CommitValidationError::StaleRev);
-            }
+    if let Some(root) = &repo_state.root {
+        if msg.rev.as_str() <= root.rev.to_tid().as_str() {
+            return Err(CommitValidationError::StaleRev);
         }
     }
···
     if let Some(key) = signing_key {
         commit_obj
             .verify(key)
-            .map_err(|_| CommitValidationError::SigFailure { refreshed: false })?;
+            .map_err(|_| CommitValidationError::SigFailure)?;
     }
 
     let commit_obj = commit_obj.into_static();
 
     // 8. chain break checks
-    let chain_break = chain_break_check(msg, repo_state);
+    let chain_break = repo_state
+        .root
+        .as_ref()
+        .map(|r| breaks_chain(msg, r))
+        .unwrap_or_default();
 
     // 9–10. per-record size limits and basic CBOR validity
     for op in &msg.ops {
···
 
     // 11. MST inversion
     if opts.verify_mst {
-        verify_mst(msg, &parsed, &commit_obj, handle).map_err(CommitValidationError::MstInvalid)?;
+        verify_mst(msg, &parsed, &commit_obj, &handle)
+            .map_err(CommitValidationError::MstInvalid)?;
     }
 
     Ok(ValidatedCommit {
···
 }
 
-/// validate an incoming `#sync` message.
-///
-/// replaces `ops::verify_sync_event`, adding field consistency checks.
+/// panics if called outside a tokio runtime context.
 pub fn validate_sync<'c>(
     msg: &'c Sync<'c>,
     signing_key: Option<&PublicKey>,
-    handle: &tokio::runtime::Handle,
 ) -> Result<ValidatedSync, SyncValidationError> {
+    let handle = tokio::runtime::Handle::current();
     const MAX_BLOCKS_BYTES: usize = 2_097_152;
 
     // 1. size limit
···
     if let Some(key) = signing_key {
         commit_obj
             .verify(key)
-            .map_err(|_| SyncValidationError::SigFailure { refreshed: false })?;
+            .map_err(|_| SyncValidationError::SigFailure)?;
     }
 
     Ok(ValidatedSync {
-        data_cid: Cid::ipld(commit_obj.data).into_static(),
-        rev: commit_obj.rev.to_string(),
+        commit_obj: commit_obj.into_static(),
     })
 }
 
-/// compare msg chain pointers against known repo state and return a `ChainBreak` if inconsistent.
-fn chain_break_check(msg: &Commit<'_>, repo_state: Option<&RepoState>) -> Option<ChainBreak> {
-    let state = repo_state?;
-    let root = state.root.as_ref()?;
-
+fn breaks_chain(msg: &Commit<'_>, root: &crate::types::Commit) -> ChainBreak {
     // since should equal the rev of the previous commit; only flag when since is present and wrong
     let since_mismatch = msg
         .since
···
         None => true, // no prev_data but we have a previous state is a chain break
     };
 
-    if since_mismatch || prev_data_mismatch {
-        Some(ChainBreak {
-            since_mismatch,
-            prev_data_mismatch,
-        })
-    } else {
-        None
+    ChainBreak {
+        since_mismatch,
+        prev_data_mismatch,
     }
 }
```
+198 -259
src/ingest/worker.rs
```diff
···
+use super::*;
 use crate::db::{self, keys};
 use crate::filter::FilterMode;
 use crate::ingest::stream::{Account, Commit, Identity, SubscribeReposMessage, Sync};
 use crate::ingest::validation::{
-    CommitValidationError, SyncValidationError, ValidationOptions, validate_commit, validate_sync,
+    CommitValidationError, SyncValidationError, ValidatedCommit, ValidatedSync, ValidationContext,
+    ValidationOptions,
 };
-use crate::ingest::{BufferRx, IngestMessage};
 use crate::ops;
 use crate::resolver::{NoSigningKeyError, ResolverError};
 use crate::state::AppState;
···
 
 use jacquard_common::IntoStatic;
 use jacquard_common::cowstr::ToCowStr;
-use jacquard_common::types::crypto::PublicKey;
 use jacquard_common::types::did::Did;
 use jacquard_repo::error::CommitError;
 use miette::{Diagnostic, IntoDiagnostic, Result};
 use rand::Rng;
-use smol_str::ToSmolStr;
 use std::collections::hash_map::DefaultHasher;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 use std::sync::atomic::Ordering::SeqCst;
 use thiserror::Error;
+use tokio::runtime::Handle;
 use tokio::sync::mpsc;
 use tracing::{debug, error, info, trace, warn};
···
     }
 }
 
-enum HostAuthorityOutcome {
-    /// stored pds matched the source host immediately.
-    Authorized,
-    /// pds migrated: doc now points to this host, but our stored state was stale. trigger backfill.
-    Migration,
-    /// host did not match even after doc resolution. reject the message.
-    WrongHost,
-}
-
 // gate returned by check_repo_state, tells the shard loop what to do with the message
 enum ProcessGate<'s, 'c> {
     // did not exist in db, newly queued for backfill, drop
···
     added_blocks: &'a mut i64,
     records_delta: &'a mut i64,
     broadcast_events: &'a mut Vec<BroadcastEvent>,
-    handle: &'a tokio::runtime::Handle,
-    validation_opts: &'a ValidationOptions,
+    vctx: ValidationContext<'a>,
 }
 
 impl FirehoseWorker {
···
     // starts the worker threads and the main dispatch loop
     // the dispatch loop reads from the firehose channel and
     // distributes messages to shards based on the hash of the DID
-    pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> {
+    pub fn run(mut self, handle: Handle) -> Result<()> {
         let mut shards = Vec::with_capacity(self.num_shards);
 
         for i in 0..self.num_shards {
···
         state: Arc<AppState>,
         verify_signatures: bool,
         ephemeral: bool,
-        handle: tokio::runtime::Handle,
+        handle: Handle,
         validation_opts: Arc<ValidationOptions>,
     ) {
         let _guard = handle.enter();
···
             added_blocks: &mut added_blocks,
             records_delta: &mut records_delta,
             broadcast_events: &mut broadcast_events,
-            handle: &handle,
+            vctx: ValidationContext {
+                opts: &validation_opts,
+            },
             verify_signatures,
             ephemeral,
-            validation_opts: &validation_opts,
         };
 
         match msg {
···
                 }
             }
         }
-        IngestMessage::Firehose { relay, is_pds, msg } => {
-            let _span = tracing::info_span!("firehose", relay = %relay).entered();
-            // only enforce host authority when the source is a direct PDS connection
-            let source_host = is_pds.then(|| relay.host_str()).flatten();
+        IngestMessage::Firehose {
+            relay: firehose,
+            is_pds,
+            msg,
+        } => {
+            let _span = tracing::info_span!("firehose", relay = %firehose).entered();
             let (did, seq) = match &msg {
                 SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
                 SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
···
                 }
                 error!(did = %did, err = %e, "error in check_repo_state");
                 state
-                    .relay_cursors
-                    .peek_with(&relay, |_, c| c.store(seq, SeqCst));
+                    .firehose_cursors
+                    .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
                 continue;
             }
         };
···
             }
         }
         ProcessGate::Ready(mut repo_state) => {
+            // first validate the pds host
+            if let Some(host) = firehose.host_str()
+                && is_pds
+            {
+                let authority = match Self::check_host_authority(
+                    &mut ctx,
+                    did,
+                    &mut repo_state,
+                    host,
+                ) {
+                    Ok(a) => a,
+                    Err(e) => {
+                        error!(did = %did, err = %e, "failed to check host authority");
+                        state
+                            .firehose_cursors
+                            .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
+                        continue;
+                    }
+                };
+                match authority {
+                    AuthorityOutcome::Authorized => {}
+                    AuthorityOutcome::WasStale => {
+                        // pds migrated: our data may be stale, backfill from the new host
+                        warn!(did = %did, source_host = host, "pds migration detected, triggering backfill");
+                        if let Err(e) =
+                            Self::trigger_backfill(&mut ctx, did, repo_state)
+                        {
+                            error!(did = %did, err = %e, "failed to trigger backfill");
+                        } else if let SubscribeReposMessage::Commit(commit) = &msg {
+                            if let Err(e) = ops::persist_to_resync_buffer(
+                                &state.db, did, commit,
+                            ) {
+                                error!(
+                                    did = %did, err = %e,
+                                    "failed to persist commit to resync_buffer"
+                                );
+                            }
+                        }
+                        state
+                            .firehose_cursors
+                            .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
+                        continue;
+                    }
+                    // todo: ideally ban pds
+                    AuthorityOutcome::WrongHost { expected } => {
+                        warn!(did = %did, got = host, expected = %expected, "commit rejected: wrong host");
+                        state
+                            .firehose_cursors
+                            .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
+                        continue;
+                    }
+                }
+            }
+
             let pre_status = repo_state.status.clone();
 
             // if it was in deactivated/takendown/suspended state, we can mark it
···
                         "failed to transition inactive repo to synced"
                     );
                     state
-                        .relay_cursors
-                        .peek_with(&relay, |_, c| c.store(seq, SeqCst));
+                        .firehose_cursors
+                        .peek_with(&firehose, |_, c| {
+                            c.store(seq, SeqCst)
+                        });
                     continue;
                 }
             }
···
             }
         }
 
-        match Self::process_message(
-            &mut ctx,
-            &msg,
-            did,
-            repo_state,
-            pre_status,
-            source_host,
-        ) {
+        match Self::process_message(&mut ctx, &msg, did, repo_state, pre_status)
+        {
             Ok(RepoProcessResult::Ok(_)) => {}
             Ok(RepoProcessResult::Deleted) => {
                 state.db.update_count("repos", -1);
···
             }
         }
 
-        // todo: consider not using seqcst
         state
-            .relay_cursors
-            .peek_with(&relay, |_, c| c.store(seq, SeqCst));
+            .firehose_cursors
+            .peek_with(&firehose, |_, c| c.store(seq, SeqCst));
     }
 }
···
         let _ = state.db.event_tx.send(evt);
     }
 
-    state.db.inner.persist(fjall::PersistMode::Buffer).ok();
+    // state.db.inner.persist(fjall::PersistMode::Buffer).ok();
 }
···
     did: &Did,
     repo_state: RepoState<'s>,
     pre_status: RepoStatus,
-    source_host: Option<&str>,
 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
     match msg {
         SubscribeReposMessage::Commit(commit) => {
             trace!(did = %did, "processing commit");
-            Self::handle_commit(ctx, did, repo_state, commit, source_host)
+            Self::handle_commit(ctx, did, repo_state, commit)
         }
         SubscribeReposMessage::Sync(sync) => {
             debug!(did = %did, "processing sync");
-            Self::handle_sync(ctx, did, repo_state, sync, source_host)
+            Self::handle_sync(ctx, did, repo_state, sync)
         }
         SubscribeReposMessage::Identity(identity) => {
             debug!(did = %did, "processing identity");
···
     did: &Did,
     mut repo_state: RepoState<'s>,
     commit: &'c Commit<'c>,
-    source_host: Option<&str>,
 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
     repo_state.advance_message_time(commit.time.0.timestamp_millis());
 
-    if let Some(host) = source_host {
-        match Self::check_host_authority(ctx, did, &mut repo_state, host)? {
-            HostAuthorityOutcome::Authorized => {}
-            HostAuthorityOutcome::Migration => {
-                // pds migrated: our data may be stale, backfill from the new host
-                warn!(did = %did, source_host = host, "pds migration detected, triggering backfill");
-                let mut batch = ctx.state.db.inner.batch();
-                let _repo_state = ops::update_repo_status(
-                    &mut batch,
-                    &ctx.state.db,
-                    did,
-                    repo_state,
-                    RepoStatus::Backfilling,
-                )?;
-                batch.commit().into_diagnostic()?;
-                ctx.state
-                    .db
-                    .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
-                ctx.state.notify_backfill();
-                return Ok(RepoProcessResult::NeedsBackfill(Some(commit)));
-            }
-            // todo: ideally ban pds
-            HostAuthorityOutcome::WrongHost => {
-                warn!(did = %did, source_host = host, pds = ?repo_state.pds, "commit rejected: wrong host");
-                return Ok(RepoProcessResult::Ok(repo_state));
-            }
-        }
-    }
-
-    // validate the commit: stale rev, size limits, future rev, CAR parse, field
-    // consistency, signature, and chain-break detection
-    let signing_key = Self::fetch_key(ctx, did)?;
-    let validated = match validate_commit(
-        commit,
-        Some(&repo_state),
-        signing_key.as_ref(),
-        ctx.validation_opts,
-        ctx.handle,
-    ) {
-        Ok(v) => v,
-        Err(CommitValidationError::StaleRev) => {
-            debug!(
-                did = %did,
-                commit_rev = %commit.rev,
-                "skipping replayed commit"
-            );
-            return Ok(RepoProcessResult::Ok(repo_state));
-        }
-        Err(CommitValidationError::SigFailure { .. }) => {
-            // refresh key and retry once
-            Self::refresh_doc(ctx, &mut repo_state, did)?;
-            let refreshed_key = Self::fetch_key(ctx, did)?;
-            match validate_commit(
-                commit,
-                Some(&repo_state),
-                refreshed_key.as_ref(),
-                ctx.validation_opts,
-                ctx.handle,
-            ) {
-                Ok(v) => v,
-                Err(e) => {
-                    warn!(did = %did, err = %e, "commit rejected after key refresh");
-                    return Ok(RepoProcessResult::Ok(repo_state));
-                }
-            }
-        }
-        Err(e) => {
-            warn!(did = %did, err = %e, "commit rejected");
-            return Ok(RepoProcessResult::Ok(repo_state));
-        }
+    let Some(validated) = ctx.validate_commit(did, &mut repo_state, commit)? else {
+        return Ok(RepoProcessResult::Ok(repo_state));
     };
 
-    // chain break: prev_data or since mismatch against last known state → backfill
-    if let Some(cb) = &validated.chain_break {
+    if validated.chain_break.is_broken() {
         warn!(
             did = %did,
-            since_mismatch = cb.since_mismatch,
-            prev_data_mismatch = cb.prev_data_mismatch,
+            broken = ?validated.chain_break,
             "chain break detected, triggering backfill"
         );
-        let mut batch = ctx.state.db.inner.batch();
-        let _repo_state = ops::update_repo_status(
-            &mut batch,
-            &ctx.state.db,
-            did,
-            repo_state,
-            RepoStatus::Backfilling,
-        )?;
-        batch.commit().into_diagnostic()?;
-        ctx.state
-            .db
-            .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
-        ctx.state.notify_backfill();
+        Self::trigger_backfill(ctx, did, repo_state)?;
+        // not updating repo state root commit since we are backfilling anyway
         return Ok(RepoProcessResult::NeedsBackfill(Some(commit)));
     }
···
     did: &Did,
     mut repo_state: RepoState<'s>,
     sync: &'c Sync<'c>,
-    source_host: Option<&str>,
 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
     repo_state.advance_message_time(sync.time.0.timestamp_millis());
 
-    if let Some(host) = source_host {
-        match Self::check_host_authority(ctx, did, &mut repo_state, host)? {
-            HostAuthorityOutcome::Authorized | HostAuthorityOutcome::Migration => {
-                // migration is fine here — sync already triggers a backfill below
-            }
-            // todo: ideally ban pds
-            HostAuthorityOutcome::WrongHost => {
-                warn!(did = %did, source_host = host, pds = ?repo_state.pds, "sync rejected: wrong host");
-                return Ok(RepoProcessResult::Ok(repo_state));
-            }
-        }
-    }
-
-    // validate: size limit, CAR parse, field consistency, signature
-    let signing_key = Self::fetch_key(ctx, did)?;
-    let validated = match validate_sync(sync, signing_key.as_ref(), ctx.handle) {
-        Ok(v) => v,
-        Err(SyncValidationError::SigFailure { .. }) => {
-            // refresh key and retry once (same pattern as handle_commit)
-            Self::refresh_doc(ctx, &mut repo_state, did)?;
-            let refreshed_key = Self::fetch_key(ctx, did)?;
-            match validate_sync(sync, refreshed_key.as_ref(), ctx.handle) {
-                Ok(v) => v,
-                Err(e) => {
-                    warn!(did = %did, err = %e, "sync rejected after key refresh");
-                    return Ok(RepoProcessResult::Ok(repo_state));
-                }
-            }
-        }
-        Err(e) => {
-            warn!(did = %did, err = %e, "sync rejected");
-            return Ok(RepoProcessResult::Ok(repo_state));
-        }
+    let Some(validated) = ctx.validate_sync(did, &mut repo_state, sync)?
```
else { 548 + return Ok(RepoProcessResult::Ok(repo_state)); 622 549 }; 623 550 624 551 // skip noop syncs (data CID unchanged) 625 552 if let Some(current_commit) = &repo_state.root { 626 - if current_commit.data == validated.data_cid.to_ipld().expect("valid cid") { 553 + if current_commit.data == validated.commit_obj.data { 627 554 debug!(did = %did, "skipping noop sync"); 628 555 return Ok(RepoProcessResult::Ok(repo_state)); 629 556 } 630 557 631 - if validated.rev.as_str() <= current_commit.rev.to_tid().as_str() { 558 + if validated.commit_obj.rev.as_str() <= current_commit.rev.to_tid().as_str() { 632 559 debug!(did = %did, "skipping replayed sync"); 633 560 return Ok(RepoProcessResult::Ok(repo_state)); 634 561 } 635 562 } 563 + // not updating repo state root commit since we are backfilling anyway 636 564 637 565 warn!(did = %did, "sync event, triggering backfill"); 638 - let mut batch = ctx.state.db.inner.batch(); 639 - repo_state = ops::update_repo_status( 640 - &mut batch, 641 - &ctx.state.db, 642 - did, 643 - repo_state, 644 - RepoStatus::Backfilling, 645 - )?; 646 - batch.commit().into_diagnostic()?; 647 - ctx.state 648 - .db 649 - .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 650 - ctx.state.notify_backfill(); 566 + let repo_state = Self::trigger_backfill(ctx, did, repo_state)?; 651 567 Ok(RepoProcessResult::Ok(repo_state)) 652 568 } 653 569 ··· 664 580 } 665 581 repo_state.advance_message_time(event_ms); 666 582 583 + // todo: make this match relay sync behaviour 667 584 let changed = if identity.handle.is_none() { 668 585 // no handle sent is basically "invalidate your caches" 669 586 ctx.state.resolver.invalidate_sync(did); 670 - let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 587 + let doc = Handle::current().block_on(ctx.state.resolver.resolve_doc(did))?; 671 588 repo_state.update_from_doc(doc) 672 589 } else { 673 590 let old_handle = repo_state.handle.clone(); ··· 724 641 status: account.status.as_ref().map(|s| s.to_cowstr().into_static()), 725 642 }; 726 643 727 - Self::refresh_doc(ctx, &mut repo_state, did)?; 644 + ctx.refresh_doc(&mut repo_state, did)?; 728 645 729 646 if !account.active { 730 647 use crate::ingest::stream::AccountStatus; ··· 735 652 return Ok(RepoProcessResult::Deleted); 736 653 } 737 654 status => { 738 - let target_status = match status { 739 - Some(status) => match status { 740 - AccountStatus::Deleted => { 741 - unreachable!("deleted account status is handled before") 742 - } 743 - AccountStatus::Takendown => RepoStatus::Takendown, 744 - AccountStatus::Suspended => RepoStatus::Suspended, 745 - AccountStatus::Deactivated => RepoStatus::Deactivated, 746 - AccountStatus::Throttled => RepoStatus::Error("throttled".into()), 747 - AccountStatus::Desynchronized => { 748 - RepoStatus::Error("desynchronized".into()) 749 - } 750 - AccountStatus::Other(s) => { 751 - warn!( 752 - did = %did, status = %s, 753 - "unknown account status, will put in error state" 754 - ); 755 - RepoStatus::Error(s.to_smolstr()) 756 - } 757 - }, 758 - None => { 759 - warn!(did = %did, "account inactive but no status provided"); 760 - RepoStatus::Error("unknown".into()) 761 - } 762 - }; 655 + let target_status = inactive_account_repo_status(did, status); 763 656 764 657 if repo_state.status == target_status { 765 658 debug!(did = %did, ?target_status, "account status unchanged"); ··· 923 816 let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?; 924 817 925 818 // buffered commits have already been source-checked on arrival; skip 
host check 926 - let res = Self::handle_commit(ctx, did, repo_state, &commit, None); 819 + let res = Self::handle_commit(ctx, did, repo_state, &commit); 927 820 let res = match res { 928 821 Ok(r) => r, 929 822 Err(e) => { ··· 952 845 Ok(RepoProcessResult::Ok(repo_state)) 953 846 } 954 847 955 - /// check that `source_host` is the authoritative PDS for `did`. 956 - /// 957 - /// - `Authorized`: stored pds matched immediately (fast path). 958 - /// - `Migration`: stored pds was wrong but doc resolved to this host; caller should backfill. 959 - /// - `WrongHost`: host did not match even after doc resolution; caller should reject. 848 + // transitions repo to Backfilling, commits the status change immediately (separate from 849 + // ctx.batch), updates the gauge, and pings the backfill worker. returns the updated state. 850 + fn trigger_backfill<'s>( 851 + ctx: &mut WorkerContext, 852 + did: &Did, 853 + repo_state: RepoState<'s>, 854 + ) -> Result<RepoState<'s>, IngestError> { 855 + let mut batch = ctx.state.db.inner.batch(); 856 + let repo_state = ops::update_repo_status( 857 + &mut batch, 858 + &ctx.state.db, 859 + did, 860 + repo_state, 861 + RepoStatus::Backfilling, 862 + )?; 863 + batch.commit().into_diagnostic()?; 864 + ctx.state 865 + .db 866 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 867 + ctx.state.notify_backfill(); 868 + Ok(repo_state) 869 + } 870 + 960 871 fn check_host_authority( 961 872 ctx: &mut WorkerContext, 962 873 did: &Did, 963 874 repo_state: &mut RepoState, 964 875 source_host: &str, 965 - ) -> Result<HostAuthorityOutcome, IngestError> { 966 - let pds_host = repo_state 967 - .pds 968 - .as_deref() 969 - .and_then(|pds| url::Url::parse(pds).ok()) 970 - .and_then(|u| u.host_str().map(str::to_owned)); 971 - 972 - if pds_host.as_deref() == Some(source_host) { 973 - return Ok(HostAuthorityOutcome::Authorized); 876 + ) -> Result<AuthorityOutcome, IngestError> { 877 + let outcome = 878 + super::check_host_authority(&ctx.state.resolver, did, repo_state, source_host)?; 879 + if !matches!(outcome, AuthorityOutcome::Authorized) { 880 + ctx.batch.insert( 881 + &ctx.state.db.repos, 882 + keys::repo_key(did), 883 + crate::db::ser_repo_state(repo_state)?, 884 + ); 974 885 } 975 - 976 - // unknown pds or host mismatch — resolve doc to verify or detect a migration 977 - Self::refresh_doc(ctx, repo_state, did)?; 978 - 979 - let updated_host = repo_state 980 - .pds 981 - .as_deref() 982 - .and_then(|pds| url::Url::parse(pds).ok()) 983 - .and_then(|u| u.host_str().map(str::to_owned)); 984 - 985 - if updated_host.as_deref() == Some(source_host) { 986 - Ok(HostAuthorityOutcome::Migration) 987 - } else { 988 - Ok(HostAuthorityOutcome::WrongHost) 989 - } 886 + Ok(outcome) 990 887 } 888 + } 991 889 992 - // refreshes the handle, pds url and signing key of a did 993 - fn refresh_doc( 994 - ctx: &mut WorkerContext, 995 - repo_state: &mut RepoState, 996 - did: &Did, 997 - ) -> Result<(), IngestError> { 998 - ctx.state.resolver.invalidate_sync(did); 999 - let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 1000 - repo_state.update_from_doc(doc); 1001 - repo_state.touch(); 1002 - ctx.batch.insert( 1003 - &ctx.state.db.repos, 890 + impl WorkerContext<'_> { 891 + fn refresh_doc(&mut self, repo_state: &mut RepoState, did: &Did) -> Result<(), IngestError> { 892 + super::refresh_doc(&self.state.resolver, did, repo_state)?; 893 + self.batch.insert( 894 + &self.state.db.repos, 1004 895 keys::repo_key(did), 1005 - crate::db::ser_repo_state(&repo_state)?, 896 + 
crate::db::ser_repo_state(repo_state)?, 1006 897 ); 1007 898 Ok(()) 1008 899 } 1009 900 1010 - fn fetch_key( 1011 - ctx: &WorkerContext, 901 + fn fetch_key(&self, did: &Did) -> Result<Option<PublicKey<'static>>> { 902 + super::fetch_key(&self.state.resolver, self.verify_signatures, did) 903 + } 904 + 905 + fn validate_commit<'s, 'c>( 906 + &mut self, 1012 907 did: &Did, 1013 - ) -> Result<Option<PublicKey<'static>>, IngestError> { 1014 - if ctx.verify_signatures { 1015 - let key = ctx 1016 - .handle 1017 - .block_on(ctx.state.resolver.resolve_signing_key(did))?; 1018 - Ok(Some(key)) 1019 - } else { 1020 - Ok(None) 908 + repo_state: &mut RepoState<'s>, 909 + commit: &'c Commit<'c>, 910 + ) -> Result<Option<ValidatedCommit<'c>>, IngestError> { 911 + let key = self.fetch_key(did)?; 912 + match self.vctx.validate_commit(commit, repo_state, key.as_ref()) { 913 + Ok(v) => return Ok(Some(v)), 914 + Err(CommitValidationError::StaleRev) => { 915 + debug!(did = %did, commit_rev = %commit.rev, "skipping replayed commit"); 916 + return Ok(None); 917 + } 918 + Err(CommitValidationError::SigFailure) => {} 919 + Err(e) => { 920 + warn!(did = %did, err = %e, "commit rejected"); 921 + return Ok(None); 922 + } 923 + } 924 + 925 + self.refresh_doc(repo_state, did)?; 926 + let key = self.fetch_key(did)?; 927 + match self.vctx.validate_commit(commit, repo_state, key.as_ref()) { 928 + Ok(v) => Ok(Some(v)), 929 + Err(e) => { 930 + warn!(did = %did, err = %e, "commit rejected after key refresh"); 931 + Ok(None) 932 + } 933 + } 934 + } 935 + 936 + fn validate_sync<'s>( 937 + &mut self, 938 + did: &Did, 939 + repo_state: &mut RepoState<'s>, 940 + sync: &Sync<'_>, 941 + ) -> Result<Option<ValidatedSync>, IngestError> { 942 + let key = self.fetch_key(did)?; 943 + match self.vctx.validate_sync(sync, key.as_ref()) { 944 + Ok(v) => return Ok(Some(v)), 945 + Err(SyncValidationError::SigFailure) => {} 946 + Err(e) => { 947 + warn!(did = %did, err = %e, "sync rejected"); 948 + return Ok(None); 949 + } 950 + } 951 + 952 + self.refresh_doc(repo_state, did)?; 953 + let key = self.fetch_key(did)?; 954 + match self.vctx.validate_sync(sync, key.as_ref()) { 955 + Ok(v) => Ok(Some(v)), 956 + Err(e) => { 957 + warn!(did = %did, err = %e, "sync rejected after key refresh"); 958 + Ok(None) 959 + } 1021 960 } 1022 961 } 1023 962 }
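Both `validate_commit` and `validate_sync` above now share one shape: try the cached signing key, and only on `SigFailure` refresh the DID document and retry exactly once. A minimal sketch of that control flow, using stand-in names (`ValidationError`, `validate`, `fetch_fresh_key`) rather than this crate's real API:

```rust
// Sketch of the validate → refresh key → retry-once flow extracted into
// WorkerContext. All names here are illustrative stand-ins.

#[derive(Debug)]
enum ValidationError {
    SigFailure,          // signature did not verify against the cached key
    Other(&'static str), // any other rejection: stale rev, size limit, parse error
}

fn validate(payload: &[u8], key: &str) -> Result<(), ValidationError> {
    if payload.is_empty() {
        return Err(ValidationError::Other("empty payload"));
    }
    // stand-in check: "verify" by comparing first bytes
    if payload.first() == key.as_bytes().first() {
        Ok(())
    } else {
        Err(ValidationError::SigFailure)
    }
}

/// Returns true if the payload validated, possibly after one key refresh.
fn validate_with_refresh(
    payload: &[u8],
    cached_key: &mut String,
    fetch_fresh_key: impl FnOnce() -> String,
) -> bool {
    match validate(payload, cached_key) {
        Ok(()) => return true,
        // only a signature failure justifies a DID-doc refresh;
        // every other error is a terminal rejection
        Err(ValidationError::SigFailure) => {}
        Err(_) => return false,
    }
    *cached_key = fetch_fresh_key(); // refresh_doc + fetch_key in the real worker
    validate(payload, cached_key).is_ok() // a second failure is final
}

fn main() {
    let mut key = String::from("old-key");
    let ok = validate_with_refresh(b"new payload", &mut key, || "new-key".into());
    println!("validated after refresh: {ok}"); // true
}
```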
+8
src/lib.rs
··· 3 3 pub mod filter; 4 4 pub mod types; 5 5 6 + #[cfg(all(feature = "relay", feature = "events", not(debug_assertions)))] 7 + compile_error!("`relay` and `events` features are mutually exclusive"); 8 + 9 + #[cfg(all(feature = "relay", feature = "backlinks", not(debug_assertions)))] 10 + compile_error!("`relay` and `backlinks` features are mutually exclusive"); 11 + 6 12 pub(crate) mod api; 13 + #[cfg(feature = "events")] 7 14 pub(crate) mod backfill; 8 15 #[cfg(feature = "backlinks")] 9 16 pub(crate) mod backlinks; 10 17 pub(crate) mod crawler; 11 18 pub(crate) mod db; 12 19 pub(crate) mod ingest; 20 + #[cfg(feature = "events")] 13 21 pub(crate) mod ops; 14 22 pub(crate) mod resolver; 15 23 pub(crate) mod state;
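The `not(debug_assertions)` escape hatch on these guards is worth noting: it likely exists so that debug builds and tools that unify features (rust-analyzer, `cargo test --all-features`) still compile, while release builds with conflicting features fail fast. A generic sketch of the pattern, with placeholder feature names `foo`/`bar` that would be declared under `[features]` in Cargo.toml:

```rust
// Generic sketch of mutually-exclusive cargo features; `foo`/`bar`
// are placeholders, not this crate's feature names.
#[cfg(all(feature = "foo", feature = "bar", not(debug_assertions)))]
compile_error!("`foo` and `bar` are mutually exclusive");

// Each mode then compiles its own code path behind its feature gate.
#[cfg(feature = "foo")]
fn mode() -> &'static str { "foo" }

#[cfg(all(feature = "bar", not(feature = "foo")))]
fn mode() -> &'static str { "bar" }

#[cfg(not(any(feature = "foo", feature = "bar")))]
fn mode() -> &'static str { "default" }

fn main() {
    println!("built in {} mode", mode());
}
```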
+2 -4
src/state.rs
··· 16 16 pub db: Db, 17 17 pub resolver: Resolver, 18 18 pub(crate) filter: FilterHandle, 19 - /// per-relay firehose cursors. values use interior mutability so they can be 20 - /// updated through the lock-free `peek_with` reads in the ingest worker. 21 - pub relay_cursors: scc::HashIndex<Url, AtomicI64>, 19 + pub firehose_cursors: scc::HashIndex<Url, AtomicI64>, 22 20 pub backfill_notify: Notify, 23 21 pub crawler_enabled: watch::Sender<bool>, 24 22 pub firehose_enabled: watch::Sender<bool>, ··· 53 51 db, 54 52 resolver, 55 53 filter, 56 - relay_cursors, 54 + firehose_cursors: relay_cursors, 57 55 backfill_notify: Notify::new(), 58 56 crawler_enabled, 59 57 firehose_enabled,
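The renamed `firehose_cursors` map keeps the trick the removed doc comment described: `scc::HashIndex` for lock-free lookups, with `AtomicI64` values supplying the interior mutability needed to advance a cursor from inside a read-only `peek_with`. A minimal sketch, with `String` keys standing in for `url::Url` and the `scc` crate assumed as a dependency:

```rust
// Cursor-update pattern behind `firehose_cursors`, simplified.
use std::sync::atomic::{AtomicI64, Ordering};

fn main() {
    let cursors: scc::HashIndex<String, AtomicI64> = scc::HashIndex::new();

    // register a firehose host with its starting sequence number
    let _ = cursors.insert("wss://relay.example".to_string(), AtomicI64::new(0));

    // hot path: bump the cursor without taking any lock
    cursors.peek_with("wss://relay.example", |_, c| c.store(42, Ordering::SeqCst));

    let seq = cursors.peek_with("wss://relay.example", |_, c| c.load(Ordering::SeqCst));
    println!("cursor: {seq:?}"); // prints: cursor: Some(42)
}
```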
+11
src/types.rs
··· 16 16 pub(crate) mod v2 { 17 17 use super::*; 18 18 19 + // todo: add desynchronized and throttled fields 19 20 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 20 21 pub enum RepoStatus { 21 22 Backfilling, ··· 39 40 #[derive(Debug, Clone, Serialize, Deserialize)] 40 41 #[serde(bound(deserialize = "'i: 'de"))] 41 42 pub(crate) struct RepoState<'i> { 43 + // todo: add active field 42 44 pub status: RepoStatus, 43 45 pub root: Option<Commit>, 44 46 // todo: is this actually valid? the spec says this is informal and intermediate ··· 242 244 pub account: Option<AccountEvt<'i>>, 243 245 } 246 + 247 + #[cfg(feature = "events")] 245 248 #[derive(Clone, Debug)] 246 249 pub(crate) enum BroadcastEvent { 247 250 #[allow(dead_code)] ··· 343 346 matches!(self, GaugeState::Resync(_)) 344 347 } 345 348 } 349 + 350 + #[cfg(feature = "relay")] 351 + #[derive(Clone)] 352 + pub(crate) enum RelayBroadcast { 353 + Persisted(#[allow(dead_code)] u64), 354 + #[allow(dead_code)] 355 + Ephemeral(bytes::Bytes), 356 + }
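The new `RelayBroadcast` enum hints at a two-tier fan-out for relay mode: persisted frames travel as a sequence number (so subscribers can re-read them from storage), while ephemeral frames carry their bytes inline. A hypothetical sketch of that shape over a `tokio::sync::broadcast` channel; the channel wiring and the replay step are assumptions, not this crate's actual implementation, and it presumes `tokio` and `bytes` as dependencies:

```rust
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
enum Broadcast {
    Persisted(u64),          // seq of a frame durably written to the event log
    Ephemeral(bytes::Bytes), // one-shot frame that is never persisted
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<Broadcast>(16);

    tx.send(Broadcast::Persisted(7)).unwrap();
    tx.send(Broadcast::Ephemeral(bytes::Bytes::from_static(b"frame"))).unwrap();

    // a subscriber decides per variant whether to replay from storage
    // or forward the inline bytes straight to its websocket
    while let Ok(evt) = rx.try_recv() {
        match evt {
            Broadcast::Persisted(seq) => println!("re-read frame {seq} from the log"),
            Broadcast::Ephemeral(frame) => println!("forward {} inline bytes", frame.len()),
        }
    }
}
```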