Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 426 lines 15 kB view raw
1use jacquard_common::IntoStatic; 2use jacquard_common::types::crypto::PublicKey; 3use jacquard_repo::MemoryBlockStore; 4use jacquard_repo::Mst; 5use jacquard_repo::car::reader::{ParsedCar, parse_car_bytes}; 6use jacquard_repo::commit::Commit as AtpCommit; 7use jacquard_repo::mst::VerifiedWriteOp; 8use miette::IntoDiagnostic; 9use smol_str::ToSmolStr; 10use std::sync::Arc; 11use thiserror::Error; 12 13use crate::ingest::stream::{Commit, RepoOpAction, Sync}; 14use crate::types::RepoState; 15 16/// describes which size limit was exceeded 17#[derive(Debug, Clone, Copy, PartialEq, Eq)] 18pub enum SizeLimitKind { 19 /// msg.blocks.len() > 2 MiB 20 BlocksField, 21 /// msg.ops.len() > 200 22 OpCount, 23 /// individual record block > 1 MiB 24 RecordSize, 25} 26 27impl std::fmt::Display for SizeLimitKind { 28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 29 match self { 30 Self::BlocksField => write!(f, "blocks field exceeds 2MiB"), 31 Self::OpCount => write!(f, "op count exceeds 200"), 32 Self::RecordSize => write!(f, "record block exceeds 1MiB"), 33 } 34 } 35} 36 37#[derive(Debug, Error)] 38pub enum CommitValidationError { 39 /// rev is not greater than the last known rev 40 #[error("stale rev")] 41 StaleRev, 42 /// rev timestamp exceeds the clock skew window 43 #[error("future rev")] 44 FutureRev, 45 /// CAR could not be parsed, or a required block is missing 46 #[error("malformed CAR: {0}")] 47 MalformedCar(miette::Report), 48 /// wire message fields differ from the signed commit object 49 #[error("field mismatch in {field}")] 50 FieldMismatch { field: &'static str }, 51 /// signature verification failed. 
52 #[error("signature verification failed")] 53 SigFailure, 54 /// a block, op count, or record exceeds the ATProto size limits 55 #[error("size limit exceeded: {0}")] 56 SizeLimitExceeded(SizeLimitKind), 57 /// MST inversion check failed (only when verify_mst = true) 58 #[error("MST inversion failed: {0}")] 59 MstInvalid(miette::Report), 60 /// commit arrived from a host that is not the authoritative PDS for this DID 61 /// (enforced in phase 2 relay worker) 62 #[allow(dead_code)] 63 #[error("commit from wrong host")] 64 WrongHost, 65} 66 67#[derive(Debug, Error)] 68pub enum SyncValidationError { 69 /// blocks field exceeds 2MiB 70 #[error("size limit exceeded")] 71 SizeLimitExceeded, 72 /// CAR could not be parsed 73 #[error("malformed CAR: {0}")] 74 MalformedCar(miette::Report), 75 /// wire message fields differ from the signed commit object 76 #[error("field mismatch in {field}")] 77 FieldMismatch { field: &'static str }, 78 /// signature verification failed 79 #[error("signature verification failed")] 80 SigFailure, 81} 82 83/// indicates that the commit's chain pointers do not match the last known repo state. 
84/// this is not a hard rejection so callers can decide whta they want to do 85#[derive(Default, Debug)] 86pub struct ChainBreak { 87 /// msg.since is present and does not match the last known rev 88 pub since_mismatch: bool, 89 /// msg.prev_data does not match the last known data CID 90 pub prev_data_mismatch: bool, 91} 92 93impl ChainBreak { 94 pub fn is_broken(&self) -> bool { 95 self.since_mismatch || self.prev_data_mismatch 96 } 97} 98 99/// a successfully validated `#commit` message, carrying pre-parsed data for apply_commit 100pub struct ValidatedCommit<'c> { 101 pub commit: &'c Commit<'c>, 102 /// result of parse_car_bytes, already done so apply_commit does not re-parse 103 pub parsed_blocks: ParsedCar, 104 pub commit_obj: AtpCommit<'static>, 105 pub chain_break: ChainBreak, 106} 107 108/// a successfully validated `#sync` message 109pub struct ValidatedSync { 110 pub commit_obj: AtpCommit<'static>, 111} 112 113pub struct ValidationOptions { 114 /// clock drift window for future-rev rejection (seconds). default: 300 115 pub rev_clock_skew_secs: i64, 116 /// run MST inversion validation (expensive). default: false 117 pub verify_mst: bool, 118} 119 120impl Default for ValidationOptions { 121 fn default() -> Self { 122 Self { 123 rev_clock_skew_secs: 300, 124 verify_mst: false, 125 } 126 } 127} 128 129/// all methods panic if called outside a tokio runtime context. 
130pub struct ValidationContext<'a> { 131 pub opts: &'a ValidationOptions, 132} 133 134impl ValidationContext<'_> { 135 pub fn validate_commit<'c>( 136 &self, 137 msg: &'c Commit<'c>, 138 repo_state: &RepoState, 139 signing_key: Option<&PublicKey>, 140 ) -> Result<ValidatedCommit<'c>, CommitValidationError> { 141 validate_commit(msg, repo_state, signing_key, self.opts) 142 } 143 144 pub fn validate_sync( 145 &self, 146 msg: &Sync<'_>, 147 signing_key: Option<&PublicKey>, 148 ) -> Result<ValidatedSync, SyncValidationError> { 149 validate_sync(msg, signing_key) 150 } 151} 152 153/// validate an incoming `#commit` message. 154/// 155/// on success, returns a `ValidatedCommit` carrying pre-parsed data so that 156/// `apply_commit` does not need to repeat the work. 157/// 158/// chain-break (since/prevData mismatch) is NOT an error. callers check 159/// `validated.chain_break.is_some()` and decide how to respond. 160/// 161/// - `signing_key`: `None` when signature verification is disabled. 162/// 163/// panics if called outside a tokio runtime context. 164pub fn validate_commit<'c>( 165 msg: &'c Commit<'c>, 166 repo_state: &RepoState, 167 signing_key: Option<&PublicKey>, 168 opts: &ValidationOptions, 169) -> Result<ValidatedCommit<'c>, CommitValidationError> { 170 let handle = tokio::runtime::Handle::current(); 171 const MAX_BLOCKS_BYTES: usize = 2_097_152; // 2 MiB 172 const MAX_OPS: usize = 200; 173 const MAX_RECORD_BYTES: usize = 1_048_576; // 1 MiB 174 175 // 1. size limits 176 if msg.blocks.len() > MAX_BLOCKS_BYTES { 177 return Err(CommitValidationError::SizeLimitExceeded( 178 SizeLimitKind::BlocksField, 179 )); 180 } 181 if msg.ops.len() > MAX_OPS { 182 return Err(CommitValidationError::SizeLimitExceeded( 183 SizeLimitKind::OpCount, 184 )); 185 } 186 187 // 2. 
stale rev, skip if msg.rev <= last known rev (lexicographic order) 188 if let Some(root) = &repo_state.root { 189 if msg.rev.as_str() <= root.rev.to_tid().as_str() { 190 return Err(CommitValidationError::StaleRev); 191 } 192 } 193 194 // 3. future rev, reject if rev timestamp is more than clock_skew_secs ahead of now 195 { 196 let rev_us = msg.rev.timestamp() as i64; 197 let now_us = chrono::Utc::now().timestamp_micros(); 198 if rev_us > now_us + opts.rev_clock_skew_secs * 1_000_000 { 199 return Err(CommitValidationError::FutureRev); 200 } 201 } 202 203 // 4. CAR parse 204 let parsed = handle 205 .block_on(parse_car_bytes(msg.blocks.as_ref())) 206 .map_err(|e| CommitValidationError::MalformedCar(miette::miette!("{e}")))?; 207 208 let root_bytes = parsed.blocks.get(&parsed.root).ok_or_else(|| { 209 CommitValidationError::MalformedCar(miette::miette!("root block missing from CAR")) 210 })?; 211 212 // 5. commit object deserialization 213 let commit_obj = AtpCommit::from_cbor(root_bytes) 214 .map_err(|e| CommitValidationError::MalformedCar(miette::miette!("{e}")))?; 215 216 // 6. field consistency: wire message vs signed commit object 217 if commit_obj.did.as_str() != msg.repo.as_str() { 218 return Err(CommitValidationError::FieldMismatch { field: "repo" }); 219 } 220 if commit_obj.rev.as_str() != msg.rev.as_str() { 221 return Err(CommitValidationError::FieldMismatch { field: "rev" }); 222 } 223 224 // 7. signature verification 225 if let Some(key) = signing_key { 226 commit_obj 227 .verify(key) 228 .map_err(|_| CommitValidationError::SigFailure)?; 229 } 230 231 let commit_obj = commit_obj.into_static(); 232 233 // 8. chain break checks 234 let chain_break = repo_state 235 .root 236 .as_ref() 237 .map(|r| breaks_chain(msg, r)) 238 .unwrap_or_default(); 239 240 // 9–10. 
per-record size limits and basic CBOR validity 241 for op in &msg.ops { 242 let Some(cid_link) = &op.cid else { continue }; 243 let cid = cid_link.to_ipld().map_err(|e| { 244 CommitValidationError::MalformedCar(miette::miette!("invalid op CID: {e}")) 245 })?; 246 let Some(block) = parsed.blocks.get(&cid) else { 247 return Err(CommitValidationError::MalformedCar(miette::miette!( 248 "block for op CID {cid} missing from CAR" 249 ))); 250 }; 251 252 if block.len() > MAX_RECORD_BYTES { 253 return Err(CommitValidationError::SizeLimitExceeded( 254 SizeLimitKind::RecordSize, 255 )); 256 } 257 258 serde_ipld_dagcbor::from_slice::<jacquard_common::Data>(block).map_err(|e| { 259 CommitValidationError::MalformedCar(miette::miette!("record is not valid CBOR: {e}")) 260 })?; 261 } 262 263 // 11. MST inversion 264 if opts.verify_mst { 265 verify_mst(msg, &parsed, &commit_obj, &handle) 266 .map_err(CommitValidationError::MstInvalid)?; 267 } 268 269 Ok(ValidatedCommit { 270 commit: msg, 271 parsed_blocks: parsed, 272 commit_obj, 273 chain_break, 274 }) 275} 276 277/// panics if called outside a tokio runtime context. 278pub fn validate_sync<'c>( 279 msg: &'c Sync<'c>, 280 signing_key: Option<&PublicKey>, 281) -> Result<ValidatedSync, SyncValidationError> { 282 let handle = tokio::runtime::Handle::current(); 283 const MAX_BLOCKS_BYTES: usize = 2_097_152; 284 285 // 1. size limit 286 if msg.blocks.len() > MAX_BLOCKS_BYTES { 287 return Err(SyncValidationError::SizeLimitExceeded); 288 } 289 290 // 2. CAR parse 291 let parsed = handle 292 .block_on(parse_car_bytes(msg.blocks.as_ref())) 293 .map_err(|e| SyncValidationError::MalformedCar(miette::miette!("{e}")))?; 294 295 let root_bytes = parsed.blocks.get(&parsed.root).ok_or_else(|| { 296 SyncValidationError::MalformedCar(miette::miette!("root block missing from CAR")) 297 })?; 298 299 // 3. 
commit object deserialization 300 let commit_obj = AtpCommit::from_cbor(root_bytes) 301 .map_err(|e| SyncValidationError::MalformedCar(miette::miette!("{e}")))?; 302 303 // 4. field consistency 304 if commit_obj.did.as_str() != msg.did.as_str() { 305 return Err(SyncValidationError::FieldMismatch { field: "did" }); 306 } 307 if commit_obj.rev.as_str() != msg.rev.as_str() { 308 return Err(SyncValidationError::FieldMismatch { field: "rev" }); 309 } 310 311 // 5. signature verification 312 if let Some(key) = signing_key { 313 commit_obj 314 .verify(key) 315 .map_err(|_| SyncValidationError::SigFailure)?; 316 } 317 318 Ok(ValidatedSync { 319 commit_obj: commit_obj.into_static(), 320 }) 321} 322 323fn breaks_chain(msg: &Commit<'_>, root: &crate::types::Commit) -> ChainBreak { 324 // since should equal the rev of the previous commit; only flag when since is present and wrong 325 let since_mismatch = msg 326 .since 327 .as_ref() 328 .map(|since| since.as_str() != root.rev.to_tid().as_str()) 329 .unwrap_or(false); 330 331 // prev_data must equal the last known data CID when both are present 332 let prev_data_mismatch = match &msg.prev_data { 333 Some(prev_link) => match prev_link.to_ipld() { 334 Ok(cid) => cid != root.data, 335 Err(_) => true, // unparseable CID is a mismatch 336 }, 337 None => true, // no prev_data but we have a previous state is a chain break 338 }; 339 340 ChainBreak { 341 since_mismatch, 342 prev_data_mismatch, 343 } 344} 345 346/// apply the inverse of each op (in reverse order) to the new MST and verify the resulting root 347/// equals `msg.prev_data`. called only when `opts.verify_mst` is true. 
348fn verify_mst( 349 msg: &Commit<'_>, 350 parsed: &ParsedCar, 351 commit_obj: &AtpCommit<'_>, 352 handle: &tokio::runtime::Handle, 353) -> miette::Result<()> { 354 let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks.clone())); 355 let mut mst: Mst<MemoryBlockStore> = Mst::load(store, commit_obj.data, None); 356 357 handle.block_on(async { 358 for op in msg.ops.iter().rev() { 359 let inv = match &op.action { 360 RepoOpAction::Create => { 361 let cid_link = op 362 .cid 363 .as_ref() 364 .ok_or_else(|| miette::miette!("create op missing CID"))?; 365 let cid = cid_link.to_ipld().into_diagnostic()?; 366 VerifiedWriteOp::Create { 367 key: op.path.to_smolstr(), 368 cid, 369 } 370 } 371 RepoOpAction::Update => { 372 let cid_link = op 373 .cid 374 .as_ref() 375 .ok_or_else(|| miette::miette!("update op missing CID"))?; 376 let Some(prev_link) = op.prev.as_ref() else { 377 // prev is optional in inductive firehose (v3); skip if absent 378 continue; 379 }; 380 let cid = cid_link.to_ipld().into_diagnostic()?; 381 let prev = prev_link.to_ipld().into_diagnostic()?; 382 VerifiedWriteOp::Update { 383 key: op.path.to_smolstr(), 384 cid, 385 prev, 386 } 387 } 388 RepoOpAction::Delete => { 389 let Some(prev_link) = op.prev.as_ref() else { 390 // prev is optional in inductive firehose (v3); skip if absent 391 continue; 392 }; 393 let prev = prev_link.to_ipld().into_diagnostic()?; 394 VerifiedWriteOp::Delete { 395 key: op.path.to_smolstr(), 396 prev, 397 } 398 } 399 RepoOpAction::Other(action) => { 400 return Err(miette::miette!("unknown op action: {action}")); 401 } 402 }; 403 404 let ok = mst.invert_op(inv).await.into_diagnostic()?; 405 if !ok { 406 return Err(miette::miette!( 407 "MST inversion inconsistent with tree state for op on {}", 408 op.path 409 )); 410 } 411 } 412 413 // verify the resulting root CID equals prev_data (skip for genesis commits) 414 if let Some(prev_link) = &msg.prev_data { 415 let expected = prev_link.to_ipld().into_diagnostic()?; 416 let 
root_cid = mst.get_pointer().await.into_diagnostic()?; 417 if root_cid != expected { 418 return Err(miette::miette!( 419 "MST inversion root mismatch: expected {expected}, got {root_cid}" 420 )); 421 } 422 } 423 424 Ok(()) 425 }) 426}