···14141515enum SpanLen {
1616 /// all collections are known
1717- #[allow(dead_code)]
1717+ #[allow(dead_code)] // used in tests
1818 Exactly(usize),
1919 /// at least one gap exists
2020- #[allow(dead_code)]
2020+ #[allow(dead_code)] // used in tests
2121 AtLeast(usize),
2222}
2323···5050 /// the span has *no* gaps
5151 fn is_complete(&self) -> bool {
5252 matches!(self.len(), SpanLen::Exactly(_))
5353- }
5454- fn contains(&self, k: &T) -> Option<bool> {
5555- if self.things.contains_key(k) {
5656- return Some(true);
5757- }
5858- // try to gap_after from the key before this one, if present
5959- // (when btree_cursors land we can do nicer with that)
6060- let falls_in_gap = self
6161- .things
6262- .range(..k) // exclusive range: all keys lex-before us
6363- .next_back() // take the closest previous one
6464- .map(|(_, gap_after)| gap_after) // if it existed, find out if it had a gap after
6565- .unwrap_or(&self.gap_before); // no before-key: span starts with gap?
6666-6767- if *falls_in_gap { None } else { Some(false) }
6868- }
6969- /// definitive answer about whether it's *possible* for `k` to be in span
7070- ///
7171- /// key exist -> true
7272- /// key falls in a gap -> true (it's possible!)
7373- /// key falls after a key without a gap after -> false (not possible!)
7474- #[allow(dead_code)]
7575- fn may_contain(&self, k: &T) -> bool {
7676- self.contains(k).unwrap_or(true)
7753 }
7854}
+127-90
src/mst/mortality.rs
···1717//! Multi-op commits are handled because all ops are considered together: a key
1818//! is a "survivor" only if it is visible AND not in the deleted set.
19192020+use std::collections::{HashMap, HashSet};
2021use std::ops::Bound;
2121-use std::collections::HashSet;
22222323use jacquard_api::com_atproto::sync::subscribe_repos::{RepoOp, RepoOpAction};
2424use jacquard_common::types::string::Nsid;
2525-use repo_stream::{MemCar, WalkItem};
2625use repo_stream::{DriverBuilder, JacquardLoadError, WalkError};
2626+use repo_stream::{MemCar, WalkItem};
2727use tracing::error;
28282929use super::Span;
···51515252impl KeySpan {
5353 pub fn left_of(&self, key: &str) -> Existence {
5454- self
5555- .things
5454+ self.things
5655 .range(..key.to_string())
5756 .next_back()
5858- .map(|(k, gap)| if *gap { Existence::Uncertain } else { Existence::Yes(k.clone()) })
5959- .unwrap_or(if self.gap_before { Existence::Uncertain } else { Existence::No })
5757+ .map(|(k, gap)| {
5858+ if *gap {
5959+ Existence::Uncertain
6060+ } else {
6161+ Existence::Yes(k.clone())
6262+ }
6363+ })
6464+ .unwrap_or(if self.gap_before {
6565+ Existence::Uncertain
6666+ } else {
6767+ Existence::No
6868+ })
6069 }
6170 pub fn right_of(&self, key: &str, left: &Existence) -> Existence {
6271 if let Some(gap_after) = self.things.get(key) {
···7079 .range((Bound::Excluded(key.to_string()), Bound::Unbounded))
7180 .next()
7281 .map(|(k, _)| Existence::Yes(k.to_string()))
7373- .unwrap_or(Existence::No)
8282+ .unwrap_or(Existence::No);
7483 }
7584 // the key does not exist
7685 if *left == Existence::Uncertain {
···7988 return Existence::Uncertain;
8089 }
8190 // we're not in a gap, so we have certainty about what's next
8282- self
8383- .things
9191+ self.things
8492 .range((Bound::Excluded(key.to_string()), Bound::Unbounded))
8593 .next()
8694 .map(|(k, _)| Existence::Yes(k.clone()))
···170178 true
171179}
172180173173-/// Collect every MST leaf path visible in a (possibly partial) CAR.
174174-///
175175-/// Uses `next_keys()` which silently skips subtrees whose MST node blocks are
176176-/// absent, so this works on both full and proof-only CARs.
177177-#[cfg(test)]
178178-fn collect_visible_paths(parsed: jacquard_repo::car::reader::ParsedCar) -> Result<Vec<String>> {
179179- let mut car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?;
180180- let mut visible = Vec::new();
181181- while let Some((path, _)) = car.next_keys()? {
182182- visible.push(path.to_string());
183183- }
184184- Ok(visible)
185185-}
186186-187181/// Result of [`extract`].
182182+#[derive(Debug, Default)]
188183pub struct ExtractResult {
189184 /// Collections newly created in this commit (all pre-existing neighbours
190185 /// were absent from the proof, or the proof covers the full range).
191191- pub born: Vec<Nsid<'static>>,
186186+ pub born: HashMap<Nsid<'static>, bool>,
192187 /// Collections fully deleted in this commit (proof covers the full range).
193193- pub died: Vec<Nsid<'static>>,
194194- /// True when a possible collection death could not be confirmed because the
195195- /// CAR proof has gaps that could hide surviving keys. The caller should
196196- /// queue a full-repo resync to verify the current collection state.
197197- pub needs_resync: bool,
188188+ pub died: HashMap<Nsid<'static>, bool>,
198189}
199190200191/// Walk the partial CAR's MST to detect which collections are newly added
···236227 }
237228238229 if creates.is_empty() && deletes.is_empty() {
239239- return Ok(ExtractResult { born: vec![], died: vec![], needs_resync: false });
230230+ return Ok(ExtractResult::default());
240231 }
241232242233 // ── Walk the partial CAR's MST to build a gap-aware key span ─────────────
243243- let mut mem_car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?;
234234+ let mut mem_car = DriverBuilder::new()
235235+ .with_block_processor(|_| vec![]) // drop record blocks as much as possible
236236+ .load_jacquard_parsed_car(parsed)?;
244237 let span = span_from_slice(&mut mem_car)?;
245238246239 // ── Check collection death ────────────────────────────────────────────────
247240 // A collection died iff all visible keys in it are being deleted AND
248241 // the span proves there are no hidden surviving keys.
249249- let mut died: Vec<Nsid<'static>> = Vec::new();
250250- let mut needs_resync = false;
251251- for coll in deletes
252252- .iter()
253253- .filter_map(|p| p.split_once('/').map(|(c, _)| c))
254254- .collect::<HashSet<_>>()
255255- {
256256- let prefix = format!("{coll}/");
242242+ let mut died = HashMap::new();
243243+ let mut delete_collections_seen = HashSet::new();
244244+ for mst_key in &deletes {
245245+ let (collection, _) = mst_key
246246+ .split_once('/')
247247+ .ok_or(MstMortalityError::InvalidData(format!(
248248+ "mst key missing '/': {mst_key}"
249249+ )))?;
250250+ if !delete_collections_seen.insert(collection.to_string()) {
251251+ continue;
252252+ }
253253+ let nsid = collection
254254+ .parse()
255255+ .map_err(|e| MstMortalityError::InvalidData(format!("bad nsid: {e}")))?;
256256+257257+ let collection_terminated = format!("{collection}/");
257258 let has_survivor = span
258259 .things
259259- .range(prefix.clone()..)
260260- .take_while(|(k, _)| k.starts_with(&prefix))
260260+ .range(collection_terminated.clone()..)
261261+ .take_while(|(k, _)| k.starts_with(&collection_terminated))
261262 .any(|(k, _)| !deletes.contains(k.as_str()));
262263 if has_survivor {
263264 continue;
264265 }
265265- if !can_prove_complete_coverage(&span, &prefix) {
266266- // Possible death but the CAR has gaps — we can't confirm. Request
267267- // a full-repo resync so the caller can reconcile the index later.
268268- metrics::counter!("lightrail_mortality_unproven_total", "kind" => "death")
269269- .increment(1);
270270- error!(collection = coll, "possible collection death unproven due to MST gaps");
271271- needs_resync = true;
272272- continue;
273273- }
274274- if let Ok(nsid) = Nsid::new_owned(coll) {
275275- died.push(nsid);
266266+ if can_prove_complete_coverage(&span, &collection_terminated) {
267267+ died.insert(nsid, true);
268268+ } else {
269269+ died.insert(nsid, false);
270270+ metrics::counter!("lightrail_mortality_unproven_total", "kind" => "death").increment(1);
271271+ error!(
272272+ collection = collection,
273273+ "possible collection death unproven due to MST gaps"
274274+ );
276275 }
277276 }
278277···281280 // the span proves there are no hidden pre-existing keys.
282281 // If the proof has gaps, we still record the birth — false positives here
283282 // are harmless (the index entry already exists or the resync will fix it).
284284- let mut born: Vec<Nsid<'static>> = Vec::new();
285285- for coll in creates
286286- .iter()
287287- .filter_map(|p| p.split_once('/').map(|(c, _)| c))
288288- .collect::<HashSet<_>>()
289289- {
290290- let prefix = format!("{coll}/");
283283+ let mut born = HashMap::new();
284284+ let mut created_collections_seen = HashSet::new();
285285+ for mst_key in &creates {
286286+ let (collection, _) = mst_key
287287+ .split_once('/')
288288+ .ok_or(MstMortalityError::InvalidData(format!(
289289+ "mst key missing '/': {mst_key}"
290290+ )))?;
291291+ if !created_collections_seen.insert(collection.to_string()) {
292292+ continue;
293293+ }
294294+ let nsid = collection
295295+ .parse()
296296+ .map_err(|e| MstMortalityError::InvalidData(format!("bad nsid: {e}")))?;
297297+298298+ let collection_terminated = format!("{collection}/");
291299 let has_preexisting = span
292300 .things
293293- .range(prefix.clone()..)
294294- .take_while(|(k, _)| k.starts_with(&prefix))
301301+ .range(collection_terminated.clone()..)
302302+ .take_while(|(k, _)| k.starts_with(&collection_terminated))
295303 .any(|(k, _)| !creates.contains(k.as_str()));
296304 if has_preexisting {
297305 continue;
298306 }
299299- if !can_prove_complete_coverage(&span, &prefix) {
300300- metrics::counter!("lightrail_mortality_unproven_total", "kind" => "birth")
301301- .increment(1);
307307+ if can_prove_complete_coverage(&span, &collection_terminated) {
308308+ born.insert(nsid, true);
309309+ } else {
310310+ born.insert(nsid, false);
311311+ metrics::counter!("lightrail_mortality_unproven_total", "kind" => "birth").increment(1);
302312 error!(
303303- collection = coll,
313313+ collection,
304314 "possible collection birth unproven due to MST gaps"
305315 );
306306- // Fall through: include the birth anyway. Spurious births are
307307- // idempotent (index insert is a blind overwrite); false negatives
308308- // would silently drop new collections from the index.
309309- }
310310- if let Ok(nsid) = Nsid::new_owned(coll) {
311311- born.push(nsid);
312316 }
313317 }
314318315315- Ok(ExtractResult { born, died, needs_resync })
319319+ Ok(ExtractResult { born, died })
320320+}
321321+322322+/// Collect every MST leaf path visible in a (possibly partial) CAR.
323323+///
324324+/// Uses `next_keys()` which silently skips subtrees whose MST node blocks are
325325+/// absent, so this works on both full and proof-only CARs.
326326+#[cfg(test)]
327327+fn collect_visible_paths(parsed: jacquard_repo::car::reader::ParsedCar) -> Result<Vec<String>> {
328328+ let mut car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?;
329329+ let mut visible = Vec::new();
330330+ while let Some((path, _)) = car.next_keys()? {
331331+ visible.push(path.to_string());
332332+ }
333333+ Ok(visible)
316334}
317335318336#[cfg(test)]
···417435 // CAR contains only the created key → no preexisting neighbours.
418436 let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await;
419437 let ops = [op_create("app.bsky.feed.post/abc123")];
420420- let ExtractResult { born, died, .. } = extract(&ops, parsed).unwrap();
421421- assert_eq!(born, vec![nsid("app.bsky.feed.post")]);
438438+ let ExtractResult { born, died } = extract(&ops, parsed).unwrap();
439439+ assert_eq!(
440440+ born,
441441+ HashMap::from_iter([(nsid("app.bsky.feed.post"), true)])
442442+ );
422443 assert!(died.is_empty());
423444 }
424445···441462 async fn collection_dies_when_last_key_deleted() {
442463 let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await;
443464 let ops = [op_delete("app.bsky.feed.post/abc123")];
444444- let ExtractResult { born, died, .. } = extract(&ops, parsed).unwrap();
465465+ let ExtractResult { born, died } = extract(&ops, parsed).unwrap();
445466 assert!(born.is_empty());
446446- assert_eq!(died, vec![nsid("app.bsky.feed.post")]);
467467+ assert_eq!(
468468+ died,
469469+ HashMap::from_iter([(nsid("app.bsky.feed.post"), true)])
470470+ );
447471 }
448472449473 #[tokio::test]
···472496 op_create("app.bsky.feed.post/abc123"),
473497 op_delete("app.bsky.graph.follow/old"),
474498 ];
475475- let ExtractResult { mut born, mut died, .. } = extract(&ops, parsed).unwrap();
476476- born.sort_unstable();
477477- died.sort_unstable();
478478- assert_eq!(born, vec![nsid("app.bsky.feed.post")]);
479479- assert_eq!(died, vec![nsid("app.bsky.graph.follow")]);
499499+ let ExtractResult { born, died } = extract(&ops, parsed).unwrap();
500500+ assert_eq!(
501501+ born,
502502+ HashMap::from_iter([(nsid("app.bsky.feed.post"), true)])
503503+ );
504504+ assert_eq!(
505505+ died,
506506+ HashMap::from_iter([(nsid("app.bsky.graph.follow"), true)])
507507+ );
480508 }
481509482510 // ---------------------------------------------------------------------------
···544572 // coverage of the collection.
545573546574 #[tokio::test]
547547- async fn gap_suppresses_death_and_sets_needs_resync() {
575575+ async fn gap_suppresses_death_and_sets_unprovable_deaths() {
548576 let parsed = make_root_only_parsed_car(&[
549577 "app.bsky.feed.post/454397e440ec", // layer 4 — visible in root
550578 "app.bsky.feed.post/aaa", // layer 0 — hidden behind right child
···552580 .await;
553581 // Delete only the visible key; the hidden sibling might be a survivor.
554582 let ops = [op_delete("app.bsky.feed.post/454397e440ec")];
555555- let ExtractResult { born, died, needs_resync } = extract(&ops, parsed).unwrap();
583583+ let ExtractResult { born, died } = extract(&ops, parsed).unwrap();
556584 assert!(born.is_empty(), "unexpected birth: {born:?}");
557557- assert!(died.is_empty(), "death declared despite gap: {died:?}");
558558- assert!(needs_resync, "needs_resync should be set when death is unproven");
585585+ assert_eq!(
586586+ died,
587587+ HashMap::from_iter([(nsid("app.bsky.feed.post"), false),]),
588588+ "death declared despite (unprovable): {died:?}"
589589+ );
559590 }
560591561592 #[tokio::test]
···568599 // Create only the visible key; gap can't prove no pre-existing keys exist,
569600 // but we include the birth anyway (false positives are idempotent).
570601 let ops = [op_create("app.bsky.feed.post/454397e440ec")];
571571- let ExtractResult { born, died, needs_resync } = extract(&ops, parsed).unwrap();
572572- assert_eq!(born, vec![nsid("app.bsky.feed.post")], "birth should be included despite gap");
602602+ let ExtractResult { born, died } = extract(&ops, parsed).unwrap();
603603+ assert_eq!(
604604+ born,
605605+ HashMap::from_iter([(nsid("app.bsky.feed.post"), false)]),
606606+ "birth should be included despite gap"
607607+ );
573608 assert!(died.is_empty(), "unexpected death: {died:?}");
574574- assert!(!needs_resync, "needs_resync should not be set for unproven birth");
575609 }
576610}
577611···693727 #[test]
694728 fn right_of_key_not_in_span_left_uncertain() {
695729 let s = span(false, &[("a/r1", true), ("c/r1", false)]);
696696- assert_eq!(s.right_of("b/r1", &Existence::Uncertain), Existence::Uncertain);
730730+ assert_eq!(
731731+ s.right_of("b/r1", &Existence::Uncertain),
732732+ Existence::Uncertain
733733+ );
697734 }
698735699736 // Key is NOT in the span; left is definite → look for the next key to the right.
···787824788825#[cfg(test)]
789826mod fixture_tests {
790790- use super::{collect_visible_paths, extract, ExtractResult};
827827+ use super::{ExtractResult, collect_visible_paths, extract};
791828 use std::collections::{BTreeMap, HashSet};
792829 use std::sync::Arc;
793830
+60-38
src/sync/firehose/commit_event.rs
···1717//! resync if the bytes don't match `prev.prev_data`.
18181919use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
2020-use jacquard_common::types::{string::Did, string::Nsid, tid::Tid};
2020+use jacquard_common::types::{string::Did, tid::Tid};
2121use jacquard_repo::commit::firehose::validate_v1_1;
2222use tracing::{debug, error, info, warn};
23232424use super::validate::{self, CarDrop};
2525use crate::identity::Resolver;
2626+use crate::mst::{self, mortality::ExtractResult};
2627use crate::storage::{
2728 self, DbRef,
2829 pds_host::{self, Sync11Mode},
···160161 metrics::histogram!("lightrail_commit_ops").record(commit.ops.len() as f64);
161162162163 // ── Collection birth/death detection ─────────────────────────────────────
163163- let crate::mst::mortality::ExtractResult { born, died, needs_resync } =
164164- crate::mst::mortality::extract(&commit.ops, parsed_clone)?;
164164+ let mortality = mst::mortality::extract(&commit.ops, parsed_clone)?;
165165166166 // ── Steps 6–9: Blocking storage checks + repo_prev update ───────────────
167167 let db = db.clone();
···185185 since,
186186 incoming_prev_data,
187187 new_mst_root_bytes,
188188- born,
189189- died,
190190- needs_resync,
188188+ mortality,
191189 pds_host,
192190 current_mode: pds_mode,
193191 },
···362360 since: Option<Tid>,
363361 incoming_prev_data: Option<Vec<u8>>,
364362 new_mst_root_bytes: Vec<u8>,
365365- born: Vec<Nsid<'static>>,
366366- died: Vec<Nsid<'static>>,
367367- needs_resync: bool,
363363+ mortality: ExtractResult,
368364 pds_host: Option<Host>,
369365 current_mode: Sync11Mode,
370366}
···383379 since,
384380 incoming_prev_data,
385381 new_mst_root_bytes,
386386- born,
387387- died,
388388- needs_resync,
382382+ mortality,
389383 pds_host,
390384 current_mode,
391385 }: ValidationState,
···417411 }
418412 }
419413420420- if !born.is_empty() {
421421- let names = born
422422- .iter()
423423- .map(|n| n.as_str())
424424- .collect::<Vec<_>>()
425425- .join(", ");
426426- info!(did = %did, collections = names, "collection birth");
414414+ if !mortality.born.is_empty() {
415415+ let names = mortality.born.keys().collect::<Vec<_>>();
416416+ info!(did = %did, collections = ?names, "collection birth");
427417 }
428428- if !died.is_empty() {
429429- let names = died
430430- .iter()
431431- .map(|n| n.as_str())
432432- .collect::<Vec<_>>()
433433- .join(", ");
434434- info!(did = %did, collections = names, "collection death");
418418+ if !mortality.died.is_empty() {
419419+ let names = mortality.died.keys().collect::<Vec<_>>();
420420+ info!(did = %did, collections = ?names, "collection death");
435421 }
436422437423 // All checks passed — atomically update the prev_data and the collection
438424 // index (born → insert, died → remove). Also record each born collection
439425 // in the global collection list (blind overwrite, never deleted).
440440- let n_born = born.len() as u64;
441441- let n_died = died.len() as u64;
426426+ let n_born = mortality.born.len() as u64;
427427+ let n_died = mortality.died.len() as u64;
442428 let mut batch = db.database.batch();
443429 storage::repo::put_prev_into(
444430 &mut batch,
···449435 prev_data: new_mst_root_bytes,
450436 },
451437 );
452452- for coll in born {
438438+ for coll in mortality.born.keys() {
453439 // TODO(temporary): detect spurious births to confirm pre-sync1.1 hypothesis
454440 if storage::collection_index::has_collection(db, &did, coll.clone())? {
455441 if incoming_prev_data.is_some() {
456442 error!(
457443 did = %did,
458458- collection = coll.as_str(),
459459- "spurious birth on sync1.1 commit — collection already indexed (unexpected)"
444444+ collection = %coll,
445445+ sync11 = true,
446446+ "spurious birth, already indexed"
460447 );
461448 } else {
462462- warn!(
449449+ error!(
463450 did = %did,
464464- collection = coll.as_str(),
465465- "spurious birth on pre-sync1.1 commit (no prevData) — expected"
451451+ collection = %coll,
452452+ sync11 = false,
453453+ "spurious birth, already indexed"
466454 );
467455 }
468456 }
469457 storage::collection_index::insert_into(&mut batch, db, &did, coll.clone());
470458 }
471471- for coll in died {
472472- storage::collection_index::remove_into(&mut batch, db, &did, coll);
459459+ let mut all_deaths_proven = true;
460460+ for (coll, proven) in &mortality.died {
461461+ // TODO(remove): detect phandom deaths???
462462+ if !storage::collection_index::has_collection(db, &did, coll.clone())? {
463463+ if incoming_prev_data.is_some() {
464464+ error!(
465465+ did = %did,
466466+ collection = %coll,
467467+ proven,
468468+ sync11 = true,
469469+ "phantom death, collection not indexed"
470470+ );
471471+ } else {
472472+ error!(
473473+ did = %did,
474474+ collection = %coll,
475475+ proven,
476476+ sync11 = false,
477477+ "phantom death, collection not indexed"
478478+ );
479479+ }
480480+ }
481481+482482+ if !proven {
483483+ all_deaths_proven = false;
484484+ continue;
485485+ }
486486+487487+ storage::collection_index::remove_into(&mut batch, db, &did, coll.clone());
473488 }
474489475490 // If mortality detection found a possible-but-unproven collection death,
476491 // queue a full-repo resync so the index can be reconciled once we have
477492 // a complete view of the repo's current MST.
478478- if needs_resync {
493493+ if !all_deaths_proven {
494494+ let maybes: Vec<_> = mortality
495495+ .died
496496+ .iter()
497497+ .filter(|(_, p)| !*p)
498498+ .map(|(k, _)| k)
499499+ .collect();
500500+ error!(did = %did, maybe_deaths = ?maybes, "queuing resync due to unprovable death");
479501 storage::resync_queue::enqueue_into(
480502 &mut batch,
481503 db,
···483505 &crate::storage::resync_queue::ResyncItem {
484506 did: did.clone(),
485507 retry_count: 0,
486486- retry_reason: "possible collection death unproven due to MST gaps".to_string(),
508508+ retry_reason: "unprovable_death".to_string(),
487509 commit_cbor: vec![],
488510 },
489511 );