very fast AT Protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] sync validation pass

dawn 697ef37b 7fe0698e

+549 -141
+14 -1
src/config.rs
··· 100 100 "list_repos" | "list-repos" => Ok(Self::ListRepos), 101 101 "by_collection" | "by-collection" => Ok(Self::ByCollection), 102 102 _ => Err(miette::miette!( 103 - "invalid crawler mode: expected 'relay' or 'by_collection'" 103 + "invalid crawler mode: expected 'list_repos' or 'by_collection'" 104 104 )), 105 105 } 106 106 } ··· 261 261 /// number of resolved identities to keep in the in-memory LRU cache. 262 262 /// set via `HYDRANT_IDENTITY_CACHE_SIZE`. 263 263 pub identity_cache_size: u64, 264 + /// enable MST inversion validation on incoming commits (expensive). 265 + /// set via `HYDRANT_VERIFY_MST`. 266 + pub verify_mst: bool, 267 + /// clock drift window for future-rev rejection, in seconds. 268 + /// commits with a rev timestamp more than this many seconds in the future are rejected. 269 + /// set via `HYDRANT_REV_CLOCK_SKEW`. default: 300 (5 minutes). 270 + pub rev_clock_skew_secs: i64, 264 271 265 272 /// NSID patterns that trigger auto-discovery in filter mode (e.g. `app.bsky.feed.post`). 266 273 /// set via `HYDRANT_FILTER_SIGNALS` as a comma-separated list. 
··· 346 353 }], 347 354 verify_signatures: SignatureVerification::Full, 348 355 identity_cache_size: 1_000_000, 356 + verify_mst: false, 357 + rev_clock_skew_secs: 300, 349 358 filter_signals: None, 350 359 filter_collections: None, 351 360 filter_excludes: None, ··· 438 447 439 448 let verify_signatures = cfg!("VERIFY_SIGNATURES", defaults.verify_signatures); 440 449 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", defaults.identity_cache_size); 450 + let verify_mst: bool = cfg!("VERIFY_MST", defaults.verify_mst); 451 + let rev_clock_skew_secs: i64 = cfg!("REV_CLOCK_SKEW", defaults.rev_clock_skew_secs); 441 452 let enable_firehose = cfg!("ENABLE_FIREHOSE", defaults.enable_firehose); 442 453 let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER") 443 454 .ok() ··· 541 552 crawler_sources, 542 553 verify_signatures, 543 554 identity_cache_size, 555 + verify_mst, 556 + rev_clock_skew_secs, 544 557 filter_signals, 545 558 filter_collections, 546 559 filter_excludes,
+4
src/control/mod.rs
··· 518 518 matches!(config.verify_signatures, SignatureVerification::Full), 519 519 config.ephemeral, 520 520 config.firehose_workers, 521 + crate::ingest::validation::ValidationOptions { 522 + verify_mst: config.verify_mst, 523 + rev_clock_skew_secs: config.rev_clock_skew_secs, 524 + }, 521 525 ) 522 526 .run(handle) 523 527 }
+1
src/ingest/mod.rs
··· 2 2 3 3 pub mod firehose; 4 4 pub mod stream; 5 + pub mod validation; 5 6 pub mod worker; 6 7 7 8 use jacquard_common::types::did::Did;
+410
src/ingest/validation.rs
··· 1 + use jacquard_common::IntoStatic; 2 + use jacquard_common::types::cid::Cid; 3 + use jacquard_common::types::crypto::PublicKey; 4 + use jacquard_repo::MemoryBlockStore; 5 + use jacquard_repo::Mst; 6 + use jacquard_repo::car::reader::{ParsedCar, parse_car_bytes}; 7 + use jacquard_repo::commit::Commit as AtpCommit; 8 + use jacquard_repo::mst::VerifiedWriteOp; 9 + use miette::IntoDiagnostic; 10 + use smol_str::ToSmolStr; 11 + use std::sync::Arc; 12 + use thiserror::Error; 13 + 14 + use crate::ingest::stream::{Commit, RepoOpAction, Sync}; 15 + use crate::types::RepoState; 16 + 17 + /// describes which size limit was exceeded 18 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 19 + pub enum SizeLimitKind { 20 + /// msg.blocks.len() > 2 MiB 21 + BlocksField, 22 + /// msg.ops.len() > 200 23 + OpCount, 24 + /// individual record block > 1 MiB 25 + RecordSize, 26 + } 27 + 28 + impl std::fmt::Display for SizeLimitKind { 29 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 30 + match self { 31 + Self::BlocksField => write!(f, "blocks field exceeds 2MiB"), 32 + Self::OpCount => write!(f, "op count exceeds 200"), 33 + Self::RecordSize => write!(f, "record block exceeds 1MiB"), 34 + } 35 + } 36 + } 37 + 38 + #[derive(Debug, Error)] 39 + pub enum CommitValidationError { 40 + /// rev is not greater than the last known rev 41 + #[error("stale rev")] 42 + StaleRev, 43 + /// rev timestamp exceeds the clock skew window 44 + #[error("future rev")] 45 + FutureRev, 46 + /// CAR could not be parsed, or a required block is missing 47 + #[error("malformed CAR: {0}")] 48 + MalformedCar(miette::Report), 49 + /// wire message fields differ from the signed commit object 50 + #[error("field mismatch in {field}")] 51 + FieldMismatch { field: &'static str }, 52 + /// signature verification failed. 53 + /// `refreshed: false` means the key has not been re-fetched yet; 54 + /// the caller should refresh and retry with the same commit. 
55 + #[error("signature verification failed")] 56 + SigFailure { refreshed: bool }, 57 + /// a block, op count, or record exceeds the ATProto size limits 58 + #[error("size limit exceeded: {0}")] 59 + SizeLimitExceeded(SizeLimitKind), 60 + /// MST inversion check failed (only when verify_mst = true) 61 + #[error("MST inversion failed: {0}")] 62 + MstInvalid(miette::Report), 63 + /// commit arrived from a host that is not the authoritative PDS for this DID 64 + /// (enforced in phase 2 relay worker) 65 + #[allow(dead_code)] 66 + #[error("commit from wrong host")] 67 + WrongHost, 68 + } 69 + 70 + #[derive(Debug, Error)] 71 + pub enum SyncValidationError { 72 + /// blocks field exceeds 2MiB 73 + #[error("size limit exceeded")] 74 + SizeLimitExceeded, 75 + /// CAR could not be parsed 76 + #[error("malformed CAR: {0}")] 77 + MalformedCar(miette::Report), 78 + /// wire message fields differ from the signed commit object 79 + #[error("field mismatch in {field}")] 80 + FieldMismatch { field: &'static str }, 81 + /// signature verification failed 82 + #[error("signature verification failed")] 83 + SigFailure { refreshed: bool }, 84 + } 85 + 86 + /// indicates that the commit's chain pointers do not match the last known repo state. 
87 + /// this is not a hard rejection so callers can decide what they want to do 88 + pub struct ChainBreak { 89 + /// msg.since is present and does not match the last known rev 90 + pub since_mismatch: bool, 91 + /// msg.prev_data does not match the last known data CID 92 + pub prev_data_mismatch: bool, 93 + } 94 + 95 + /// a successfully validated `#commit` message, carrying pre-parsed data for apply_commit 96 + pub struct ValidatedCommit<'c> { 97 + pub commit: &'c Commit<'c>, 98 + /// result of parse_car_bytes, already done so apply_commit does not re-parse 99 + pub parsed_blocks: ParsedCar, 100 + /// deserialized and signature-verified commit object 101 + pub commit_obj: AtpCommit<'static>, 102 + /// Some if chain pointers are inconsistent with last known state 103 + pub chain_break: Option<ChainBreak>, 104 + } 105 + 106 + /// a successfully validated `#sync` message 107 + pub struct ValidatedSync { 108 + /// MST root CID from the commit object, used to detect noop syncs 109 + pub data_cid: Cid<'static>, 110 + /// rev string from the commit object, used to detect stale syncs 111 + pub rev: String, 112 + } 113 + 114 + pub struct ValidationOptions { 115 + /// clock drift window for future-rev rejection (seconds). default: 300 116 + pub rev_clock_skew_secs: i64, 117 + /// run MST inversion validation (expensive). default: false 118 + pub verify_mst: bool, 119 + } 120 + 121 + impl Default for ValidationOptions { 122 + fn default() -> Self { 123 + Self { 124 + rev_clock_skew_secs: 300, 125 + verify_mst: false, 126 + } 127 + } 128 + } 129 + 130 + /// validate an incoming `#commit` message. 131 + /// 132 + /// on success, returns a `ValidatedCommit` carrying pre-parsed data so that 133 + /// `apply_commit` does not need to repeat the work. 134 + /// 135 + /// chain-break (since/prevData mismatch) is NOT an error. callers check 136 + /// `validated.chain_break.is_some()` and decide how to respond. 
137 + /// 138 + /// - `repo_state`: `None` for the first-ever commit for this DID. 139 + /// - `signing_key`: `None` when signature verification is disabled. 140 + pub fn validate_commit<'c>( 141 + msg: &'c Commit<'c>, 142 + repo_state: Option<&RepoState>, 143 + signing_key: Option<&PublicKey>, 144 + opts: &ValidationOptions, 145 + handle: &tokio::runtime::Handle, 146 + ) -> Result<ValidatedCommit<'c>, CommitValidationError> { 147 + const MAX_BLOCKS_BYTES: usize = 2_097_152; // 2 MiB 148 + const MAX_OPS: usize = 200; 149 + const MAX_RECORD_BYTES: usize = 1_048_576; // 1 MiB 150 + 151 + // 1. size limits 152 + if msg.blocks.len() > MAX_BLOCKS_BYTES { 153 + return Err(CommitValidationError::SizeLimitExceeded( 154 + SizeLimitKind::BlocksField, 155 + )); 156 + } 157 + if msg.ops.len() > MAX_OPS { 158 + return Err(CommitValidationError::SizeLimitExceeded( 159 + SizeLimitKind::OpCount, 160 + )); 161 + } 162 + 163 + // 2. stale rev, skip if msg.rev <= last known rev (lexicographic order) 164 + if let Some(state) = repo_state { 165 + if let Some(root) = &state.root { 166 + if msg.rev.as_str() <= root.rev.to_tid().as_str() { 167 + return Err(CommitValidationError::StaleRev); 168 + } 169 + } 170 + } 171 + 172 + // 3. future rev, reject if rev timestamp is more than clock_skew_secs ahead of now 173 + { 174 + let rev_us = msg.rev.timestamp() as i64; 175 + let now_us = chrono::Utc::now().timestamp_micros(); 176 + if rev_us > now_us + opts.rev_clock_skew_secs * 1_000_000 { 177 + return Err(CommitValidationError::FutureRev); 178 + } 179 + } 180 + 181 + // 4. CAR parse 182 + let parsed = handle 183 + .block_on(parse_car_bytes(msg.blocks.as_ref())) 184 + .map_err(|e| CommitValidationError::MalformedCar(miette::miette!("{e}")))?; 185 + 186 + let root_bytes = parsed.blocks.get(&parsed.root).ok_or_else(|| { 187 + CommitValidationError::MalformedCar(miette::miette!("root block missing from CAR")) 188 + })?; 189 + 190 + // 5. 
commit object deserialization 191 + let commit_obj = AtpCommit::from_cbor(root_bytes) 192 + .map_err(|e| CommitValidationError::MalformedCar(miette::miette!("{e}")))?; 193 + 194 + // 6. field consistency: wire message vs signed commit object 195 + if commit_obj.did.as_str() != msg.repo.as_str() { 196 + return Err(CommitValidationError::FieldMismatch { field: "repo" }); 197 + } 198 + if commit_obj.rev.as_str() != msg.rev.as_str() { 199 + return Err(CommitValidationError::FieldMismatch { field: "rev" }); 200 + } 201 + 202 + // 7. signature verification 203 + if let Some(key) = signing_key { 204 + commit_obj 205 + .verify(key) 206 + .map_err(|_| CommitValidationError::SigFailure { refreshed: false })?; 207 + } 208 + 209 + let commit_obj = commit_obj.into_static(); 210 + 211 + // 8. chain break checks 212 + let chain_break = chain_break_check(msg, repo_state); 213 + 214 + // 9–10. per-record size limits and basic CBOR validity 215 + for op in &msg.ops { 216 + let Some(cid_link) = &op.cid else { continue }; 217 + let cid = cid_link.to_ipld().map_err(|e| { 218 + CommitValidationError::MalformedCar(miette::miette!("invalid op CID: {e}")) 219 + })?; 220 + let Some(block) = parsed.blocks.get(&cid) else { 221 + return Err(CommitValidationError::MalformedCar(miette::miette!( 222 + "block for op CID {cid} missing from CAR" 223 + ))); 224 + }; 225 + 226 + if block.len() > MAX_RECORD_BYTES { 227 + return Err(CommitValidationError::SizeLimitExceeded( 228 + SizeLimitKind::RecordSize, 229 + )); 230 + } 231 + 232 + serde_ipld_dagcbor::from_slice::<jacquard_common::Data>(block).map_err(|e| { 233 + CommitValidationError::MalformedCar(miette::miette!("record is not valid CBOR: {e}")) 234 + })?; 235 + } 236 + 237 + // 11. 
MST inversion 238 + if opts.verify_mst { 239 + verify_mst(msg, &parsed, &commit_obj, handle).map_err(CommitValidationError::MstInvalid)?; 240 + } 241 + 242 + Ok(ValidatedCommit { 243 + commit: msg, 244 + parsed_blocks: parsed, 245 + commit_obj, 246 + chain_break, 247 + }) 248 + } 249 + 250 + /// validate an incoming `#sync` message. 251 + /// 252 + /// replaces `ops::verify_sync_event`, adding field consistency checks. 253 + pub fn validate_sync<'c>( 254 + msg: &'c Sync<'c>, 255 + signing_key: Option<&PublicKey>, 256 + handle: &tokio::runtime::Handle, 257 + ) -> Result<ValidatedSync, SyncValidationError> { 258 + const MAX_BLOCKS_BYTES: usize = 2_097_152; 259 + 260 + // 1. size limit 261 + if msg.blocks.len() > MAX_BLOCKS_BYTES { 262 + return Err(SyncValidationError::SizeLimitExceeded); 263 + } 264 + 265 + // 2. CAR parse 266 + let parsed = handle 267 + .block_on(parse_car_bytes(msg.blocks.as_ref())) 268 + .map_err(|e| SyncValidationError::MalformedCar(miette::miette!("{e}")))?; 269 + 270 + let root_bytes = parsed.blocks.get(&parsed.root).ok_or_else(|| { 271 + SyncValidationError::MalformedCar(miette::miette!("root block missing from CAR")) 272 + })?; 273 + 274 + // 3. commit object deserialization 275 + let commit_obj = AtpCommit::from_cbor(root_bytes) 276 + .map_err(|e| SyncValidationError::MalformedCar(miette::miette!("{e}")))?; 277 + 278 + // 4. field consistency 279 + if commit_obj.did.as_str() != msg.did.as_str() { 280 + return Err(SyncValidationError::FieldMismatch { field: "did" }); 281 + } 282 + if commit_obj.rev.as_str() != msg.rev.as_str() { 283 + return Err(SyncValidationError::FieldMismatch { field: "rev" }); 284 + } 285 + 286 + // 5. 
signature verification 287 + if let Some(key) = signing_key { 288 + commit_obj 289 + .verify(key) 290 + .map_err(|_| SyncValidationError::SigFailure { refreshed: false })?; 291 + } 292 + 293 + Ok(ValidatedSync { 294 + data_cid: Cid::ipld(commit_obj.data).into_static(), 295 + rev: commit_obj.rev.to_string(), 296 + }) 297 + } 298 + 299 + /// compare msg chain pointers against known repo state and return a `ChainBreak` if inconsistent. 300 + fn chain_break_check(msg: &Commit<'_>, repo_state: Option<&RepoState>) -> Option<ChainBreak> { 301 + let state = repo_state?; 302 + let root = state.root.as_ref()?; 303 + 304 + // since should equal the rev of the previous commit; only flag when since is present and wrong 305 + let since_mismatch = msg 306 + .since 307 + .as_ref() 308 + .map(|since| since.as_str() != root.rev.to_tid().as_str()) 309 + .unwrap_or(false); 310 + 311 + // prev_data must equal the last known data CID when both are present 312 + let prev_data_mismatch = match &msg.prev_data { 313 + Some(prev_link) => match prev_link.to_ipld() { 314 + Ok(cid) => cid != root.data, 315 + Err(_) => true, // unparseable CID is a mismatch 316 + }, 317 + None => true, // no prev_data but we have a previous state is a chain break 318 + }; 319 + 320 + if since_mismatch || prev_data_mismatch { 321 + Some(ChainBreak { 322 + since_mismatch, 323 + prev_data_mismatch, 324 + }) 325 + } else { 326 + None 327 + } 328 + } 329 + 330 + /// apply the inverse of each op (in reverse order) to the new MST and verify the resulting root 331 + /// equals `msg.prev_data`. called only when `opts.verify_mst` is true. 
332 + fn verify_mst( 333 + msg: &Commit<'_>, 334 + parsed: &ParsedCar, 335 + commit_obj: &AtpCommit<'_>, 336 + handle: &tokio::runtime::Handle, 337 + ) -> miette::Result<()> { 338 + let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks.clone())); 339 + let mut mst: Mst<MemoryBlockStore> = Mst::load(store, commit_obj.data, None); 340 + 341 + handle.block_on(async { 342 + for op in msg.ops.iter().rev() { 343 + let inv = match &op.action { 344 + RepoOpAction::Create => { 345 + let cid_link = op 346 + .cid 347 + .as_ref() 348 + .ok_or_else(|| miette::miette!("create op missing CID"))?; 349 + let cid = cid_link.to_ipld().into_diagnostic()?; 350 + VerifiedWriteOp::Create { 351 + key: op.path.to_smolstr(), 352 + cid, 353 + } 354 + } 355 + RepoOpAction::Update => { 356 + let cid_link = op 357 + .cid 358 + .as_ref() 359 + .ok_or_else(|| miette::miette!("update op missing CID"))?; 360 + let Some(prev_link) = op.prev.as_ref() else { 361 + // prev is optional in inductive firehose (v3); skip if absent 362 + continue; 363 + }; 364 + let cid = cid_link.to_ipld().into_diagnostic()?; 365 + let prev = prev_link.to_ipld().into_diagnostic()?; 366 + VerifiedWriteOp::Update { 367 + key: op.path.to_smolstr(), 368 + cid, 369 + prev, 370 + } 371 + } 372 + RepoOpAction::Delete => { 373 + let Some(prev_link) = op.prev.as_ref() else { 374 + // prev is optional in inductive firehose (v3); skip if absent 375 + continue; 376 + }; 377 + let prev = prev_link.to_ipld().into_diagnostic()?; 378 + VerifiedWriteOp::Delete { 379 + key: op.path.to_smolstr(), 380 + prev, 381 + } 382 + } 383 + RepoOpAction::Other(action) => { 384 + return Err(miette::miette!("unknown op action: {action}")); 385 + } 386 + }; 387 + 388 + let ok = mst.invert_op(inv).await.into_diagnostic()?; 389 + if !ok { 390 + return Err(miette::miette!( 391 + "MST inversion inconsistent with tree state for op on {}", 392 + op.path 393 + )); 394 + } 395 + } 396 + 397 + // verify the resulting root CID equals prev_data (skip 
for genesis commits) 398 + if let Some(prev_link) = &msg.prev_data { 399 + let expected = prev_link.to_ipld().into_diagnostic()?; 400 + let root_cid = mst.get_pointer().await.into_diagnostic()?; 401 + if root_cid != expected { 402 + return Err(miette::miette!( 403 + "MST inversion root mismatch: expected {expected}, got {root_cid}" 404 + )); 405 + } 406 + } 407 + 408 + Ok(()) 409 + }) 410 + }
+106 -57
src/ingest/worker.rs
··· 1 1 use crate::db::{self, keys}; 2 2 use crate::filter::FilterMode; 3 3 use crate::ingest::stream::{Account, Commit, Identity, SubscribeReposMessage, Sync}; 4 + use crate::ingest::validation::{ 5 + CommitValidationError, SyncValidationError, ValidationOptions, validate_commit, validate_sync, 6 + }; 4 7 use crate::ingest::{BufferRx, IngestMessage}; 5 8 use crate::ops; 6 9 use crate::resolver::{NoSigningKeyError, ResolverError}; ··· 14 17 use jacquard_common::types::crypto::PublicKey; 15 18 use jacquard_common::types::did::Did; 16 19 use jacquard_repo::error::CommitError; 17 - use miette::{Context, Diagnostic, IntoDiagnostic, Result}; 20 + use miette::{Diagnostic, IntoDiagnostic, Result}; 18 21 use rand::Rng; 19 22 use smol_str::ToSmolStr; 20 23 use std::collections::hash_map::DefaultHasher; ··· 78 81 verify_signatures: bool, 79 82 ephemeral: bool, 80 83 num_shards: usize, 84 + validation_opts: Arc<ValidationOptions>, 81 85 } 82 86 83 87 struct WorkerContext<'a> { ··· 89 93 records_delta: &'a mut i64, 90 94 broadcast_events: &'a mut Vec<BroadcastEvent>, 91 95 handle: &'a tokio::runtime::Handle, 96 + validation_opts: &'a ValidationOptions, 92 97 } 93 98 94 99 impl FirehoseWorker { ··· 98 103 verify_signatures: bool, 99 104 ephemeral: bool, 100 105 num_shards: usize, 106 + validation_opts: ValidationOptions, 101 107 ) -> Self { 102 108 Self { 103 109 state, ··· 105 111 verify_signatures, 106 112 ephemeral, 107 113 num_shards, 114 + validation_opts: Arc::new(validation_opts), 108 115 } 109 116 } 110 117 ··· 124 131 let verify = self.verify_signatures; 125 132 let ephemeral = self.ephemeral; 126 133 let handle = handle.clone(); 134 + let validation_opts = self.validation_opts.clone(); 127 135 128 136 std::thread::Builder::new() 129 137 .name(format!("ingest-shard-{i}")) 130 138 .spawn(move || { 131 - Self::shard(i, rx, state, verify, ephemeral, handle); 139 + Self::shard(i, rx, state, verify, ephemeral, handle, validation_opts); 132 140 }) 133 141 
.into_diagnostic()?; 134 142 } ··· 175 183 verify_signatures: bool, 176 184 ephemeral: bool, 177 185 handle: tokio::runtime::Handle, 186 + validation_opts: Arc<ValidationOptions>, 178 187 ) { 179 188 let _guard = handle.enter(); 180 189 debug!(shard = id, "shard started"); ··· 197 206 handle: &handle, 198 207 verify_signatures, 199 208 ephemeral, 209 + validation_opts: &validation_opts, 200 210 }; 201 211 202 212 match msg { ··· 434 444 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 435 445 repo_state.advance_message_time(commit.time.0.timestamp_millis()); 436 446 437 - // skip replayed events (already seen revision) 438 - if matches!(repo_state.root, Some(ref root) if commit.rev.as_str() <= root.rev.to_tid().as_str()) 439 - { 440 - debug!( 441 - did = %did, 442 - commit_rev = %commit.rev, 443 - state_rev = %repo_state.root.as_ref().map(|c| c.rev.to_tid()).expect("we checked in if"), 444 - "skipping replayed event" 445 - ); 446 - return Ok(RepoProcessResult::Ok(repo_state)); 447 - } 447 + // TODO phase 2: host authority check (source_host not available in indexer mode) 448 448 449 - if let (Some(repo_commit), Some(prev_commit)) = (&repo_state.root, &commit.prev_data) 450 - && repo_commit.data 451 - != prev_commit 452 - .0 453 - .to_ipld() 454 - .into_diagnostic() 455 - .wrap_err("invalid cid from relay")? 456 - { 449 + // validate the commit: stale rev, size limits, future rev, CAR parse, field 450 + // consistency, signature, and chain-break detection 451 + let signing_key = Self::fetch_key(ctx, did)?; 452 + let validated = match validate_commit( 453 + commit, 454 + Some(&repo_state), 455 + signing_key.as_ref(), 456 + ctx.validation_opts, 457 + ctx.handle, 458 + ) { 459 + Ok(v) => v, 460 + Err(CommitValidationError::StaleRev) => { 461 + debug!( 462 + did = %did, 463 + commit_rev = %commit.rev, 464 + "skipping replayed commit" 465 + ); 466 + return Ok(RepoProcessResult::Ok(repo_state)); 467 + } 468 + Err(CommitValidationError::SigFailure { .. 
}) => { 469 + // refresh key and retry once 470 + Self::refresh_doc(ctx, &mut repo_state, did)?; 471 + let refreshed_key = Self::fetch_key(ctx, did)?; 472 + match validate_commit( 473 + commit, 474 + Some(&repo_state), 475 + refreshed_key.as_ref(), 476 + ctx.validation_opts, 477 + ctx.handle, 478 + ) { 479 + Ok(v) => v, 480 + Err(e) => { 481 + warn!(did = %did, err = %e, "commit rejected after key refresh"); 482 + return Ok(RepoProcessResult::Ok(repo_state)); 483 + } 484 + } 485 + } 486 + Err(e) => { 487 + warn!(did = %did, err = %e, "commit rejected"); 488 + return Ok(RepoProcessResult::Ok(repo_state)); 489 + } 490 + }; 491 + 492 + // chain break: prev_data or since mismatch against last known state → backfill 493 + if let Some(cb) = &validated.chain_break { 457 494 warn!( 458 495 did = %did, 459 - repo = %repo_commit.data, 460 - prev_commit = %prev_commit.0, 461 - "gap detected, triggering backfill" 496 + since_mismatch = cb.since_mismatch, 497 + prev_data_mismatch = cb.prev_data_mismatch, 498 + "chain break detected, triggering backfill" 462 499 ); 463 - 464 500 let mut batch = ctx.state.db.inner.batch(); 465 501 let _repo_state = ops::update_repo_status( 466 502 &mut batch, ··· 477 513 return Ok(RepoProcessResult::NeedsBackfill(Some(commit))); 478 514 } 479 515 480 - let signing_key = Self::fetch_key(ctx, did)?; 481 516 let res = ops::apply_commit( 482 517 &mut ctx.batch, 483 518 &ctx.state.db, 484 519 repo_state, 485 - commit, 486 - signing_key.as_ref(), 520 + validated, 487 521 &ctx.state.filter.load(), 488 522 ctx.ephemeral, 489 523 )?; ··· 505 539 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 506 540 repo_state.advance_message_time(sync.time.0.timestamp_millis()); 507 541 508 - Self::refresh_doc(ctx, &mut repo_state, did)?; 542 + // TODO phase 2: host authority check 509 543 510 - match ops::verify_sync_event(sync.blocks.as_ref(), Self::fetch_key(ctx, did)?.as_ref()) { 511 - Ok((root, rev)) => { 512 - if let Some(current_commit) = &repo_state.root { 
513 - if current_commit.data == root.to_ipld().expect("valid cid") { 514 - debug!(did = %did, "skipping noop sync"); 515 - return Ok(RepoProcessResult::Ok(repo_state)); 516 - } 517 - 518 - if rev.as_str() <= current_commit.rev.to_tid().as_str() { 519 - debug!(did = %did, "skipping replayed sync"); 544 + // validate: size limit, CAR parse, field consistency, signature 545 + let signing_key = Self::fetch_key(ctx, did)?; 546 + let validated = match validate_sync(sync, signing_key.as_ref(), ctx.handle) { 547 + Ok(v) => v, 548 + Err(SyncValidationError::SigFailure { .. }) => { 549 + // refresh key and retry once (same pattern as handle_commit) 550 + Self::refresh_doc(ctx, &mut repo_state, did)?; 551 + let refreshed_key = Self::fetch_key(ctx, did)?; 552 + match validate_sync(sync, refreshed_key.as_ref(), ctx.handle) { 553 + Ok(v) => v, 554 + Err(e) => { 555 + warn!(did = %did, err = %e, "sync rejected after key refresh"); 520 556 return Ok(RepoProcessResult::Ok(repo_state)); 521 557 } 522 558 } 559 + } 560 + Err(e) => { 561 + warn!(did = %did, err = %e, "sync rejected"); 562 + return Ok(RepoProcessResult::Ok(repo_state)); 563 + } 564 + }; 523 565 524 - warn!(did = %did, "sync event, triggering backfill"); 525 - let mut batch = ctx.state.db.inner.batch(); 526 - repo_state = ops::update_repo_status( 527 - &mut batch, 528 - &ctx.state.db, 529 - did, 530 - repo_state, 531 - RepoStatus::Backfilling, 532 - )?; 533 - batch.commit().into_diagnostic()?; 534 - ctx.state 535 - .db 536 - .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 537 - ctx.state.notify_backfill(); 538 - Ok(RepoProcessResult::Ok(repo_state)) 566 + // skip noop syncs (data CID unchanged) 567 + if let Some(current_commit) = &repo_state.root { 568 + if current_commit.data == validated.data_cid.to_ipld().expect("valid cid") { 569 + debug!(did = %did, "skipping noop sync"); 570 + return Ok(RepoProcessResult::Ok(repo_state)); 539 571 } 540 - Err(e) => { 541 - error!(did = %did, err = %e, "failed to 
process sync event"); 542 - Ok(RepoProcessResult::Ok(repo_state)) 572 + 573 + if validated.rev.as_str() <= current_commit.rev.to_tid().as_str() { 574 + debug!(did = %did, "skipping replayed sync"); 575 + return Ok(RepoProcessResult::Ok(repo_state)); 543 576 } 544 577 } 578 + 579 + warn!(did = %did, "sync event, triggering backfill"); 580 + let mut batch = ctx.state.db.inner.batch(); 581 + repo_state = ops::update_repo_status( 582 + &mut batch, 583 + &ctx.state.db, 584 + did, 585 + repo_state, 586 + RepoStatus::Backfilling, 587 + )?; 588 + batch.commit().into_diagnostic()?; 589 + ctx.state 590 + .db 591 + .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 592 + ctx.state.notify_backfill(); 593 + Ok(RepoProcessResult::Ok(repo_state)) 545 594 } 546 595 547 596 fn handle_identity<'s>(
+7 -60
src/ops.rs
··· 4 4 use jacquard_common::CowStr; 5 5 #[cfg(feature = "backlinks")] 6 6 use jacquard_common::Data; 7 - use jacquard_common::IntoStatic; 8 - use jacquard_common::types::cid::Cid; 9 - use jacquard_common::types::crypto::PublicKey; 10 7 use jacquard_common::types::did::Did; 11 - use jacquard_repo::car::reader::parse_car_bytes; 12 8 use miette::{Context, IntoDiagnostic, Result}; 13 9 use rand::{Rng, rng}; 14 10 use std::collections::HashMap; 15 11 use std::sync::atomic::Ordering; 16 - use std::time::Instant; 17 - use tracing::{debug, trace}; 12 + use tracing::debug; 18 13 19 14 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 20 15 use crate::db::{self, Db, keys, ser_repo_state}; 21 16 use crate::filter::FilterConfig; 22 17 use crate::ingest::stream::Commit; 18 + use crate::ingest::validation::ValidatedCommit; 23 19 use crate::types::StoredData; 24 20 use crate::types::{ 25 21 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, ··· 197 193 Ok(repo_state) 198 194 } 199 195 200 - pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> { 201 - let parsed = tokio::task::block_in_place(|| { 202 - tokio::runtime::Handle::current() 203 - .block_on(parse_car_bytes(blocks)) 204 - .into_diagnostic() 205 - })?; 206 - 207 - let root_bytes = parsed 208 - .blocks 209 - .get(&parsed.root) 210 - .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 211 - 212 - let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 213 - 214 - if let Some(key) = key { 215 - repo_commit 216 - .verify(key) 217 - .map_err(|e| miette::miette!("signature verification failed: {e}"))?; 218 - } 219 - 220 - Ok(( 221 - Cid::ipld(repo_commit.data).into_static(), 222 - repo_commit.rev.to_string(), 223 - )) 224 - } 225 - 226 196 pub struct ApplyCommitResults<'s> { 227 197 pub repo_state: RepoState<'s>, 228 198 pub records_delta: i64, 229 199 pub blocks_count: i64, 230 
200 } 231 201 232 - pub fn apply_commit<'commit, 's>( 202 + pub fn apply_commit<'s>( 233 203 batch: &mut OwnedWriteBatch, 234 204 db: &Db, 235 205 mut repo_state: RepoState<'s>, 236 - commit: &'commit Commit<'commit>, 237 - signing_key: Option<&PublicKey>, 206 + validated: ValidatedCommit<'_>, 238 207 filter: &FilterConfig, 239 208 ephemeral: bool, 240 209 ) -> Result<ApplyCommitResults<'s>> { 210 + let commit = validated.commit; 211 + let parsed = validated.parsed_blocks; 241 212 let did = &commit.repo; 242 213 debug!(did = %did, commit = %commit.commit, "applying commit"); 243 214 244 - // 1. parse CAR blocks and store them in CAS 245 - let start = Instant::now(); 246 - let parsed = tokio::task::block_in_place(|| { 247 - tokio::runtime::Handle::current() 248 - .block_on(parse_car_bytes(commit.blocks.as_ref())) 249 - .into_diagnostic() 250 - })?; 251 - 252 - trace!(did = %did, elapsed = ?start.elapsed(), "parsed car"); 253 - 254 - let root_bytes = parsed 255 - .blocks 256 - .get(&parsed.root) 257 - .ok_or_else(|| miette::miette!("root block missing from CAR"))?; 258 - 259 - let root_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?; 260 - 261 - if let Some(key) = signing_key { 262 - root_commit 263 - .verify(key) 264 - .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?; 265 - trace!(did = %did, "signature verified"); 266 - } 267 - 268 - repo_state.root = Some(root_commit.into()); 215 + repo_state.root = Some(validated.commit_obj.into()); 269 216 repo_state.touch(); 270 217 271 218 batch.insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);
+5 -5
tests/api.nu
··· 12 12 } 13 13 print " ok: starts empty" 14 14 15 - # add a relay source 16 - print " POST /crawler/sources (relay)..." 15 + # add a list_repos source 16 + print " POST /crawler/sources (list_repos)..." 17 17 http post -f -e -t application/json $"($url)/crawler/sources" { 18 18 url: "https://bsky.network", 19 - mode: "relay" 19 + mode: "list_repos" 20 20 } | assert-status 201 "POST /crawler/sources" $pid 21 21 print " ok: 201 Created" 22 22 ··· 27 27 fail $"expected 1 source, got ($sources | length)" $pid 28 28 } 29 29 let s = ($sources | first) 30 - if $s.mode != "relay" { 31 - fail $"expected mode=relay, got ($s.mode)" $pid 30 + if $s.mode != "list_repos" { 31 + fail $"expected mode=list_repos, got ($s.mode)" $pid 32 32 } 33 33 if not $s.persisted { 34 34 fail "expected persisted=true for dynamically added source" $pid
+2 -18
tests/repos_api.nu
··· 12 12 } 13 13 print $" count: ($items | length)" 14 14 15 - # 2. test partition=all 16 - print " testing partition=all..." 17 - let all_items = (http get $"($url)/repos?partition=all" | from json -o) 15 + # 2. test 16 + let all_items = (http get $"($url)/repos" | from json -o) 18 17 print $" count: ($all_items | length)" 19 18 20 19 # 3. test cursor (if we have enough items) ··· 31 30 } 32 31 } 33 32 34 - # 4. test partition=pending 35 - print " testing partition=pending..." 36 - let pending_items = (http get $"($url)/repos?partition=pending" | from json -o) 37 - print $" pending count: ($pending_items | length)" 38 - 39 - # 5. test partition=resync 40 - print " testing partition=resync..." 41 - let resync_items = (http get $"($url)/repos?partition=resync" | from json -o) 42 - print $" resync count: ($resync_items | length)" 43 - 44 33 print "all /repos pagination and filtering tests passed!" 45 34 } 46 35 ··· 61 50 print " testing GET /repos with invalid cursor..." 62 51 http get -f -e $"($url)/repos?cursor=invalid" 63 52 | assert-status 400 "GET /repos invalid cursor" 64 - 65 - # invalid partition in GET 66 - print " testing GET /repos with invalid partition..." 67 - http get -f -e $"($url)/repos?partition=invalid" 68 - | assert-status 400 "GET /repos invalid partition" 69 53 70 54 print "all /repos error handling tests passed!" 71 55 }