lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

stricter births/deaths

phil 40379369 5ed3cf9c

+548 -47
+525 -45
src/mst/mortality.rs
··· 17 17 //! Multi-op commits are handled because all ops are considered together: a key 18 18 //! is a "survivor" only if it is visible AND not in the deleted set. 19 19 20 + use std::ops::Bound; 20 21 use std::collections::HashSet; 21 22 22 - use jacquard_api::com_atproto::sync::subscribe_repos::RepoOp; 23 + use jacquard_api::com_atproto::sync::subscribe_repos::{RepoOp, RepoOpAction}; 23 24 use jacquard_common::types::string::Nsid; 25 + use repo_stream::{MemCar, WalkItem}; 24 26 use repo_stream::{DriverBuilder, JacquardLoadError, WalkError}; 27 + use tracing::error; 25 28 26 29 use super::Span; 27 30 ··· 31 34 Load(#[from] JacquardLoadError), 32 35 #[error("MST walk error: {0}")] 33 36 Walk(#[from] WalkError), 37 + #[error("bad data in repo: {0}")] 38 + InvalidData(String), 34 39 } 35 40 36 41 type Result<T> = std::result::Result<T, MstMortalityError>; 37 42 38 43 type KeySpan = Span<String>; 39 44 45 + #[derive(Debug, PartialEq)] 46 + pub enum Existence { 47 + Yes(String), 48 + No, 49 + Uncertain, 50 + } 51 + 52 + impl KeySpan { 53 + pub fn left_of(&self, key: &str) -> Existence { 54 + self 55 + .things 56 + .range(..key.to_string()) 57 + .next_back() 58 + .map(|(k, gap)| if *gap { Existence::Uncertain } else { Existence::Yes(k.clone()) }) 59 + .unwrap_or(if self.gap_before { Existence::Uncertain } else { Existence::No }) 60 + } 61 + pub fn right_of(&self, key: &str, left: &Existence) -> Existence { 62 + if let Some(gap_after) = self.things.get(key) { 63 + // the key itself exists so we can just look right 64 + if *gap_after { 65 + return Existence::Uncertain; 66 + } 67 + // no gap after so it's not uncertain, just down to existence 68 + return self 69 + .things 70 + .range((Bound::Excluded(key.to_string()), Bound::Unbounded)) 71 + .next() 72 + .map(|(k, _)| Existence::Yes(k.to_string())) 73 + .unwrap_or(Existence::No) 74 + } 75 + // the key does not exist 76 + if *left == Existence::Uncertain { 77 + // we're in a gap, so the right key is uncertain 78 + // 
*technically* there is a next-legal-atproto-key we could check here to *be* certain (TODO) 79 + return Existence::Uncertain; 80 + } 81 + // we're not in a gap, so we have certainty about what's next 82 + self 83 + .things 84 + .range((Bound::Excluded(key.to_string()), Bound::Unbounded)) 85 + .next() 86 + .map(|(k, _)| Existence::Yes(k.clone())) 87 + .unwrap_or(Existence::No) 88 + } 89 + // pub fn is_lonely(key: &str) -> MortalityResult { 90 + // // first: get left and right keys 91 + 92 + // let falls_in_gap = self 93 + // .things 94 + // .range(..k) // exclusive range: all keys lex-before us 95 + // .next_back() // take the closest previous one 96 + // .map(|(_, gap_after)| gap_after) // if it existed, find out if it had a gap after 97 + // .unwrap_or(&self.gap_before); // no before-key: span starts with gap? 98 + // todo!() 99 + // } 100 + } 101 + 102 + /// extract a span of collection NSIDs (with possible gaps) from a CAR slice 103 + fn span_from_slice(car: &mut MemCar) -> Result<KeySpan> { 104 + let mut prev_gap = false; 105 + let mut prev_key = None; 106 + 107 + let mut span = KeySpan::empty(); 108 + 109 + while let Some(item) = car.next()? { 110 + assert!( 111 + !matches!(item, WalkItem::Node { .. }), 112 + "car.next() does not return found nodes" 113 + ); 114 + 115 + let Some(key) = item.key() else { 116 + prev_gap = true; 117 + continue; 118 + }; 119 + 120 + if let Some(prev) = prev_key { 121 + span.things.insert(prev, prev_gap); 122 + } else { 123 + span.gap_before = prev_gap; 124 + } 125 + 126 + prev_gap = false; 127 + prev_key = Some(key.clone()); 128 + } 129 + 130 + if let Some(prev) = prev_key { 131 + span.things.insert(prev, prev_gap); 132 + } else { 133 + span.gap_before = prev_gap; 134 + } 135 + 136 + Ok(span) 137 + } 138 + 139 + /// Returns true iff the span proves there are no hidden keys with the given prefix. 140 + /// 141 + /// Conditions: 142 + /// (a) At least one key with the prefix is visible in the span. 
143 + /// (b) `left_of` the first visible prefixed key is NOT Uncertain (no hidden keys before it). 144 + /// (c) Every visible prefixed key has `gap_after = false` (no hidden keys within or after). 145 + fn can_prove_complete_coverage(span: &KeySpan, prefix: &str) -> bool { 146 + let first_k = span 147 + .things 148 + .range(prefix.to_string()..) 149 + .next() 150 + .filter(|(k, _)| k.starts_with(prefix)) 151 + .map(|(k, _)| k.as_str()); 152 + 153 + let first_k = match first_k { 154 + Some(k) => k, 155 + None => return false, 156 + }; 157 + 158 + if matches!(span.left_of(first_k), Existence::Uncertain) { 159 + return false; 160 + } 161 + 162 + for (k, gap_after) in span.things.range(prefix.to_string()..) { 163 + if !k.starts_with(prefix) { 164 + break; 165 + } 166 + if *gap_after { 167 + return false; 168 + } 169 + } 170 + true 171 + } 172 + 40 173 /// Collect every MST leaf path visible in a (possibly partial) CAR. 41 174 /// 42 175 /// Uses `next_keys()` which silently skips subtrees whose MST node blocks are 43 176 /// absent, so this works on both full and proof-only CARs. 177 + #[cfg(test)] 44 178 fn collect_visible_paths(parsed: jacquard_repo::car::reader::ParsedCar) -> Result<Vec<String>> { 45 179 let mut car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?; 46 180 let mut visible = Vec::new(); ··· 50 184 Ok(visible) 51 185 } 52 186 187 + /// Result of [`extract`]. 188 + pub struct ExtractResult { 189 + /// Collections reported as newly created in this commit: every visible key 190 + /// in the collection is being created. Reported even when the proof has gaps. 191 + pub born: Vec<Nsid<'static>>, 192 + /// Collections fully deleted in this commit (proof covers the full range). 193 + pub died: Vec<Nsid<'static>>, 194 + /// True when a possible collection death could not be confirmed because the 195 + /// CAR proof has gaps that could hide surviving keys. The caller should 196 + /// queue a full-repo resync to verify the current collection state. 
197 + pub needs_resync: bool, 198 + } 199 + 53 200 /// Walk the partial CAR's MST to detect which collections are newly added 54 201 /// ("born") or fully removed ("died") by this commit. 55 - /// 56 - /// Returns `(born, died)` — both lists may be empty. 57 202 pub fn extract( 58 203 ops: &[RepoOp<'_>], 59 204 parsed: jacquard_repo::car::reader::ParsedCar, 60 - ) -> Result<(Vec<Nsid<'static>>, Vec<Nsid<'static>>)> { 205 + ) -> Result<ExtractResult> { 206 + // ── Check for duplicate paths (protocol violation in untrusted data) ────── 207 + let mut seen: HashSet<&str> = HashSet::with_capacity(ops.len()); 208 + for op in ops { 209 + if !seen.insert(op.path.as_ref()) { 210 + metrics::counter!( 211 + "lightrail_mortality_invalid_ops_total", 212 + "reason" => "duplicate_path" 213 + ) 214 + .increment(1); 215 + error!(path = %op.path, "duplicate path in commit ops; skipping commit"); 216 + return Err(MstMortalityError::InvalidData(format!( 217 + "duplicate op path: {}", 218 + op.path 219 + ))); 220 + } 221 + } 222 + 61 223 // ── Build create/delete path sets ──────────────────────────────────────── 62 - let mut created: HashSet<String> = HashSet::new(); 63 - let mut deleted: HashSet<String> = HashSet::new(); 224 + let mut creates: HashSet<&str> = HashSet::new(); 225 + let mut deletes: HashSet<&str> = HashSet::new(); 64 226 for op in ops { 65 - match op.action.as_ref() { 66 - "create" => { 67 - created.insert(op.path.to_string()); 227 + match op.action { 228 + RepoOpAction::Create => { 229 + creates.insert(op.path.as_ref()); 68 230 } 69 - "delete" => { 70 - deleted.insert(op.path.to_string()); 231 + RepoOpAction::Delete => { 232 + deletes.insert(op.path.as_ref()); 71 233 } 72 234 _ => {} // updates don't affect collection mortality 73 235 } 74 236 } 75 237 76 - if created.is_empty() && deleted.is_empty() { 77 - return Ok((vec![], vec![])); 238 + if creates.is_empty() && deletes.is_empty() { 239 + return Ok(ExtractResult { born: vec![], died: vec![], needs_resync: false 
}); 78 240 } 79 241 80 - // ── Walk the partial CAR's MST to collect visible leaf keys ────────────── 81 - let visible = collect_visible_paths(parsed)?; 242 + // ── Walk the partial CAR's MST to build a gap-aware key span ───────────── 243 + let mut mem_car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?; 244 + let span = span_from_slice(&mut mem_car)?; 82 245 83 - // ── Check collection death (all visible keys in C are being deleted) ────── 84 - let deleted_collections: HashSet<&str> = deleted 246 + // ── Check collection death ──────────────────────────────────────────────── 247 + // A collection died iff all visible keys in it are being deleted AND 248 + // the span proves there are no hidden surviving keys. 249 + let mut died: Vec<Nsid<'static>> = Vec::new(); 250 + let mut needs_resync = false; 251 + for coll in deletes 85 252 .iter() 86 253 .filter_map(|p| p.split_once('/').map(|(c, _)| c)) 87 - .collect(); 88 - 89 - let mut died: Vec<Nsid<'static>> = Vec::new(); 90 - for coll in deleted_collections { 254 + .collect::<HashSet<_>>() 255 + { 91 256 let prefix = format!("{coll}/"); 92 - let has_survivor = visible 93 - .iter() 94 - .any(|k| k.starts_with(&prefix) && !deleted.contains(k.as_str())); 95 - if !has_survivor && let Ok(nsid) = Nsid::new_owned(coll) { 257 + let has_survivor = span 258 + .things 259 + .range(prefix.clone()..) 260 + .take_while(|(k, _)| k.starts_with(&prefix)) 261 + .any(|(k, _)| !deletes.contains(k.as_str())); 262 + if has_survivor { 263 + continue; 264 + } 265 + if !can_prove_complete_coverage(&span, &prefix) { 266 + // Possible death but the CAR has gaps — we can't confirm. Request 267 + // a full-repo resync so the caller can reconcile the index later. 
268 + metrics::counter!("lightrail_mortality_unproven_total", "kind" => "death") 269 + .increment(1); 270 + error!(collection = coll, "possible collection death unproven due to MST gaps"); 271 + needs_resync = true; 272 + continue; 273 + } 274 + if let Ok(nsid) = Nsid::new_owned(coll) { 96 275 died.push(nsid); 97 276 } 98 277 } 99 278 100 - // ── Check collection birth (all visible keys in C are being created) ────── 101 - let created_collections: HashSet<&str> = created 279 + // ── Check collection birth ──────────────────────────────────────────────── 280 + // A collection is reported as born when all visible keys in it are being created. 281 + // Ideally the span also proves there are no hidden pre-existing keys, but 282 + // if the proof has gaps we still record the birth — false positives here 283 + // are harmless (the index entry already exists; no resync is queued for births). 284 + let mut born: Vec<Nsid<'static>> = Vec::new(); 285 + for coll in creates 102 286 .iter() 103 287 .filter_map(|p| p.split_once('/').map(|(c, _)| c)) 104 - .collect(); 105 - 106 - let mut born: Vec<Nsid<'static>> = Vec::new(); 107 - for coll in created_collections { 288 + .collect::<HashSet<_>>() 289 + { 108 290 let prefix = format!("{coll}/"); 109 - let has_preexisting = visible 110 - .iter() 111 - .any(|k| k.starts_with(&prefix) && !created.contains(k.as_str())); 112 - if !has_preexisting && let Ok(nsid) = Nsid::new_owned(coll) { 291 + let has_preexisting = span 292 + .things 293 + .range(prefix.clone()..) 294 + .take_while(|(k, _)| k.starts_with(&prefix)) 295 + .any(|(k, _)| !creates.contains(k.as_str())); 296 + if has_preexisting { 297 + continue; 298 + } 299 + if !can_prove_complete_coverage(&span, &prefix) { 300 + metrics::counter!("lightrail_mortality_unproven_total", "kind" => "birth") 301 + .increment(1); 302 + error!( 303 + collection = coll, 304 + "possible collection birth unproven due to MST gaps" 305 + ); 306 + // Fall through: include the birth anyway. 
Spurious births are 307 + // idempotent (index insert is a blind overwrite); false negatives 308 + // would silently drop new collections from the index. 309 + } 310 + if let Ok(nsid) = Nsid::new_owned(coll) { 113 311 born.push(nsid); 114 312 } 115 313 } 116 314 117 - Ok((born, died)) 315 + Ok(ExtractResult { born, died, needs_resync }) 118 316 } 119 317 120 318 #[cfg(test)] 121 319 mod tests { 122 320 use super::*; 321 + use std::collections::BTreeMap; 123 322 use std::sync::Arc; 124 323 125 324 use bytes::Bytes; 325 + use cid::Cid as IpldCid; 126 326 use jacquard_api::com_atproto::sync::subscribe_repos::RepoOpAction; 127 327 use jacquard_common::CowStr; 128 328 use jacquard_common::types::string::Did; 129 329 use jacquard_common::types::tid::Tid; 330 + use jacquard_repo::car::reader::ParsedCar; 130 331 use jacquard_repo::commit::Commit; 131 332 use jacquard_repo::{BlockStore, MemoryBlockStore, Mst, car::write_car_bytes}; 132 333 ··· 187 388 #[tokio::test] 188 389 async fn empty_ops_returns_empty() { 189 390 let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await; 190 - let (born, died) = extract(&[], parsed).unwrap(); 391 + let ExtractResult { born, died, .. } = extract(&[], parsed).unwrap(); 191 392 assert!(born.is_empty()); 192 393 assert!(died.is_empty()); 193 394 } ··· 202 403 prev: None, 203 404 extra_data: Default::default(), 204 405 }]; 205 - let (born, died) = extract(&ops, parsed).unwrap(); 406 + let ExtractResult { born, died, .. } = extract(&ops, parsed).unwrap(); 206 407 assert!(born.is_empty()); 207 408 assert!(died.is_empty()); 208 409 } ··· 216 417 // CAR contains only the created key → no preexisting neighbours. 217 418 let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await; 218 419 let ops = [op_create("app.bsky.feed.post/abc123")]; 219 - let (born, died) = extract(&ops, parsed).unwrap(); 420 + let ExtractResult { born, died, .. 
} = extract(&ops, parsed).unwrap(); 220 421 assert_eq!(born, vec![nsid("app.bsky.feed.post")]); 221 422 assert!(died.is_empty()); 222 423 } ··· 227 428 let parsed = 228 429 make_parsed_car(&["app.bsky.feed.post/abc123", "app.bsky.feed.post/def456"]).await; 229 430 let ops = [op_create("app.bsky.feed.post/abc123")]; 230 - let (born, died) = extract(&ops, parsed).unwrap(); 431 + let ExtractResult { born, died, .. } = extract(&ops, parsed).unwrap(); 231 432 assert!(born.is_empty()); 232 433 assert!(died.is_empty()); 233 434 } ··· 240 441 async fn collection_dies_when_last_key_deleted() { 241 442 let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await; 242 443 let ops = [op_delete("app.bsky.feed.post/abc123")]; 243 - let (born, died) = extract(&ops, parsed).unwrap(); 444 + let ExtractResult { born, died, .. } = extract(&ops, parsed).unwrap(); 244 445 assert!(born.is_empty()); 245 446 assert_eq!(died, vec![nsid("app.bsky.feed.post")]); 246 447 } ··· 251 452 let parsed = 252 453 make_parsed_car(&["app.bsky.feed.post/abc123", "app.bsky.feed.post/def456"]).await; 253 454 let ops = [op_delete("app.bsky.feed.post/abc123")]; 254 - let (born, died) = extract(&ops, parsed).unwrap(); 455 + let ExtractResult { born, died, .. } = extract(&ops, parsed).unwrap(); 255 456 assert!(born.is_empty()); 256 457 assert!(died.is_empty()); 257 458 } ··· 271 472 op_create("app.bsky.feed.post/abc123"), 272 473 op_delete("app.bsky.graph.follow/old"), 273 474 ]; 274 - let (mut born, mut died) = extract(&ops, parsed).unwrap(); 475 + let ExtractResult { mut born, mut died, .. 
} = extract(&ops, parsed).unwrap(); 275 476 born.sort_unstable(); 276 477 died.sort_unstable(); 277 478 assert_eq!(born, vec![nsid("app.bsky.feed.post")]); 278 479 assert_eq!(died, vec![nsid("app.bsky.graph.follow")]); 279 480 } 481 + 482 + // --------------------------------------------------------------------------- 483 + // Duplicate path detection 484 + // --------------------------------------------------------------------------- 485 + 486 + #[tokio::test] 487 + async fn duplicate_op_paths_returns_error() { 488 + let parsed = make_parsed_car(&["app.bsky.feed.post/abc"]).await; 489 + let ops = [ 490 + op_create("app.bsky.feed.post/abc"), 491 + op_delete("app.bsky.feed.post/abc"), 492 + ]; 493 + assert!(matches!( 494 + extract(&ops, parsed), 495 + Err(MstMortalityError::InvalidData(_)) 496 + )); 497 + } 498 + 499 + // --------------------------------------------------------------------------- 500 + // Gap suppression 501 + // --------------------------------------------------------------------------- 502 + 503 + /// Build a sparse ParsedCar containing only the root MST node and commit. 504 + /// 505 + /// All subtree node blocks are absent, so subtrees surface as 506 + /// `MissingSubtree` during the walk. Useful for testing gap detection. 
507 + async fn make_root_only_parsed_car(keys: &[&str]) -> ParsedCar { 508 + let storage = Arc::new(MemoryBlockStore::new()); 509 + let mut mst = Mst::new(storage.clone()); 510 + let dummy_cid = storage.put(b"record").await.unwrap(); 511 + for key in keys { 512 + mst = mst.add(key, dummy_cid).await.unwrap(); 513 + } 514 + let (mst_root, all_blocks) = mst.collect_blocks().await.unwrap(); 515 + let commit = Commit { 516 + did: Did::new_owned("did:web:example.com").unwrap(), 517 + version: 3, 518 + data: mst_root, 519 + rev: Tid::now_0(), 520 + prev: None, 521 + sig: Bytes::from(vec![0u8; 64]), 522 + }; 523 + let commit_cid = commit.to_cid().unwrap(); 524 + let commit_cbor = Bytes::from(commit.to_cbor().unwrap()); 525 + let root_bytes = all_blocks 526 + .get(&mst_root) 527 + .expect("root MST node not in blocks") 528 + .clone(); 529 + let mut sparse: BTreeMap<IpldCid, Bytes> = BTreeMap::new(); 530 + sparse.insert(commit_cid, commit_cbor); 531 + sparse.insert(mst_root, root_bytes); 532 + ParsedCar { 533 + root: commit_cid, 534 + blocks: sparse, 535 + } 536 + } 537 + 538 + // `app.bsky.feed.post/454397e440ec` is a known layer-4 key (sits in the 539 + // MST root node). Adding `app.bsky.feed.post/aaa` (layer 0, sorts after 540 + // the layer-4 key lexicographically) creates a right child subtree. 541 + // With root-only blocks, walking produces: 542 + // Leaf(app.bsky.feed.post/454397e440ec) | MissingSubtree 543 + // so the layer-4 key has gap_after=true — the span cannot prove complete 544 + // coverage of the collection. 545 + 546 + #[tokio::test] 547 + async fn gap_suppresses_death_and_sets_needs_resync() { 548 + let parsed = make_root_only_parsed_car(&[ 549 + "app.bsky.feed.post/454397e440ec", // layer 4 — visible in root 550 + "app.bsky.feed.post/aaa", // layer 0 — hidden behind right child 551 + ]) 552 + .await; 553 + // Delete only the visible key; the hidden sibling might be a survivor. 
554 + let ops = [op_delete("app.bsky.feed.post/454397e440ec")]; 555 + let ExtractResult { born, died, needs_resync } = extract(&ops, parsed).unwrap(); 556 + assert!(born.is_empty(), "unexpected birth: {born:?}"); 557 + assert!(died.is_empty(), "death declared despite gap: {died:?}"); 558 + assert!(needs_resync, "needs_resync should be set when death is unproven"); 559 + } 560 + 561 + #[tokio::test] 562 + async fn gap_does_not_suppress_birth() { 563 + let parsed = make_root_only_parsed_car(&[ 564 + "app.bsky.feed.post/454397e440ec", // layer 4 — visible in root 565 + "app.bsky.feed.post/aaa", // layer 0 — hidden behind right child 566 + ]) 567 + .await; 568 + // Create only the visible key; gap can't prove no pre-existing keys exist, 569 + // but we include the birth anyway (false positives are idempotent). 570 + let ops = [op_create("app.bsky.feed.post/454397e440ec")]; 571 + let ExtractResult { born, died, needs_resync } = extract(&ops, parsed).unwrap(); 572 + assert_eq!(born, vec![nsid("app.bsky.feed.post")], "birth should be included despite gap"); 573 + assert!(died.is_empty(), "unexpected death: {died:?}"); 574 + assert!(!needs_resync, "needs_resync should not be set for unproven birth"); 575 + } 576 + } 577 + 578 + // ============================================================================= 579 + // KeySpan::left_of / right_of unit tests 580 + // ============================================================================= 581 + 582 + #[cfg(test)] 583 + mod keyspan_tests { 584 + use super::{Existence, KeySpan}; 585 + 586 + // ── helpers ────────────────────────────────────────────────────────────── 587 + 588 + fn span(gap_before: bool, things: &[(&str, bool)]) -> KeySpan { 589 + KeySpan { 590 + gap_before, 591 + things: things.iter().map(|(k, v)| (k.to_string(), *v)).collect(), 592 + } 593 + } 594 + 595 + fn yes(s: &str) -> Existence { 596 + Existence::Yes(s.to_string()) 597 + } 598 + 599 + // ── left_of 
────────────────────────────────────────────────────────────── 600 + 601 + // Empty span, no gap → nothing to the left of anything. 602 + #[test] 603 + fn left_of_empty_no_gap() { 604 + assert_eq!(span(false, &[]).left_of("a/r1"), Existence::No); 605 + } 606 + 607 + // Empty span that's all-gap → anything could be to the left. 608 + #[test] 609 + fn left_of_empty_all_gap() { 610 + assert_eq!(span(true, &[]).left_of("a/r1"), Existence::Uncertain); 611 + } 612 + 613 + // Predecessor exists with no gap after → definite left neighbour. 614 + #[test] 615 + fn left_of_predecessor_no_gap() { 616 + let s = span(false, &[("a/r1", false)]); 617 + assert_eq!(s.left_of("b/r1"), yes("a/r1")); 618 + } 619 + 620 + // Predecessor exists but has gap_after → left is uncertain. 621 + #[test] 622 + fn left_of_predecessor_gap_after() { 623 + let s = span(false, &[("a/r1", true)]); 624 + assert_eq!(s.left_of("b/r1"), Existence::Uncertain); 625 + } 626 + 627 + // Key sits before the first span entry, no gap_before → nothing to the left. 628 + #[test] 629 + fn left_of_before_first_key_no_gap_before() { 630 + let s = span(false, &[("c/r1", false)]); 631 + assert_eq!(s.left_of("a/r1"), Existence::No); 632 + } 633 + 634 + // Key sits before the first span entry but gap_before is set → uncertain. 635 + #[test] 636 + fn left_of_before_first_key_gap_before() { 637 + let s = span(true, &[("c/r1", false)]); 638 + assert_eq!(s.left_of("a/r1"), Existence::Uncertain); 639 + } 640 + 641 + // Key sits between two span entries; the one to the left has no gap_after. 642 + #[test] 643 + fn left_of_between_two_keys_no_gap() { 644 + let s = span(false, &[("a/r1", false), ("c/r1", false)]); 645 + assert_eq!(s.left_of("b/r1"), yes("a/r1")); 646 + } 647 + 648 + // Key sits between two span entries; the left one has a gap_after. 
649 + #[test] 650 + fn left_of_between_two_keys_gap() { 651 + let s = span(false, &[("a/r1", true), ("c/r1", false)]); 652 + assert_eq!(s.left_of("b/r1"), Existence::Uncertain); 653 + } 654 + 655 + // `left_of` a key that itself IS in the span — range `..key` is exclusive, 656 + // so it finds what comes before it, not the key itself. 657 + #[test] 658 + fn left_of_key_itself_in_span_nothing_before() { 659 + let s = span(false, &[("b/r1", false)]); 660 + assert_eq!(s.left_of("b/r1"), Existence::No); 661 + } 662 + 663 + #[test] 664 + fn left_of_key_itself_in_span_predecessor_exists() { 665 + let s = span(false, &[("a/r1", false), ("b/r1", false)]); 666 + assert_eq!(s.left_of("b/r1"), yes("a/r1")); 667 + } 668 + 669 + // ── right_of ───────────────────────────────────────────────────────────── 670 + 671 + // Key is in the span and has a gap after it → uncertain what's to the right. 672 + #[test] 673 + fn right_of_key_in_span_gap_after() { 674 + let s = span(false, &[("a/r1", true)]); 675 + assert_eq!(s.right_of("a/r1", &Existence::No), Existence::Uncertain); 676 + } 677 + 678 + // Key is in the span, no gap after, another key follows. 679 + #[test] 680 + fn right_of_key_in_span_no_gap_next_exists() { 681 + let s = span(false, &[("a/r1", false), ("c/r1", false)]); 682 + assert_eq!(s.right_of("a/r1", &Existence::No), yes("c/r1")); 683 + } 684 + 685 + // Key is in the span, no gap after, it's the last entry. 686 + #[test] 687 + fn right_of_key_in_span_no_gap_last() { 688 + let s = span(false, &[("a/r1", false)]); 689 + assert_eq!(s.right_of("a/r1", &Existence::No), Existence::No); 690 + } 691 + 692 + // Key is NOT in the span; left says we're in a gap → uncertain. 
693 + #[test] 694 + fn right_of_key_not_in_span_left_uncertain() { 695 + let s = span(false, &[("a/r1", true), ("c/r1", false)]); 696 + assert_eq!(s.right_of("b/r1", &Existence::Uncertain), Existence::Uncertain); 697 + } 698 + 699 + // Key is NOT in the span; left is definite → look for the next key to the right. 700 + #[test] 701 + fn right_of_key_not_in_span_left_certain_next_exists() { 702 + let s = span(false, &[("a/r1", false), ("c/r1", false)]); 703 + assert_eq!(s.right_of("b/r1", &yes("a/r1")), yes("c/r1")); 704 + } 705 + 706 + // Key is NOT in the span; left is No (before first entry, no gap) → look right. 707 + #[test] 708 + fn right_of_key_not_in_span_left_no_next_exists() { 709 + let s = span(false, &[("c/r1", false)]); 710 + assert_eq!(s.right_of("b/r1", &Existence::No), yes("c/r1")); 711 + } 712 + 713 + // Key is NOT in the span; left is certain but there's nothing further right. 714 + #[test] 715 + fn right_of_key_not_in_span_left_certain_no_next() { 716 + let s = span(false, &[("a/r1", false)]); 717 + assert_eq!(s.right_of("b/r1", &yes("a/r1")), Existence::No); 718 + } 719 + 720 + // Empty all-gap span: right of anything is uncertain. 721 + #[test] 722 + fn right_of_empty_all_gap() { 723 + assert_eq!( 724 + span(true, &[]).right_of("a/r1", &Existence::Uncertain), 725 + Existence::Uncertain 726 + ); 727 + } 728 + 729 + // Empty no-gap span: right of anything is No. 730 + #[test] 731 + fn right_of_empty_no_gap() { 732 + assert_eq!( 733 + span(false, &[]).right_of("a/r1", &Existence::No), 734 + Existence::No 735 + ); 736 + } 737 + 738 + // ── combined: left_of feeds right_of ───────────────────────────────────── 739 + 740 + // The typical call pattern: compute left first, then use it for right. 741 + // Span: no-gap | "a/r1" | gap | "c/r1" | no-gap 742 + // For key "b/r1" (not present): left is Uncertain (b falls in the gap after "a/r1"), right is Uncertain. 
743 + #[test] 744 + fn combined_key_in_gap_both_uncertain() { 745 + let s = span(false, &[("a/r1", true), ("c/r1", false)]); 746 + let left = s.left_of("b/r1"); 747 + assert_eq!(left, Existence::Uncertain); 748 + assert_eq!(s.right_of("b/r1", &left), Existence::Uncertain); 749 + } 750 + 751 + // Span: "a/r1" | "b/r1" | "c/r1" all no-gap. 752 + // For key "b/r1" (present): left is Yes("a/r1"), right is Yes("c/r1"). 753 + #[test] 754 + fn combined_key_present_both_certain() { 755 + let s = span(false, &[("a/r1", false), ("b/r1", false), ("c/r1", false)]); 756 + let left = s.left_of("b/r1"); 757 + assert_eq!(left, yes("a/r1")); 758 + assert_eq!(s.right_of("b/r1", &left), yes("c/r1")); 759 + } 280 760 } 281 761 282 762 // ============================================================================= ··· 307 787 308 788 #[cfg(test)] 309 789 mod fixture_tests { 310 - use super::{collect_visible_paths, extract}; 790 + use super::{collect_visible_paths, extract, ExtractResult}; 311 791 use std::collections::{BTreeMap, HashSet}; 312 792 use std::sync::Arc; 313 793 ··· 556 1036 })) 557 1037 .collect(); 558 1038 559 - let (born, died) = extract(&ops, parsed).unwrap(); 1039 + let ExtractResult { born, died, .. } = extract(&ops, parsed).unwrap(); 560 1040 // Adding to an existing app.bsky.feed.post collection: 561 1041 // the adjacent key (3lon5cqsbwrj2) must be visible → no birth. 562 1042 if !born.is_empty() || !died.is_empty() {
+1 -1
src/mst/slice_tricks.rs
··· 121 121 while let Some(item) = car.next()? { 122 122 assert!( 123 123 !matches!(item, WalkItem::Node { .. }), 124 - "car.next() does not return nodes" 124 + "car.next() does not return found nodes" 125 125 ); 126 126 let Some(key) = item.key() else { 127 127 prev_gap = true;
+22 -1
src/sync/firehose/commit_event.rs
··· 160 160 metrics::histogram!("lightrail_commit_ops").record(commit.ops.len() as f64); 161 161 162 162 // ── Collection birth/death detection ───────────────────────────────────── 163 - let (born, died) = crate::mst::mortality::extract(&commit.ops, parsed_clone)?; 163 + let crate::mst::mortality::ExtractResult { born, died, needs_resync } = 164 + crate::mst::mortality::extract(&commit.ops, parsed_clone)?; 164 165 165 166 // ── Steps 6–9: Blocking storage checks + repo_prev update ─────────────── 166 167 let db = db.clone(); ··· 186 187 new_mst_root_bytes, 187 188 born, 188 189 died, 190 + needs_resync, 189 191 pds_host, 190 192 current_mode: pds_mode, 191 193 }, ··· 362 364 new_mst_root_bytes: Vec<u8>, 363 365 born: Vec<Nsid<'static>>, 364 366 died: Vec<Nsid<'static>>, 367 + needs_resync: bool, 365 368 pds_host: Option<Host>, 366 369 current_mode: Sync11Mode, 367 370 } ··· 382 385 new_mst_root_bytes, 383 386 born, 384 387 died, 388 + needs_resync, 385 389 pds_host, 386 390 current_mode, 387 391 }: ValidationState, ··· 466 470 } 467 471 for coll in died { 468 472 storage::collection_index::remove_into(&mut batch, db, &did, coll); 473 + } 474 + 475 + // If mortality detection found a possible-but-unproven collection death, 476 + // queue a full-repo resync so the index can be reconciled once we have 477 + // a complete view of the repo's current MST. 478 + if needs_resync { 479 + storage::resync_queue::enqueue_into( 480 + &mut batch, 481 + db, 482 + crate::util::unix_now(), 483 + &crate::storage::resync_queue::ResyncItem { 484 + did: did.clone(), 485 + retry_count: 0, 486 + retry_reason: "possible collection death unproven due to MST gaps".to_string(), 487 + commit_cbor: vec![], 488 + }, 489 + ); 469 490 } 470 491 471 492 // Upgrade PDS to strict on first prevData-bearing commit — atomically with