lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

spurious checks, resync buffer, metrics

phil a092bcf4 89ce75fe

+690 -49
+1
Cargo.lock
··· 2333 2333 "rustls", 2334 2334 "rustversion", 2335 2335 "serde", 2336 + "serde_ipld_dagcbor", 2336 2337 "serde_json", 2337 2338 "thiserror 2.0.18", 2338 2339 "tokio",
+1
Cargo.toml
··· 29 29 rustls = { version = "0.23", default-features = false, features = ["aws-lc-rs"] } 30 30 rustversion = "1" 31 31 serde = { version = "1", features = ["derive"] } 32 + serde_ipld_dagcbor = "0.6" 32 33 serde_json = "1" 33 34 thiserror = "2.0.18" 34 35 tokio = { version = "1.49.0", features = ["full"] }
+10 -7
hacking.md
··· 78 78 - [-] with disk spilling for huge repo (maybe later) 79 79 - [-] with queueing resync for large repos if resources are taken?? (maybe later) 80 80 - [x] (self-reminder: get_repo should be rare in lightrail) 81 - - [ ] "deep crawl" mode for relays that listHosts -> listRepos on host instead of relying on relay listRepos 82 - - [ ] defensive loop-cursor handling 81 + - [x] prefix-merge walker (limit by total collections to be merged?) 82 + - [x] add an all-collections index 83 83 - [~] actually firehose-index!! 84 84 - [x] extract collections-added/removed directly from CAR slice 85 - - [ ] (spend some time on tests here) 85 + - [x] (spend some time on tests here) 86 86 - [x] do the thing (write them to the db) 87 - - [ ] swap in repo-stream 88 - - [~] prefix-merge walker (limit by total collections to be merged?) 89 - - [x] add an all-collections index 90 - - [ ] lenient sync1.1 87 + - [x] swap in repo-stream 88 + - [ ] actually wire in the resync buffer (oops) 89 + - [ ] "deep crawl" mode for relays that listHosts -> listRepos on host instead of relying on relay listRepos 90 + - [ ] defensive loop-cursor handling 91 + - [ ] lenient pre-sync1.1 91 92 - [ ] *don't* allow non-validating commits that look like sync1.1 92 93 - [ ] rachet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one. 94 + - [ ] boooo we probably need *even more* special handling for pre-sync1.1 repos since they don't include adjacent keys!!! 93 95 - [ ] account status convergeance: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale? 94 96 - [ ] split the keyspace: put the rbc/cbr indexes on a second keyspace with larger block size, expect hits on main keyspace 95 97 - [ ] websocket ping/pong (unless jacquard is already doing it) 96 98 - [ ] websocket no-events-received timeout reconnect 99 + 97 100 98 101 very much still todo but i'm getting tired 99 102 - [ ] multi-relay listener
+4 -3
readme.md
··· 1 1 # lightrail: `listReposByCollection` service 2 2 3 - **status: in development** 3 + **status: almost working well but not stable yet!!** 4 4 5 5 lightrail uses the adjacent keys included in CAR slices from firehose commits to detect the first record added and last record removed from a collection in an atproto repo. 6 6 ··· 18 18 19 19 ### wishlist features (probably doable?): 20 20 21 - - accept multiple collections for `listReposbyCollection` (merge + dedup by DID; works bc key is `<collection>||<did>`) 22 - - `listReposByCollectionPrefix`, either with additional indexes up the NSID hierarchy, or via merge+dedup. 21 + - [x] DONE accept multiple collections for `listReposbyCollection` (merge + dedup by DID; works bc key is `<collection>||<did>`) 22 + - [x] DONE "wilcard" fo `listReposbyCollection` by omitting the `collection` query param entirely 23 + - ~~`listReposByCollectionPrefix`, either with additional indexes up the NSID hierarchy, or via merge+dedup.~~ not doing 23 24 - subscribe to multiple relays 24 25 - use authenticated repo contents for backfill instead of `com.atproto.repo.describeRepo` (see [./authenticated-collection-list.md](./authenticated-collection-list.md)) 25 26
+445 -9
src/mst/mortality.rs
··· 33 33 34 34 type Result<T> = std::result::Result<T, MstMortalityError>; 35 35 36 + /// Collect every MST leaf path visible in a (possibly partial) CAR. 37 + /// 38 + /// Uses `next_keys()` which silently skips subtrees whose MST node blocks are 39 + /// absent, so this works on both full and proof-only CARs. 40 + fn collect_visible_paths(parsed: jacquard_repo::car::reader::ParsedCar) -> Result<Vec<String>> { 41 + let mut car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?; 42 + let mut visible = Vec::new(); 43 + while let Some((path, _)) = car.next_keys()? { 44 + visible.push(path.to_string()); 45 + } 46 + Ok(visible) 47 + } 48 + 36 49 /// Walk the partial CAR's MST to detect which collections are newly added 37 50 /// ("born") or fully removed ("died") by this commit. 38 51 /// ··· 61 74 } 62 75 63 76 // ── Walk the partial CAR's MST to collect visible leaf keys ────────────── 64 - // 65 - // next_keys() silently skips subtrees whose MST node blocks are absent, 66 - // giving us all leaves reachable through blocks that ARE in the CAR — 67 - // exactly the proof nodes for keys adjacent to the changes. 68 - let mut car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?; 69 - let mut visible: Vec<String> = Vec::new(); 70 - while let Some((path, _)) = car.next_keys()? { 71 - visible.push(path.to_string()); 72 - } 77 + let visible = collect_visible_paths(parsed)?; 73 78 74 79 // ── Check collection death (all visible keys in C are being deleted) ────── 75 80 let deleted_collections: HashSet<&str> = deleted ··· 268 273 assert_eq!(died, vec![nsid("app.bsky.graph.follow")]); 269 274 } 270 275 } 276 + 277 + // ============================================================================= 278 + // Fixture-driven tests 279 + // ============================================================================= 280 + // 281 + // Two fixture sources: 282 + // 283 + // 1. `../atproto-interop-tests/firehose/commit-proof-fixtures.json` 284 + // Bluesky's interop fixture set — 6 commit scenarios with known 285 + // `blocksInProof` CIDs and expected tree state. We build the "after" MST 286 + // from scratch using jacquard_repo, filter to the fixture's proof CIDs, and 287 + // check the adjacent-key invariants that guard against spurious births and 288 + // deaths. Fixture 6 uses real `app.bsky.*` NSIDs, so we can also assert the 289 + // actual mortality result. 290 + // 291 + // 2. `../mst-test-suite/tests/diff/exhaustive/*.json` + 292 + // `../mst-test-suite/cars/exhaustive/*.car` 293 + // 16 384 exhaustive MST diff pairs. We load the "B" (after) CAR file 294 + // directly, filter its blocks to `created_nodes ∪ proof_nodes` (the minimal 295 + // firehose CAR content), and assert the key safety property: if a collection 296 + // still has records in the after-tree, at least one survivor must be visible 297 + // in the proof blocks so that our mortality logic cannot spuriously declare 298 + // the collection dead. 299 + // 300 + // Both test functions skip gracefully when the fixture directories are absent 301 + // (e.g. in CI environments that don't clone the sibling repos). 302 + 303 + #[cfg(test)] 304 + mod fixture_tests { 305 + use super::{collect_visible_paths, extract}; 306 + use std::collections::{BTreeMap, HashSet}; 307 + use std::sync::Arc; 308 + 309 + use bytes::Bytes; 310 + use cid::Cid as IpldCid; 311 + use jacquard_api::com_atproto::sync::subscribe_repos::RepoOp; 312 + use jacquard_common::CowStr; 313 + use jacquard_common::types::string::{Did, Nsid}; 314 + use jacquard_common::types::tid::Tid; 315 + use jacquard_repo::car::reader::ParsedCar; 316 + use jacquard_repo::commit::Commit; 317 + use jacquard_repo::{MemoryBlockStore, Mst}; 318 + 319 + // ── Helpers ─────────────────────────────────────────────────────────────── 320 + 321 + fn parse_cid(s: &str) -> IpldCid { 322 + s.parse().unwrap_or_else(|e| panic!("bad CID {s:?}: {e}")) 323 + } 324 + 325 + /// Add a fake commit block that points to the current MST root. 326 + /// 327 + /// After this call `parsed.root` is the commit CID and the original MST 328 + /// root CID is returned (needed to know what the commit's `data` field is). 329 + fn attach_fake_commit(parsed: &mut ParsedCar) -> IpldCid { 330 + let mst_root = parsed.root; 331 + let commit = Commit { 332 + did: Did::new_owned("did:web:example.com").unwrap(), 333 + version: 3, 334 + data: mst_root, 335 + rev: Tid::now_0(), 336 + prev: None, 337 + sig: Bytes::from(vec![0u8; 64]), 338 + }; 339 + let commit_cid = commit.to_cid().unwrap(); 340 + let commit_cbor = Bytes::from(commit.to_cbor().unwrap()); 341 + parsed.blocks.insert(commit_cid, commit_cbor); 342 + parsed.root = commit_cid; 343 + mst_root 344 + } 345 + 346 + /// Build an MST from `after_keys` (each mapped to `leaf_cid`), collect all 347 + /// its blocks, then return a `ParsedCar` filtered to `proof_cids` plus a 348 + /// fake commit block. Also returns the computed MST root CID so callers can 349 + /// verify it matches a fixture's `rootAfterCommit`. 350 + async fn build_proof_car( 351 + after_keys: &[&str], 352 + leaf_cid: IpldCid, 353 + proof_cids: &HashSet<String>, 354 + ) -> (ParsedCar, IpldCid) { 355 + let storage = Arc::new(MemoryBlockStore::new()); 356 + let mut mst = Mst::new(storage); 357 + for key in after_keys { 358 + mst = mst.add(key, leaf_cid).await.unwrap(); 359 + } 360 + let (mst_root, all_blocks) = mst.collect_blocks().await.unwrap(); 361 + 362 + let filtered: BTreeMap<IpldCid, Bytes> = all_blocks 363 + .into_iter() 364 + .filter(|(cid, _)| proof_cids.contains(&cid.to_string())) 365 + .collect(); 366 + 367 + let commit = Commit { 368 + did: Did::new_owned("did:web:example.com").unwrap(), 369 + version: 3, 370 + data: mst_root, 371 + rev: Tid::now_0(), 372 + prev: None, 373 + sig: Bytes::from(vec![0u8; 64]), 374 + }; 375 + let commit_cid = commit.to_cid().unwrap(); 376 + let mut blocks = filtered; 377 + blocks.insert(commit_cid, Bytes::from(commit.to_cbor().unwrap())); 378 + 379 + let parsed = ParsedCar { 380 + root: commit_cid, 381 + blocks, 382 + }; 383 + (parsed, mst_root) 384 + } 385 + 386 + /// Check the two safety invariants for a single commit scenario: 387 + /// 388 + /// - **No spurious death**: if collection C has survivors in the after-tree, 389 + /// at least one survivor must be visible in the proof CAR. 390 + /// - **No spurious birth**: if collection C already had records before the 391 + /// commit (i.e., there are preexisting keys in the after-tree that are not 392 + /// being created), at least one preexisting key must be visible. 393 + /// 394 + /// Returns a list of violation descriptions (empty → all good). 395 + fn check_invariants( 396 + adds: &[&str], 397 + dels: &[&str], 398 + after_keys: &[&str], 399 + visible: &[String], 400 + ) -> Vec<String> { 401 + let adds_set: HashSet<&str> = adds.iter().copied().collect(); 402 + let dels_set: HashSet<&str> = dels.iter().copied().collect(); 403 + let visible_set: HashSet<&str> = visible.iter().map(String::as_str).collect(); 404 + let mut violations = Vec::new(); 405 + 406 + // Spurious death: a collection still has survivors but none are visible. 407 + for del_path in dels { 408 + let (coll, _) = del_path.split_once('/').unwrap(); 409 + let prefix = format!("{coll}/"); 410 + let survivors: Vec<&str> = after_keys 411 + .iter() 412 + .copied() 413 + .filter(|k| k.starts_with(&prefix) && !dels_set.contains(k)) 414 + .collect(); 415 + if survivors.is_empty() { 416 + continue; // collection legitimately died 417 + } 418 + let visible_survivor = visible_set 419 + .iter() 420 + .any(|v| v.starts_with(&prefix) && !dels_set.contains(*v)); 421 + if !visible_survivor { 422 + violations.push(format!( 423 + "spurious death possible: collection '{coll}' has survivors \ 424 + {survivors:?} but none appear in proof (visible={visible:?})" 425 + )); 426 + } 427 + } 428 + 429 + // Spurious birth: a collection existed before the commit (has preexisting 430 + // keys in the after-tree) but no preexisting key is visible. 431 + for add_path in adds { 432 + let (coll, _) = add_path.split_once('/').unwrap(); 433 + let prefix = format!("{coll}/"); 434 + let preexisting: Vec<&str> = after_keys 435 + .iter() 436 + .copied() 437 + .filter(|k| k.starts_with(&prefix) && !adds_set.contains(k)) 438 + .collect(); 439 + if preexisting.is_empty() { 440 + continue; // collection genuinely new 441 + } 442 + let visible_preexisting = visible_set 443 + .iter() 444 + .any(|v| v.starts_with(&prefix) && !adds_set.contains(*v)); 445 + if !visible_preexisting { 446 + violations.push(format!( 447 + "spurious birth possible: collection '{coll}' has preexisting \ 448 + keys {preexisting:?} but none appear in proof (visible={visible:?})" 449 + )); 450 + } 451 + } 452 + 453 + violations 454 + } 455 + 456 + // ── atproto-interop-tests ───────────────────────────────────────────────── 457 + 458 + /// Leaf CID shared across all atproto-interop-tests fixtures. 459 + const LEAF_CID_STR: &str = "bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454"; 460 + 461 + #[tokio::test] 462 + async fn atproto_interop_fixtures() { 463 + let fixtures_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) 464 + .join("../atproto-interop-tests/firehose/commit-proof-fixtures.json"); 465 + 466 + if !fixtures_path.exists() { 467 + println!("SKIP: atproto-interop-tests not found at {fixtures_path:?}"); 468 + return; 469 + } 470 + 471 + let leaf_cid = parse_cid(LEAF_CID_STR); 472 + let fixtures: Vec<serde_json::Value> = 473 + serde_json::from_str(&std::fs::read_to_string(&fixtures_path).unwrap()).unwrap(); 474 + 475 + let mut failures: Vec<String> = Vec::new(); 476 + 477 + for fixture in &fixtures { 478 + let comment = fixture["comment"].as_str().unwrap(); 479 + 480 + let keys: Vec<&str> = fixture["keys"] 481 + .as_array() 482 + .unwrap() 483 + .iter() 484 + .map(|v| v.as_str().unwrap()) 485 + .collect(); 486 + let adds: Vec<&str> = fixture["adds"] 487 + .as_array() 488 + .unwrap() 489 + .iter() 490 + .map(|v| v.as_str().unwrap()) 491 + .collect(); 492 + let dels: Vec<&str> = fixture["dels"] 493 + .as_array() 494 + .unwrap() 495 + .iter() 496 + .map(|v| v.as_str().unwrap()) 497 + .collect(); 498 + let expected_root = fixture["rootAfterCommit"].as_str().unwrap(); 499 + let proof_cids: HashSet<String> = fixture["blocksInProof"] 500 + .as_array() 501 + .unwrap() 502 + .iter() 503 + .map(|v| v.as_str().unwrap().to_string()) 504 + .collect(); 505 + 506 + // Build after-tree: keys + adds - dels 507 + let dels_set: HashSet<&str> = dels.iter().copied().collect(); 508 + let mut after_keys: Vec<&str> = keys 509 + .iter() 510 + .copied() 511 + .filter(|k| !dels_set.contains(k)) 512 + .collect(); 513 + after_keys.extend(adds.iter().copied()); 514 + after_keys.sort_unstable(); 515 + 516 + let (parsed, computed_root) = build_proof_car(&after_keys, leaf_cid, &proof_cids).await; 517 + 518 + // Sanity: verify our MST matches the fixture. 519 + if computed_root.to_string() != expected_root { 520 + failures.push(format!( 521 + "'{comment}': MST root mismatch \ 522 + (got {computed_root}, expected {expected_root}) — \ 523 + jacquard_repo may be incompatible with this fixture" 524 + )); 525 + continue; 526 + } 527 + 528 + let visible = collect_visible_paths(parsed.clone()).unwrap(); 529 + let violations = check_invariants(&adds, &dels, &after_keys, &visible); 530 + for v in violations { 531 + failures.push(format!("'{comment}': {v}")); 532 + } 533 + 534 + // Fixture 6 uses real app.bsky.* NSIDs — check the actual result. 535 + if comment == "split with earlier leaves on same layer" { 536 + let ops: Vec<_> = adds 537 + .iter() 538 + .map(|p| RepoOp { 539 + action: CowStr::Owned("create".into()), 540 + path: CowStr::Owned((*p).into()), 541 + cid: None, 542 + prev: None, 543 + extra_data: Default::default(), 544 + }) 545 + .chain(dels.iter().map(|p| RepoOp { 546 + action: CowStr::Owned("delete".into()), 547 + path: CowStr::Owned((*p).into()), 548 + cid: None, 549 + prev: None, 550 + extra_data: Default::default(), 551 + })) 552 + .collect(); 553 + 554 + let (born, died) = extract(&ops, parsed).unwrap(); 555 + // Adding to an existing app.bsky.feed.post collection: 556 + // the adjacent key (3lon5cqsbwrj2) must be visible → no birth. 557 + if !born.is_empty() || !died.is_empty() { 558 + failures.push(format!( 559 + "'{comment}': expected born=[], died=[] \ 560 + but got born={born:?}, died={died:?}; \ 561 + visible={visible:?}" 562 + )); 563 + } 564 + } 565 + } 566 + 567 + assert!( 568 + failures.is_empty(), 569 + "atproto-interop fixture violations:\n{}", 570 + failures.join("\n") 571 + ); 572 + } 573 + 574 + // ── mst-test-suite ──────────────────────────────────────────────────────── 575 + 576 + /// Property test over all 16 384 exhaustive MST diff fixtures. 577 + /// 578 + /// Loads the "B" (after-commit) CAR directly, filters its blocks to 579 + /// `created_nodes ∪ proof_nodes` (the blocks a compliant firehose sender 580 + /// must include), and asserts that the "no spurious death" invariant holds: 581 + /// if a collection still has surviving records in B, at least one survivor 582 + /// must be visible in the partial proof CAR. 583 + #[tokio::test] 584 + async fn mst_suite_no_spurious_death() { 585 + let suite_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../mst-test-suite"); 586 + 587 + if !suite_dir.exists() { 588 + println!("SKIP: mst-test-suite not found at {suite_dir:?}"); 589 + return; 590 + } 591 + 592 + let fixture_dir = suite_dir.join("tests/diff/exhaustive"); 593 + let mut entries: Vec<_> = std::fs::read_dir(&fixture_dir) 594 + .unwrap() 595 + .filter_map(|e| e.ok()) 596 + .filter(|e| e.path().extension().map_or(false, |x| x == "json")) 597 + .collect(); 598 + entries.sort_by_key(|e| e.path()); 599 + 600 + let mut checked = 0usize; 601 + let mut failures: Vec<String> = Vec::new(); 602 + 603 + for entry in &entries { 604 + let text = std::fs::read_to_string(entry.path()).unwrap(); 605 + let fixture: serde_json::Value = serde_json::from_str(&text).unwrap(); 606 + let results = &fixture["results"]; 607 + 608 + let deleted_rpaths: Vec<&str> = results["record_ops"] 609 + .as_array() 610 + .unwrap() 611 + .iter() 612 + .filter(|op| op["new_value"].is_null()) 613 + .map(|op| op["rpath"].as_str().unwrap()) 614 + .collect(); 615 + 616 + if deleted_rpaths.is_empty() { 617 + continue; // no deletions → no possible spurious deaths 618 + } 619 + 620 + // Load the B (after) CAR. 621 + let b_car_rel = fixture["inputs"]["mst_b"].as_str().unwrap(); 622 + let b_car_path = suite_dir.join(b_car_rel.trim_start_matches("./")); 623 + let b_car_bytes = std::fs::read(&b_car_path).unwrap(); 624 + let mut full_parsed = jacquard_repo::car::parse_car_bytes(&b_car_bytes) 625 + .await 626 + .unwrap(); 627 + 628 + // Attach a fake commit so repo-stream can load it. 629 + attach_fake_commit(&mut full_parsed); 630 + let commit_cid = full_parsed.root; 631 + 632 + // Walk the full B tree to collect all surviving keys. 633 + let all_keys: HashSet<String> = collect_visible_paths(full_parsed.clone()) 634 + .unwrap() 635 + .into_iter() 636 + .collect(); 637 + 638 + // Build the proof-filtered partial CAR. 639 + let proof_cid_set: HashSet<String> = results["created_nodes"] 640 + .as_array() 641 + .unwrap() 642 + .iter() 643 + .chain(results["proof_nodes"].as_array().unwrap().iter()) 644 + .map(|v| v.as_str().unwrap().to_string()) 645 + .collect(); 646 + 647 + let partial_blocks: BTreeMap<IpldCid, Bytes> = full_parsed 648 + .blocks 649 + .into_iter() 650 + .filter(|(cid, _)| proof_cid_set.contains(&cid.to_string()) || *cid == commit_cid) 651 + .collect(); 652 + let partial_parsed = ParsedCar { 653 + root: commit_cid, 654 + blocks: partial_blocks, 655 + }; 656 + 657 + let visible: HashSet<String> = collect_visible_paths(partial_parsed) 658 + .unwrap() 659 + .into_iter() 660 + .collect(); 661 + 662 + // Invariant: for every deleted key, if the collection has survivors 663 + // in the after-tree, at least one must be visible in the proof CAR. 664 + for del in &deleted_rpaths { 665 + let (coll, _) = del.split_once('/').unwrap(); 666 + let prefix = format!("{coll}/"); 667 + 668 + let survivors: Vec<&str> = all_keys 669 + .iter() 670 + .filter(|k| k.starts_with(&prefix)) 671 + .map(String::as_str) 672 + .collect(); 673 + 674 + if survivors.is_empty() { 675 + continue; // collection truly died; fine 676 + } 677 + 678 + let visible_survivor = visible.iter().any(|v| v.starts_with(&prefix)); 679 + if !visible_survivor { 680 + let name = entry.path(); 681 + let name = name.file_name().unwrap().to_string_lossy(); 682 + failures.push(format!( 683 + "{name}: deleting '{del}' — collection '{coll}' has survivors \ 684 + {survivors:?} but none visible in proof (visible keys: {visible:?})" 685 + )); 686 + } 687 + } 688 + 689 + checked += 1; 690 + } 691 + 692 + assert!( 693 + checked > 0, 694 + "no fixtures with deletions found in {fixture_dir:?}" 695 + ); 696 + assert!( 697 + failures.is_empty(), 698 + "{} spurious-death invariant violations across {checked} fixtures:\n{}", 699 + failures.len(), 700 + failures.join("\n") 701 + ); 702 + println!( 703 + "mst_suite_no_spurious_death: checked {checked} fixtures with deletions — all passed" 704 + ); 705 + } 706 + }
+1 -1
src/storage/repo.rs
··· 72 72 } 73 73 74 74 impl RepoState { 75 - pub fn as_str(&self) -> &str { 75 + pub fn as_str(&self) -> &'static str { 76 76 match self { 77 77 RepoState::Pending => "pending", 78 78 RepoState::Desynchronized => "desynchronized",
+126 -20
src/sync/firehose/commit_event.rs
··· 24 24 use jacquard_common::types::{string::Did, string::Nsid, tid::Tid}; 25 25 use jacquard_repo::mst::VerifiedWriteOp; 26 26 use jacquard_repo::{MemoryBlockStore, Mst}; 27 - use tracing::{debug, info, warn}; 27 + use tracing::{debug, error, info, warn}; 28 28 29 29 use super::validate::{self, CarDrop}; 30 30 use crate::identity::Resolver; 31 31 use crate::storage::{ 32 32 self, DbRef, 33 33 repo::{AccountStatus, RepoInfo, RepoPrev, RepoState}, 34 - resync_queue::ResyncItem, 35 34 }; 36 35 37 36 // --------------------------------------------------------------------------- 38 37 // Public entry point 39 38 // --------------------------------------------------------------------------- 40 39 41 - pub(super) async fn process_commit_event( 40 + pub(crate) async fn process_commit_event( 42 41 commit: Box<Commit<'static>>, 42 + seq: i64, 43 43 resolver: &Resolver, 44 44 db: &DbRef, 45 45 ) -> crate::error::Result<()> { ··· 47 47 48 48 // ── Step 1: Resolve DID ────────────────────────────────────────────────── 49 49 let Some(resolved) = validate::resolve(&did, resolver, "commit").await else { 50 + metrics::counter!("lightrail_commit_dropped_total", "reason" => "did_resolution_failed") 51 + .increment(1); 50 52 return Ok(()); 51 53 }; 52 54 55 + // ── Step 2: Account status + desync state ──────────────────────────────── 56 + let rev = commit.rev.clone(); 57 + let db2 = db.clone(); 58 + let did2 = did.clone(); 59 + let step2 = 60 + tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev)).await??; 61 + let (info, prev) = match step2 { 62 + Step2Result::Proceed(info, prev) => (info, prev), 63 + Step2Result::Drop => return Ok(()), 64 + Step2Result::Buffer => { 65 + // Repo is mid-resync. Serialize the commit and buffer it so it can 66 + // be replayed after the resync fetch completes. 67 + let cbor = serde_ipld_dagcbor::to_vec(&*commit) 68 + .map_err(|e| crate::error::Error::Other(format!("commit cbor encode: {e}")))?; 69 + let seq_u64 = seq as u64; 70 + let db_buf = db.clone(); 71 + let did_buf = did.clone(); 72 + tokio::task::spawn_blocking(move || { 73 + storage::resync_buffer::push_buffer(&db_buf, did_buf, seq_u64, &cbor) 74 + }) 75 + .await??; 76 + metrics::counter!("lightrail_commit_buffered_total").increment(1); 77 + return Ok(()); 78 + } 79 + }; 80 + 53 81 // ── Steps 3–4: CAR parse + signature verification + field consistency ──── 54 82 let (new_mst_root_bytes, mst_root_cid, parsed) = match validate_car(&commit, &resolved.pubkey) 55 83 .await ··· 105 133 // ── Collection birth/death detection ───────────────────────────────────── 106 134 let (born, died) = crate::mst::mortality::extract(&commit.ops, parsed_clone)?; 107 135 108 - // ── Steps 2, 6–9: Blocking storage checks + repo_prev update ──────────── 136 + // ── Steps 6–9: Blocking storage checks + repo_prev update ─────────────── 109 137 let db = db.clone(); 110 138 let rev = commit.rev.clone(); 111 139 let since = commit.since.clone(); ··· 120 148 process_blocking( 121 149 &db, 122 150 did, 151 + info, 152 + prev, 123 153 rev, 124 154 since, 125 155 incoming_prev_data, ··· 276 306 // Storage checks (blocking) 277 307 // --------------------------------------------------------------------------- 278 308 279 - /// Perform the storage-backed validation steps (2, 6–9) and, if all pass, 309 + /// Outcome of the step-2 storage check. 310 + enum Step2Result { 311 + /// All good — proceed with signature verification and the rest of the pipeline. 312 + Proceed(RepoInfo, Option<RepoPrev>), 313 + /// Drop this event (inactive account, desynchronized state, stale rev, etc.). 314 + Drop, 315 + /// Repo is mid-resync. Caller should buffer the commit for replay. 316 + Buffer, 317 + } 318 + 319 + /// Step 2: load the repo state and decide how to handle this commit. 320 + /// 321 + /// - Unknown repo: creates a `Desynchronized` entry and enqueues a resync so 322 + /// we discover repos that appear on the firehose before backfill reaches them. 323 + /// - `Resyncing` repo: returns `Step2Result::Buffer` — the commit must be 324 + /// stored in the resync buffer and replayed after the fetch completes. 325 + /// - Inactive / desynchronized / stale / future rev: returns `Step2Result::Drop`. 326 + /// - All clear: returns `Step2Result::Proceed(info, prev)` so the caller can 327 + /// pass the pre-loaded repo state to `process_blocking`. 328 + fn check_step2_blocking( 329 + db: &DbRef, 330 + did: Did<'static>, 331 + rev: &Tid, 332 + ) -> crate::error::Result<Step2Result> { 333 + let Some((info, prev)) = storage::repo::get(db, &did)? else { 334 + // Unknown repo — create an entry and enqueue for initial fetch so that 335 + // repos appearing on the firehose before backfill reaches them are not 336 + // silently skipped. 337 + let mut batch = db.database.batch(); 338 + storage::repo::put_info_into( 339 + &mut batch, 340 + db, 341 + &did, 342 + &RepoInfo { 343 + state: RepoState::Desynchronized, 344 + status: AccountStatus::Active, 345 + error: None, 346 + }, 347 + ); 348 + storage::resync_queue::enqueue_into( 349 + &mut batch, 350 + db, 351 + crate::util::unix_now(), 352 + &crate::storage::resync_queue::ResyncItem { 353 + did, 354 + retry_count: 0, 355 + retry_reason: "first firehose event for unknown repo".to_string(), 356 + commit_cbor: vec![], 357 + }, 358 + ); 359 + batch 360 + .commit() 361 + .map_err(Into::<crate::storage::StorageError>::into)?; 362 + metrics::counter!("lightrail_commit_dropped_total", "reason" => "unknown_repo") 363 + .increment(1); 364 + return Ok(Step2Result::Drop); 365 + }; 366 + if info.state == RepoState::Resyncing { 367 + return Ok(Step2Result::Buffer); 368 + } 369 + if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did) { 370 + return Ok(Step2Result::Drop); 371 + } 372 + Ok(Step2Result::Proceed(info, prev)) 373 + } 374 + 375 + /// Perform the storage-backed validation steps (6–9) and, if all pass, 280 376 /// persist the updated `repo_prev`. 377 + /// 378 + /// `info` and `prev` are pre-loaded by [`check_step2_blocking`] (step 2). 281 379 fn process_blocking( 282 380 db: &DbRef, 283 381 did: Did<'static>, 382 + info: RepoInfo, 383 + prev: Option<RepoPrev>, 284 384 rev: Tid, 285 385 since: Option<Tid>, 286 386 incoming_prev_data: Option<Vec<u8>>, ··· 288 388 born: Vec<Nsid<'static>>, 289 389 died: Vec<Nsid<'static>>, 290 390 ) -> crate::error::Result<()> { 291 - // Load the current repo state and chain tip (may be absent for new repos). 292 - let (info, prev) = match storage::repo::get(db, &did)? { 293 - Some(r) => r, 294 - None => return Ok(()), // repo not indexed by us; drop silently 295 - }; 296 - 297 - // Steps 2, 6, 7: shared drop checks. 298 - if validate::should_drop(&info, prev.as_ref(), &rev, "commit", &did) { 299 - return Ok(()); 300 - } 301 - 302 391 if let Some(prev) = &prev { 303 392 // Step 8: `since` must match repo_prev's rev (the previous rev in the chain). 304 393 if let Some(since) = &since ··· 310 399 warn!(did = %did.as_str(), 311 400 commit_since = since.as_str(), prev_rev = prev.rev.as_str(), 312 401 "commit dropped: since/rev mismatch; queueing resync"); 313 - return enqueue_desync(db, did, info.status); 402 + return enqueue_desync(db, did, info.status, "since/rev mismatch"); 314 403 } 315 404 316 405 // Step 9: `prevData` must match the MST root stored from the last commit. ··· 322 411 .increment(1); 323 412 warn!(did = %did.as_str(), 324 413 "commit dropped: prevData mismatch; queueing resync"); 325 - return enqueue_desync(db, did, info.status); 414 + return enqueue_desync(db, did, info.status, "prevData mismatch"); 326 415 } 327 416 } 328 417 ··· 359 448 }, 360 449 ); 361 450 for coll in born { 451 + // TODO(temporary): detect spurious births to confirm pre-sync1.1 hypothesis 452 + if storage::collection_index::has_collection(db, &did, coll.clone())? { 453 + if incoming_prev_data.is_some() { 454 + error!( 455 + did = %did, 456 + collection = coll.as_str(), 457 + "spurious birth on sync1.1 commit — collection already indexed (unexpected)" 458 + ); 459 + } else { 460 + warn!( 461 + did = %did, 462 + collection = coll.as_str(), 463 + "spurious birth on pre-sync1.1 commit (no prevData) — expected" 464 + ); 465 + } 466 + } 362 467 storage::collection_index::insert_into(&mut batch, db, &did, coll.clone()); 363 468 } 364 469 for coll in died { ··· 386 491 db: &DbRef, 387 492 did: Did<'static>, 388 493 existing_status: AccountStatus, 494 + reason: &str, 389 495 ) -> crate::error::Result<()> { 390 496 let mut batch = db.database.batch(); 391 497 storage::repo::put_info_into( ··· 402 508 &mut batch, 403 509 db, 404 510 crate::util::unix_now(), 405 - &ResyncItem { 511 + &crate::storage::resync_queue::ResyncItem { 406 512 did, 407 513 retry_count: 0, 408 - retry_reason: "commit chain discontinuity".to_string(), 514 + retry_reason: reason.to_string(), 409 515 commit_cbor: vec![], 410 516 }, 411 517 );
+1 -1
src/sync/firehose/event_dispatcher.rs
··· 329 329 resolver: Arc<crate::identity::Resolver>, 330 330 db: DbRef, 331 331 ) -> CommitWorkerResult { 332 - let outcome = super::commit_event::process_commit_event(commit, &resolver, &db) 332 + let outcome = super::commit_event::process_commit_event(commit, seq, &resolver, &db) 333 333 .await 334 334 .map_err(|e| e.to_string()); 335 335 CommitWorkerResult { did, seq, outcome }
+1 -1
src/sync/firehose/mod.rs
··· 20 20 //! successfully processed event. 21 21 22 22 mod account_event; 23 - mod commit_event; 23 + pub(crate) mod commit_event; 24 24 mod event_dispatcher; 25 25 mod identity_event; 26 26 mod sync_event;
+12 -3
src/sync/firehose/validate.rs
··· 6 6 //! ([`fresh_key_after_sig_failure`]). 7 7 //! - **Step 3** — CAR commit block deserialization, signature verification, and 8 8 //! DID/rev field consistency ([`deserialize_and_verify`]). 9 - //! - **Steps 2, 6, 7** — Account-status, stale-rev, and future-rev storage 10 - //! checks ([`should_drop`]). 9 + //! - **Steps 2, 6, 7** — Account-status, desync-state, stale-rev, and 10 + //! future-rev storage checks ([`should_drop`]). 11 11 //! 12 12 //! Event-type-specific logic (how to locate the commit block in the CAR, what 13 13 //! value to return, steps 8–9 for commits) remains in the respective modules. ··· 18 18 use tracing::{debug, warn}; 19 19 20 20 use crate::identity::{CachedIdentity, Resolver}; 21 - use crate::storage::repo::{RepoInfo, RepoPrev}; 21 + use crate::storage::repo::{RepoInfo, RepoPrev, RepoState}; 22 22 23 23 /// How many seconds in the future a `rev` timestamp may be before the event 24 24 /// is dropped as implausibly future-dated. ··· 179 179 .increment(1); 180 180 debug!(did = %did, status = info.status.as_str(), 181 181 "{label} dropped: account not active"); 182 + return true; 183 + } 184 + 185 + if info.state == RepoState::Desynchronized || info.state == RepoState::Resyncing { 186 + metrics::counter!("lightrail_event_dropped_total", 187 + "event_type" => label, "reason" => info.state.as_str()) 188 + .increment(1); 189 + debug!(did = %did, state = info.state.as_str(), 190 + "{label} dropped: repo awaiting resync"); 182 191 return true; 183 192 } 184 193
+88 -4
src/sync/resync/dispatcher.rs
··· 10 10 //! after the cursor in the queue and are never skipped. The busy set prevents 11 11 //! claiming a new entry for a DID whose previous entry is still being processed. 12 12 13 - use jacquard_common::types::string::Did; 13 + use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 14 + use jacquard_common::{IntoStatic, types::string::Did}; 14 15 use std::collections::{HashMap, HashSet}; 15 16 use std::time::{Duration, Instant}; 16 17 ··· 206 207 } 207 208 } 208 209 Ok(outcome) => { 209 - if let Err(e) = handle_completion(did.clone(), outcome, db.clone()).await { 210 + if let Err(e) = 211 + handle_completion(did.clone(), outcome, db.clone(), resolver.clone()).await 212 + { 210 213 error!(error = %e, did = %did, "error handling worker completion"); 211 214 } 212 215 } ··· 300 303 // Completion handling 301 304 // --------------------------------------------------------------------------- 302 305 303 - async fn handle_completion(did: Did<'static>, outcome: WorkerOutcome, db: DbRef) -> Result<()> { 306 + async fn handle_completion( 307 + did: Did<'static>, 308 + outcome: WorkerOutcome, 309 + db: DbRef, 310 + resolver: std::sync::Arc<crate::identity::Resolver>, 311 + ) -> Result<()> { 304 312 match outcome { 305 313 WorkerOutcome::Success => { 306 314 metrics::counter!("lightrail_resync_completed_total", "outcome" => "success") 307 315 .increment(1); 308 - transition_state(db, did, RepoState::Active, None).await?; 316 + // Transition to Active first so buffered commits see the correct 317 + // state when they're replayed through the normal pipeline. 318 + transition_state(db.clone(), did.clone(), RepoState::Active, None).await?; 319 + replay_buffered_commits(did, db, &resolver).await?; 309 320 } 310 321 WorkerOutcome::Retry { error, retry_count } => { 311 322 metrics::counter!("lightrail_resync_completed_total", "outcome" => "retry") ··· 357 368 unreachable!("RateLimited handled before handle_completion") 358 369 } 359 370 } 371 + Ok(()) 372 + } 373 + 374 + /// After a successful resync, drain the commit buffer and replay each event. 375 + /// 376 + /// Commits are deserialized from DAG-CBOR and processed through the normal 377 + /// pipeline. The repo state is already `Active` at this point, so step 2 will 378 + /// pass and chain-continuity checks (steps 8–9) will validate as usual. Any 379 + /// commit that fails is skipped (it will be caught as a chain discontinuity 380 + /// and trigger a new resync if needed). Each entry is acked after processing 381 + /// regardless of outcome to prevent the buffer from growing indefinitely. 382 + async fn replay_buffered_commits( 383 + did: Did<'static>, 384 + db: DbRef, 385 + resolver: &crate::identity::Resolver, 386 + ) -> Result<()> { 387 + let did_scan = did.clone(); 388 + let db_scan = db.clone(); 389 + let events = tokio::task::spawn_blocking(move || { 390 + crate::storage::resync_buffer::scan_buffer(&db_scan, did_scan) 391 + }) 392 + .await 393 + .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 394 + 395 + if events.is_empty() { 396 + return Ok(()); 397 + } 398 + 399 + info!(did = %did, count = events.len(), "replaying buffered commits after resync"); 400 + 401 + for event in events { 402 + let seq = event.seq; 403 + 404 + // Deserialize borrowing from `event.cbor`, then convert to owned via 405 + // IntoStatic so the commit lifetime is independent of the buffer bytes. 406 + let commit: Box<Commit<'static>> = 407 + match serde_ipld_dagcbor::from_slice::<Commit<'_>>(&event.cbor) 408 + .map(IntoStatic::into_static) 409 + { 410 + Ok(c) => Box::new(c), 411 + Err(e) => { 412 + warn!(did = %did, seq, error = %e, 413 + "failed to deserialize buffered commit; skipping"); 414 + // Ack to avoid the entry accumulating across future resyncs. 415 + let did_ack = did.clone(); 416 + let db_ack = db.clone(); 417 + let _ = tokio::task::spawn_blocking(move || { 418 + crate::storage::resync_buffer::ack_buffer_entry(&db_ack, did_ack, seq) 419 + }) 420 + .await; 421 + continue; 422 + } 423 + }; 424 + 425 + if let Err(e) = crate::sync::firehose::commit_event::process_commit_event( 426 + commit, seq as i64, resolver, &db, 427 + ) 428 + .await 429 + { 430 + warn!(did = %did, seq, error = %e, "buffered commit replay failed; skipping"); 431 + } 432 + 433 + let did_ack = did.clone(); 434 + let db_ack = db.clone(); 435 + if let Err(e) = tokio::task::spawn_blocking(move || { 436 + crate::storage::resync_buffer::ack_buffer_entry(&db_ack, did_ack, seq) 437 + }) 438 + .await 439 + { 440 + warn!(did = %did, seq, error = %e, "failed to ack buffered commit"); 441 + } 442 + } 443 + 360 444 Ok(()) 361 445 } 362 446