A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use jacquard_common::IntoStatic;
2use jacquard_common::types::crypto::PublicKey;
3use jacquard_repo::MemoryBlockStore;
4use jacquard_repo::Mst;
5use jacquard_repo::car::reader::{ParsedCar, parse_car_bytes};
6use jacquard_repo::commit::Commit as AtpCommit;
7use jacquard_repo::mst::VerifiedWriteOp;
8use miette::IntoDiagnostic;
9use smol_str::ToSmolStr;
10use std::sync::Arc;
11use thiserror::Error;
12
13use crate::ingest::stream::{Commit, RepoOpAction, Sync};
14use crate::types::RepoState;
15
/// identifies which ATProto size limit a `#commit` message exceeded
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SizeLimitKind {
    /// the `blocks` field of the message is larger than 2 MiB
    BlocksField,
    /// the message carries more than 200 ops
    OpCount,
    /// a single record block is larger than 1 MiB
    RecordSize,
}

impl std::fmt::Display for SizeLimitKind {
    /// writes the fixed human-readable description of the exceeded limit
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::BlocksField => "blocks field exceeds 2MiB",
            Self::OpCount => "op count exceeds 200",
            Self::RecordSize => "record block exceeds 1MiB",
        };
        // write_str rather than pad: width/alignment flags are not applied
        f.write_str(description)
    }
}
36
37#[derive(Debug, Error)]
38pub enum CommitValidationError {
39 /// rev is not greater than the last known rev
40 #[error("stale rev")]
41 StaleRev,
42 /// rev timestamp exceeds the clock skew window
43 #[error("future rev")]
44 FutureRev,
45 /// CAR could not be parsed, or a required block is missing
46 #[error("malformed CAR: {0}")]
47 MalformedCar(miette::Report),
48 /// wire message fields differ from the signed commit object
49 #[error("field mismatch in {field}")]
50 FieldMismatch { field: &'static str },
51 /// signature verification failed.
52 #[error("signature verification failed")]
53 SigFailure,
54 /// a block, op count, or record exceeds the ATProto size limits
55 #[error("size limit exceeded: {0}")]
56 SizeLimitExceeded(SizeLimitKind),
57 /// MST inversion check failed (only when verify_mst = true)
58 #[error("MST inversion failed: {0}")]
59 MstInvalid(miette::Report),
60 /// commit arrived from a host that is not the authoritative PDS for this DID
61 /// (enforced in phase 2 relay worker)
62 #[allow(dead_code)]
63 #[error("commit from wrong host")]
64 WrongHost,
65}
66
67#[derive(Debug, Error)]
68pub enum SyncValidationError {
69 /// blocks field exceeds 2MiB
70 #[error("size limit exceeded")]
71 SizeLimitExceeded,
72 /// CAR could not be parsed
73 #[error("malformed CAR: {0}")]
74 MalformedCar(miette::Report),
75 /// wire message fields differ from the signed commit object
76 #[error("field mismatch in {field}")]
77 FieldMismatch { field: &'static str },
78 /// signature verification failed
79 #[error("signature verification failed")]
80 SigFailure,
81}
82
/// records where a commit's chain pointers disagree with the last known repo state.
/// this is not a hard rejection, so callers can decide what they want to do
#[derive(Default, Debug)]
pub struct ChainBreak {
    /// msg.since is present and does not match the last known rev
    pub since_mismatch: bool,
    /// msg.prev_data is absent, unparseable, or does not match the last known data CID
    pub prev_data_mismatch: bool,
}

impl ChainBreak {
    /// true when at least one of the chain pointers mismatched
    pub fn is_broken(&self) -> bool {
        let Self {
            since_mismatch,
            prev_data_mismatch,
        } = *self;
        since_mismatch || prev_data_mismatch
    }
}
98
99/// a successfully validated `#commit` message, carrying pre-parsed data for apply_commit
100pub struct ValidatedCommit<'c> {
101 pub commit: &'c Commit<'c>,
102 /// result of parse_car_bytes, already done so apply_commit does not re-parse
103 pub parsed_blocks: ParsedCar,
104 pub commit_obj: AtpCommit<'static>,
105 pub chain_break: ChainBreak,
106}
107
108/// a successfully validated `#sync` message
109pub struct ValidatedSync {
110 pub commit_obj: AtpCommit<'static>,
111}
112
/// tunables for commit validation
#[derive(Debug, Clone)]
pub struct ValidationOptions {
    /// clock drift window for future-rev rejection (seconds). default: 300
    pub rev_clock_skew_secs: i64,
    /// run MST inversion validation (expensive). default: false
    pub verify_mst: bool,
}

impl Default for ValidationOptions {
    /// a 300 second clock-skew window with MST inversion disabled
    fn default() -> Self {
        Self {
            rev_clock_skew_secs: 300,
            verify_mst: false,
        }
    }
}
128
129/// all methods panic if called outside a tokio runtime context.
130pub struct ValidationContext<'a> {
131 pub opts: &'a ValidationOptions,
132}
133
134impl ValidationContext<'_> {
135 pub fn validate_commit<'c>(
136 &self,
137 msg: &'c Commit<'c>,
138 repo_state: &RepoState,
139 signing_key: Option<&PublicKey>,
140 ) -> Result<ValidatedCommit<'c>, CommitValidationError> {
141 validate_commit(msg, repo_state, signing_key, self.opts)
142 }
143
144 pub fn validate_sync(
145 &self,
146 msg: &Sync<'_>,
147 signing_key: Option<&PublicKey>,
148 ) -> Result<ValidatedSync, SyncValidationError> {
149 validate_sync(msg, signing_key)
150 }
151}
152
153/// validate an incoming `#commit` message.
154///
155/// on success, returns a `ValidatedCommit` carrying pre-parsed data so that
156/// `apply_commit` does not need to repeat the work.
157///
158/// chain-break (since/prevData mismatch) is NOT an error. callers check
159/// `validated.chain_break.is_some()` and decide how to respond.
160///
161/// - `signing_key`: `None` when signature verification is disabled.
162///
163/// panics if called outside a tokio runtime context.
164pub fn validate_commit<'c>(
165 msg: &'c Commit<'c>,
166 repo_state: &RepoState,
167 signing_key: Option<&PublicKey>,
168 opts: &ValidationOptions,
169) -> Result<ValidatedCommit<'c>, CommitValidationError> {
170 let handle = tokio::runtime::Handle::current();
171 const MAX_BLOCKS_BYTES: usize = 2_097_152; // 2 MiB
172 const MAX_OPS: usize = 200;
173 const MAX_RECORD_BYTES: usize = 1_048_576; // 1 MiB
174
175 // 1. size limits
176 if msg.blocks.len() > MAX_BLOCKS_BYTES {
177 return Err(CommitValidationError::SizeLimitExceeded(
178 SizeLimitKind::BlocksField,
179 ));
180 }
181 if msg.ops.len() > MAX_OPS {
182 return Err(CommitValidationError::SizeLimitExceeded(
183 SizeLimitKind::OpCount,
184 ));
185 }
186
187 // 2. stale rev, skip if msg.rev <= last known rev (lexicographic order)
188 if let Some(root) = &repo_state.root {
189 if msg.rev.as_str() <= root.rev.to_tid().as_str() {
190 return Err(CommitValidationError::StaleRev);
191 }
192 }
193
194 // 3. future rev, reject if rev timestamp is more than clock_skew_secs ahead of now
195 {
196 let rev_us = msg.rev.timestamp() as i64;
197 let now_us = chrono::Utc::now().timestamp_micros();
198 if rev_us > now_us + opts.rev_clock_skew_secs * 1_000_000 {
199 return Err(CommitValidationError::FutureRev);
200 }
201 }
202
203 // 4. CAR parse
204 let parsed = handle
205 .block_on(parse_car_bytes(msg.blocks.as_ref()))
206 .map_err(|e| CommitValidationError::MalformedCar(miette::miette!("{e}")))?;
207
208 let root_bytes = parsed.blocks.get(&parsed.root).ok_or_else(|| {
209 CommitValidationError::MalformedCar(miette::miette!("root block missing from CAR"))
210 })?;
211
212 // 5. commit object deserialization
213 let commit_obj = AtpCommit::from_cbor(root_bytes)
214 .map_err(|e| CommitValidationError::MalformedCar(miette::miette!("{e}")))?;
215
216 // 6. field consistency: wire message vs signed commit object
217 if commit_obj.did.as_str() != msg.repo.as_str() {
218 return Err(CommitValidationError::FieldMismatch { field: "repo" });
219 }
220 if commit_obj.rev.as_str() != msg.rev.as_str() {
221 return Err(CommitValidationError::FieldMismatch { field: "rev" });
222 }
223
224 // 7. signature verification
225 if let Some(key) = signing_key {
226 commit_obj
227 .verify(key)
228 .map_err(|_| CommitValidationError::SigFailure)?;
229 }
230
231 let commit_obj = commit_obj.into_static();
232
233 // 8. chain break checks
234 let chain_break = repo_state
235 .root
236 .as_ref()
237 .map(|r| breaks_chain(msg, r))
238 .unwrap_or_default();
239
240 // 9–10. per-record size limits and basic CBOR validity
241 for op in &msg.ops {
242 let Some(cid_link) = &op.cid else { continue };
243 let cid = cid_link.to_ipld().map_err(|e| {
244 CommitValidationError::MalformedCar(miette::miette!("invalid op CID: {e}"))
245 })?;
246 let Some(block) = parsed.blocks.get(&cid) else {
247 return Err(CommitValidationError::MalformedCar(miette::miette!(
248 "block for op CID {cid} missing from CAR"
249 )));
250 };
251
252 if block.len() > MAX_RECORD_BYTES {
253 return Err(CommitValidationError::SizeLimitExceeded(
254 SizeLimitKind::RecordSize,
255 ));
256 }
257
258 serde_ipld_dagcbor::from_slice::<jacquard_common::Data>(block).map_err(|e| {
259 CommitValidationError::MalformedCar(miette::miette!("record is not valid CBOR: {e}"))
260 })?;
261 }
262
263 // 11. MST inversion
264 if opts.verify_mst {
265 verify_mst(msg, &parsed, &commit_obj, &handle)
266 .map_err(CommitValidationError::MstInvalid)?;
267 }
268
269 Ok(ValidatedCommit {
270 commit: msg,
271 parsed_blocks: parsed,
272 commit_obj,
273 chain_break,
274 })
275}
276
277/// panics if called outside a tokio runtime context.
278pub fn validate_sync<'c>(
279 msg: &'c Sync<'c>,
280 signing_key: Option<&PublicKey>,
281) -> Result<ValidatedSync, SyncValidationError> {
282 let handle = tokio::runtime::Handle::current();
283 const MAX_BLOCKS_BYTES: usize = 2_097_152;
284
285 // 1. size limit
286 if msg.blocks.len() > MAX_BLOCKS_BYTES {
287 return Err(SyncValidationError::SizeLimitExceeded);
288 }
289
290 // 2. CAR parse
291 let parsed = handle
292 .block_on(parse_car_bytes(msg.blocks.as_ref()))
293 .map_err(|e| SyncValidationError::MalformedCar(miette::miette!("{e}")))?;
294
295 let root_bytes = parsed.blocks.get(&parsed.root).ok_or_else(|| {
296 SyncValidationError::MalformedCar(miette::miette!("root block missing from CAR"))
297 })?;
298
299 // 3. commit object deserialization
300 let commit_obj = AtpCommit::from_cbor(root_bytes)
301 .map_err(|e| SyncValidationError::MalformedCar(miette::miette!("{e}")))?;
302
303 // 4. field consistency
304 if commit_obj.did.as_str() != msg.did.as_str() {
305 return Err(SyncValidationError::FieldMismatch { field: "did" });
306 }
307 if commit_obj.rev.as_str() != msg.rev.as_str() {
308 return Err(SyncValidationError::FieldMismatch { field: "rev" });
309 }
310
311 // 5. signature verification
312 if let Some(key) = signing_key {
313 commit_obj
314 .verify(key)
315 .map_err(|_| SyncValidationError::SigFailure)?;
316 }
317
318 Ok(ValidatedSync {
319 commit_obj: commit_obj.into_static(),
320 })
321}
322
323fn breaks_chain(msg: &Commit<'_>, root: &crate::types::Commit) -> ChainBreak {
324 // since should equal the rev of the previous commit; only flag when since is present and wrong
325 let since_mismatch = msg
326 .since
327 .as_ref()
328 .map(|since| since.as_str() != root.rev.to_tid().as_str())
329 .unwrap_or(false);
330
331 // prev_data must equal the last known data CID when both are present
332 let prev_data_mismatch = match &msg.prev_data {
333 Some(prev_link) => match prev_link.to_ipld() {
334 Ok(cid) => cid != root.data,
335 Err(_) => true, // unparseable CID is a mismatch
336 },
337 None => true, // no prev_data but we have a previous state is a chain break
338 };
339
340 ChainBreak {
341 since_mismatch,
342 prev_data_mismatch,
343 }
344}
345
346/// apply the inverse of each op (in reverse order) to the new MST and verify the resulting root
347/// equals `msg.prev_data`. called only when `opts.verify_mst` is true.
348fn verify_mst(
349 msg: &Commit<'_>,
350 parsed: &ParsedCar,
351 commit_obj: &AtpCommit<'_>,
352 handle: &tokio::runtime::Handle,
353) -> miette::Result<()> {
354 let store = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks.clone()));
355 let mut mst: Mst<MemoryBlockStore> = Mst::load(store, commit_obj.data, None);
356
357 handle.block_on(async {
358 for op in msg.ops.iter().rev() {
359 let inv = match &op.action {
360 RepoOpAction::Create => {
361 let cid_link = op
362 .cid
363 .as_ref()
364 .ok_or_else(|| miette::miette!("create op missing CID"))?;
365 let cid = cid_link.to_ipld().into_diagnostic()?;
366 VerifiedWriteOp::Create {
367 key: op.path.to_smolstr(),
368 cid,
369 }
370 }
371 RepoOpAction::Update => {
372 let cid_link = op
373 .cid
374 .as_ref()
375 .ok_or_else(|| miette::miette!("update op missing CID"))?;
376 let Some(prev_link) = op.prev.as_ref() else {
377 // prev is optional in inductive firehose (v3); skip if absent
378 continue;
379 };
380 let cid = cid_link.to_ipld().into_diagnostic()?;
381 let prev = prev_link.to_ipld().into_diagnostic()?;
382 VerifiedWriteOp::Update {
383 key: op.path.to_smolstr(),
384 cid,
385 prev,
386 }
387 }
388 RepoOpAction::Delete => {
389 let Some(prev_link) = op.prev.as_ref() else {
390 // prev is optional in inductive firehose (v3); skip if absent
391 continue;
392 };
393 let prev = prev_link.to_ipld().into_diagnostic()?;
394 VerifiedWriteOp::Delete {
395 key: op.path.to_smolstr(),
396 prev,
397 }
398 }
399 RepoOpAction::Other(action) => {
400 return Err(miette::miette!("unknown op action: {action}"));
401 }
402 };
403
404 let ok = mst.invert_op(inv).await.into_diagnostic()?;
405 if !ok {
406 return Err(miette::miette!(
407 "MST inversion inconsistent with tree state for op on {}",
408 op.path
409 ));
410 }
411 }
412
413 // verify the resulting root CID equals prev_data (skip for genesis commits)
414 if let Some(prev_link) = &msg.prev_data {
415 let expected = prev_link.to_ipld().into_diagnostic()?;
416 let root_cid = mst.get_pointer().await.into_diagnostic()?;
417 if root_cid != expected {
418 return Err(miette::miette!(
419 "MST inversion root mismatch: expected {expected}, got {root_cid}"
420 ));
421 }
422 }
423
424 Ok(())
425 })
426}