···117117 - [x] no-events-received timeout reconnect
118118- [x] account status convergence: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale?
119119- [x] resync short-circuit: tiny repos may actually return their entire CAR for getRecord
120120+- [x] use jacquard's built-in inductive proof methods
120121- [~] repo-stream: drop record block contents with processor fn
121122 - [x] in getRecord before describeRepo
122123 - [ ] in commit handling
+40-98
src/sync/firehose/commit_event.rs
···1616//! 9. If we have a `repo_prev` and `prevData` is present: drop and queue
1717//! resync if the bytes don't match `prev.prev_data`.
18181919-use std::sync::Arc;
2020-2121-use cid::Cid as IpldCid;
2222-use jacquard_api::com_atproto::sync::subscribe_repos::{Commit, RepoOp};
2323-use jacquard_common::types::cid::CidLink;
1919+use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
2420use jacquard_common::types::{string::Did, string::Nsid, tid::Tid};
2525-use jacquard_repo::mst::VerifiedWriteOp;
2626-use jacquard_repo::{MemoryBlockStore, Mst};
2727-use tracing::{debug, error, info, warn};
2121+use jacquard_repo::commit::firehose::{FirehoseCommit, RepoOp as FirehoseRepoOp};
2222+use tracing::{debug, error, info, trace, warn};
28232924use super::validate::{self, CarDrop};
3025use crate::identity::Resolver;
···116111 };
117112118113 // ── Steps 3–4: CAR parse + signature verification + field consistency ────
119119- let (new_mst_root_bytes, mst_root_cid, parsed) = match validate_car(&commit, &resolved.pubkey)
120120- .await
121121- {
114114+ let (new_mst_root_bytes, parsed) = match validate_car(&commit, &resolved.pubkey).await {
122115 Ok(v) => v,
123116 Err(CarDrop::InvalidSignature) => {
124117 let Some(fresh) =
···153146 // Clone the parsed CAR before consuming its blocks into the MST block store.
154147 // not super-cheap, but the Bytes values are refcounted at least so whatever
155148 let parsed_clone = parsed.clone();
156156-157157- let storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
158158- let new_mst = Mst::load(Arc::clone(&storage), mst_root_cid, None);
159149160150 // ── Step 5: Inductive proof ───────────────────────────────────────────────
161161- let strict = pds_mode == Sync11Mode::Strict;
162162- if !verify_inductive_proof(new_mst, &commit.ops, commit.prev_data.as_ref(), strict).await? {
151151+ if pds_mode == Sync11Mode::Strict
152152+ && let Err(e) = to_firehose_commit(&commit)
153153+ .validate_v1_1(&resolved.pubkey)
154154+ .await
155155+ {
163156 metrics::counter!("lightrail_commit_dropped_total", "reason" => "proof_failed")
164157 .increment(1);
165165- debug!(did = %did, "commit dropped: inductive proof failed");
158158+ debug!(did = %did, error = %e, "commit dropped: inductive proof failed");
166159 return Ok(());
167160 }
168161···219212async fn validate_car(
220213 commit: &Commit<'static>,
221214 pubkey: &jacquard_common::types::crypto::PublicKey<'_>,
222222-) -> Result<(Vec<u8>, IpldCid, jacquard_repo::car::ParsedCar), CarDrop> {
215215+) -> Result<(Vec<u8>, jacquard_repo::car::ParsedCar), CarDrop> {
223216 // Parse the CAR slice embedded in the firehose event.
224217 let parsed = jacquard_repo::car::parse_car_bytes(commit.blocks.as_ref())
225218 .await
···254247 // for block store construction) and the parsed CAR (the blocks themselves).
255248 metrics::histogram!("lightrail_commit_car_bytes").record(commit.blocks.len() as f64);
256249 let mst_root_cid = repo_commit.data;
257257- Ok((mst_root_cid.to_bytes(), mst_root_cid, parsed))
250250+ Ok((mst_root_cid.to_bytes(), parsed))
258251}
259252260253// ---------------------------------------------------------------------------
261254// Step 5: Inductive proof (async, MST mutations)
262255// ---------------------------------------------------------------------------
263256264264-/// Verify the sync1.1 inductive proof for a commit.
265265-///
266266-/// Starting from the new MST (`mst`, loaded from this commit's CAR blocks),
267267-/// invert every op in the event's `ops` list in reverse order. Each inversion
268268-/// validates the op's `prev` CID in-place via [`Mst::invert_op`]. After all
269269-/// inversions the resulting MST root must equal `prev_data` (the previous MST
270270-/// root recorded in the commit object).
271271-///
272272-/// Returns `Ok(true)` if the proof passes or is skipped (pre-sync1.1),
273273-/// `Ok(false)` if it fails (caller should drop the commit), or an `Err` for
274274-/// a storage-level failure.
275275-async fn verify_inductive_proof(
276276- mut mst: Mst<MemoryBlockStore>,
277277- ops: &[RepoOp<'_>],
278278- prev_data: Option<&CidLink<'_>>,
279279- strict: bool,
280280-) -> crate::error::Result<bool> {
281281- // Without prevData we cannot check the final root.
282282- let Some(prev_data_link) = prev_data else {
283283- // Strict PDSes must always supply prevData.
284284- return Ok(!strict);
285285- };
286286-287287- let expected_prev_root = match prev_data_link.to_ipld() {
288288- Ok(cid) => cid,
289289- Err(_) => {
290290- // Malformed prevData CID — strict PDSes should never send this.
291291- return Ok(!strict);
292292- }
293293- };
294294-295295- // Invert each op in reverse order. The ops are in the CAR proof, so every
296296- // key's path from root is available in the block store.
297297- for op in ops.iter().rev() {
298298- let Some(verified_op) = convert_op(op) else {
299299- // An op is missing required CIDs — strict PDSes must always include them.
300300- return Ok(!strict);
301301- };
302302-303303- let ok = mst
304304- .invert_op(verified_op)
305305- .await
306306- .map_err(|e| crate::error::Error::Other(e.to_string()))?;
307307- if !ok {
308308- return Ok(false);
309309- }
310310- }
311311-312312- // After all inversions the tree root must match prevData.
313313- let actual_prev_root = mst
314314- .get_pointer()
315315- .await
316316- .map_err(|e| crate::error::Error::Other(e.to_string()))?;
317317-318318- Ok(actual_prev_root == expected_prev_root)
319319-}
320320-321321-/// Convert a firehose [`RepoOp`] into a [`VerifiedWriteOp`] for
322322-/// [`Mst::invert_op`].
323323-///
324324-/// Returns `None` if a required CID field is absent (pre-sync1.1 commits may
325325-/// omit `prev` on Update/Delete ops) or if the action is unrecognised.
326326-fn convert_op(op: &RepoOp<'_>) -> Option<VerifiedWriteOp> {
327327- let key = op.path.as_ref().into(); // &str → SmolStr
328328- match op.action.as_ref() {
329329- "create" => {
330330- let cid = op.cid.as_ref()?.to_ipld().ok()?;
331331- Some(VerifiedWriteOp::Create { key, cid })
332332- }
333333- "update" => {
334334- let cid = op.cid.as_ref()?.to_ipld().ok()?;
335335- let prev = op.prev.as_ref()?.to_ipld().ok()?;
336336- Some(VerifiedWriteOp::Update { key, cid, prev })
337337- }
338338- "delete" => {
339339- let prev = op.prev.as_ref()?.to_ipld().ok()?;
340340- Some(VerifiedWriteOp::Delete { key, prev })
341341- }
342342- _ => None,
257257+fn to_firehose_commit<'a>(commit: &Commit<'a>) -> FirehoseCommit<'a> {
258258+ FirehoseCommit {
259259+ repo: commit.repo.clone(),
260260+ rev: commit.rev.clone(),
261261+ seq: commit.seq,
262262+ since: commit.since.clone().unwrap_or_else(|| {
263263+ trace!("putting in a phony TID for FirehoseCommit (None on Commit)");
264264+ Tid::now_0()
265265+ }),
266266+ time: commit.time.clone(),
267267+ commit: commit.commit.clone(),
268268+ blocks: commit.blocks.clone(),
269269+ // HACK: reverse op order for inductive proof -- when upgrading jacquard this is fixed
270270+ ops: commit
271271+ .ops
272272+ .iter()
273273+ .rev()
274274+ .map(|op| FirehoseRepoOp {
275275+ action: op.action.clone(),
276276+ cid: op.cid.clone(),
277277+ path: op.path.clone(),
278278+ prev: op.prev.clone(),
279279+ })
280280+ .collect(),
281281+ prev_data: commit.prev_data.clone(),
282282+ blobs: commit.blobs.clone(),
283283+ too_big: commit.too_big,
284284+ rebase: commit.rebase,
343285 }
344286}
345287