···9191 - [ ] *don't* allow non-validating commits that look like sync1.1
9292 - [ ] rachet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one.
9393- [ ] account status convergeance: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale?
9494+- [ ] split the keyspace: put the rbc/cbr indexes on a second keyspace with larger block size, expect hits on main keyspace
9595+- [ ] websocket ping/pong (unless jacquard is already doing it)
9696+- [ ] websocket no-events-received timeout reconnect
94979598very much still todo but i'm getting tired
9699- [ ] multi-relay listener
···98101- [ ] admin view of backfill state etc
99102- [ ] vanity stats for optimizations, like how many in-flight repos were saved from resync due to high-water-mark firehose cursor persistence
100103- [ ] if the upstream is a PDS (check with describeServer?) then make only accept events for DIDs that have it as their PDS
101101-- [ ] split the keyspace: put the rbc/cbr indexes on a second keyspace with larger block size, expect hits on main keyspace
102104103105## some choices
104106
···11//! mst (merkle search tree, the atproto repo structure) utils
2233pub mod collections;
44+pub mod mortality;
+270
src/mst/mortality.rs
···11+//! Collection birth/death detection from a partial CAR slice.
22+//!
33+//! ATProto firehose commit CARs include MST proof nodes for each changed key.
44+//! Those proof nodes usually include the immediately adjacent keys (left and
55+//! right neighbours in sorted order) for each change. By walking the partial
66+//! CAR we collect all visible leaf keys: the changed keys themselves plus
77+//! (usually) their in-collection neighbours.
88+//!
99+//! A collection **dies** when every visible key in it is being deleted — all
1010+//! included adjacent keys are not in its collection.
1111+//!
1212+//! A collection is **born** when every visible key in it is being created (no
1313+//! neighbours share its collection name). Rarely a neighbour in the same
1414+//! collection might be present in the repo but not in the proof, producing a
1515+//! spurious re-birth; that is harmless (the index entry already exists).
1616+//!
1717+//! Multi-op commits are handled because all ops are considered together: a key
1818+//! is a "survivor" only if it is visible AND not in the deleted set.
1919+2020+use std::collections::HashSet;
2121+2222+use jacquard_api::com_atproto::sync::subscribe_repos::RepoOp;
2323+use jacquard_common::types::string::Nsid;
2424+use repo_stream::{DriverBuilder, JacquardLoadError, WalkError};
2525+2626+#[derive(Debug, thiserror::Error)]
2727+pub enum MstMortalityError {
2828+ #[error("failed to load CAR: {0}")]
2929+ Load(#[from] JacquardLoadError),
3030+ #[error("MST walk error: {0}")]
3131+ Walk(#[from] WalkError),
3232+}
3333+3434+type Result<T> = std::result::Result<T, MstMortalityError>;
3535+3636+/// Walk the partial CAR's MST to detect which collections are newly added
3737+/// ("born") or fully removed ("died") by this commit.
3838+///
3939+/// Returns `(born, died)` — both lists may be empty.
4040+pub fn extract(
4141+ ops: &[RepoOp<'_>],
4242+ parsed: jacquard_repo::car::reader::ParsedCar,
4343+) -> Result<(Vec<Nsid<'static>>, Vec<Nsid<'static>>)> {
4444+ // ── Build create/delete path sets ────────────────────────────────────────
4545+ let mut created: HashSet<String> = HashSet::new();
4646+ let mut deleted: HashSet<String> = HashSet::new();
4747+ for op in ops {
4848+ match op.action.as_ref() {
4949+ "create" => {
5050+ created.insert(op.path.to_string());
5151+ }
5252+ "delete" => {
5353+ deleted.insert(op.path.to_string());
5454+ }
5555+ _ => {} // updates don't affect collection mortality
5656+ }
5757+ }
5858+5959+ if created.is_empty() && deleted.is_empty() {
6060+ return Ok((vec![], vec![]));
6161+ }
6262+6363+ // ── Walk the partial CAR's MST to collect visible leaf keys ──────────────
6464+ //
6565+ // next_keys() silently skips subtrees whose MST node blocks are absent,
6666+ // giving us all leaves reachable through blocks that ARE in the CAR —
6767+ // exactly the proof nodes for keys adjacent to the changes.
6868+ let mut car = DriverBuilder::new().load_jacquard_parsed_car(parsed)?;
6969+ let mut visible: Vec<String> = Vec::new();
7070+ while let Some((path, _)) = car.next_keys()? {
7171+ visible.push(path.to_string());
7272+ }
7373+7474+ // ── Check collection death (all visible keys in C are being deleted) ──────
7575+ let deleted_collections: HashSet<&str> = deleted
7676+ .iter()
7777+ .filter_map(|p| p.split_once('/').map(|(c, _)| c))
7878+ .collect();
7979+8080+ let mut died: Vec<Nsid<'static>> = Vec::new();
8181+ for coll in deleted_collections {
8282+ let prefix = format!("{coll}/");
8383+ let has_survivor = visible
8484+ .iter()
8585+ .any(|k| k.starts_with(&prefix) && !deleted.contains(k.as_str()));
8686+ if !has_survivor && let Ok(nsid) = Nsid::new_owned(coll) {
8787+ died.push(nsid);
8888+ }
8989+ }
9090+9191+ // ── Check collection birth (all visible keys in C are being created) ──────
9292+ let created_collections: HashSet<&str> = created
9393+ .iter()
9494+ .filter_map(|p| p.split_once('/').map(|(c, _)| c))
9595+ .collect();
9696+9797+ let mut born: Vec<Nsid<'static>> = Vec::new();
9898+ for coll in created_collections {
9999+ let prefix = format!("{coll}/");
100100+ let has_preexisting = visible
101101+ .iter()
102102+ .any(|k| k.starts_with(&prefix) && !created.contains(k.as_str()));
103103+ if !has_preexisting && let Ok(nsid) = Nsid::new_owned(coll) {
104104+ born.push(nsid);
105105+ }
106106+ }
107107+108108+ Ok((born, died))
109109+}
110110+111111+#[cfg(test)]
112112+mod tests {
113113+ use super::*;
114114+ use std::sync::Arc;
115115+116116+ use bytes::Bytes;
117117+ use jacquard_common::CowStr;
118118+ use jacquard_common::types::string::Did;
119119+ use jacquard_common::types::tid::Tid;
120120+ use jacquard_repo::commit::Commit;
121121+ use jacquard_repo::{BlockStore, MemoryBlockStore, Mst, car::write_car_bytes};
122122+123123+ fn nsid(s: &str) -> Nsid<'static> {
124124+ Nsid::new_owned(s).unwrap()
125125+ }
126126+127127+ fn op_create(path: &'static str) -> RepoOp<'static> {
128128+ RepoOp {
129129+ action: CowStr::Borrowed("create"),
130130+ path: CowStr::Borrowed(path),
131131+ cid: None,
132132+ prev: None,
133133+ extra_data: Default::default(),
134134+ }
135135+ }
136136+137137+ fn op_delete(path: &'static str) -> RepoOp<'static> {
138138+ RepoOp {
139139+ action: CowStr::Borrowed("delete"),
140140+ path: CowStr::Borrowed(path),
141141+ cid: None,
142142+ prev: None,
143143+ extra_data: Default::default(),
144144+ }
145145+ }
146146+147147+ /// Build a ParsedCar containing all the given MST keys.
148148+ async fn make_parsed_car(keys: &[&str]) -> jacquard_repo::car::reader::ParsedCar {
149149+ let storage = Arc::new(MemoryBlockStore::new());
150150+ let mut mst = Mst::new(storage.clone());
151151+ let dummy_cid = storage.put(b"record").await.unwrap();
152152+ for key in keys {
153153+ mst = mst.add(key, dummy_cid).await.unwrap();
154154+ }
155155+ let (mst_root, mut blocks) = mst.collect_blocks().await.unwrap();
156156+ let commit = Commit {
157157+ did: Did::new_owned("did:web:example.com").unwrap(),
158158+ version: 3,
159159+ data: mst_root,
160160+ rev: Tid::now_0(),
161161+ prev: None,
162162+ sig: Bytes::from(vec![0u8; 64]),
163163+ };
164164+ let commit_cid = commit.to_cid().unwrap();
165165+ let commit_cbor = Bytes::from(commit.to_cbor().unwrap());
166166+ blocks.insert(commit_cid, commit_cbor);
167167+ let car_bytes = write_car_bytes(commit_cid, blocks).await.unwrap();
168168+ jacquard_repo::car::parse_car_bytes(&car_bytes)
169169+ .await
170170+ .unwrap()
171171+ }
172172+173173+ // ---------------------------------------------------------------------------
174174+ // Basic cases
175175+ // ---------------------------------------------------------------------------
176176+177177+ #[tokio::test]
178178+ async fn empty_ops_returns_empty() {
179179+ let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await;
180180+ let (born, died) = extract(&[], parsed).unwrap();
181181+ assert!(born.is_empty());
182182+ assert!(died.is_empty());
183183+ }
184184+185185+ #[tokio::test]
186186+ async fn update_ops_ignored() {
187187+ let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await;
188188+ let ops = [RepoOp {
189189+ action: CowStr::Borrowed("update"),
190190+ path: CowStr::Borrowed("app.bsky.feed.post/abc123"),
191191+ cid: None,
192192+ prev: None,
193193+ extra_data: Default::default(),
194194+ }];
195195+ let (born, died) = extract(&ops, parsed).unwrap();
196196+ assert!(born.is_empty());
197197+ assert!(died.is_empty());
198198+ }
199199+200200+ // ---------------------------------------------------------------------------
201201+ // Birth detection
202202+ // ---------------------------------------------------------------------------
203203+204204+ #[tokio::test]
205205+ async fn new_collection_is_born() {
206206+ // CAR contains only the created key → no preexisting neighbours.
207207+ let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await;
208208+ let ops = [op_create("app.bsky.feed.post/abc123")];
209209+ let (born, died) = extract(&ops, parsed).unwrap();
210210+ assert_eq!(born, vec![nsid("app.bsky.feed.post")]);
211211+ assert!(died.is_empty());
212212+ }
213213+214214+ #[tokio::test]
215215+ async fn birth_suppressed_when_preexisting_key_visible() {
216216+ // CAR contains both the created key and a preexisting sibling.
217217+ let parsed =
218218+ make_parsed_car(&["app.bsky.feed.post/abc123", "app.bsky.feed.post/def456"]).await;
219219+ let ops = [op_create("app.bsky.feed.post/abc123")];
220220+ let (born, died) = extract(&ops, parsed).unwrap();
221221+ assert!(born.is_empty());
222222+ assert!(died.is_empty());
223223+ }
224224+225225+ // ---------------------------------------------------------------------------
226226+ // Death detection
227227+ // ---------------------------------------------------------------------------
228228+229229+ #[tokio::test]
230230+ async fn collection_dies_when_last_key_deleted() {
231231+ let parsed = make_parsed_car(&["app.bsky.feed.post/abc123"]).await;
232232+ let ops = [op_delete("app.bsky.feed.post/abc123")];
233233+ let (born, died) = extract(&ops, parsed).unwrap();
234234+ assert!(born.is_empty());
235235+ assert_eq!(died, vec![nsid("app.bsky.feed.post")]);
236236+ }
237237+238238+ #[tokio::test]
239239+ async fn death_suppressed_when_survivor_visible() {
240240+ // CAR shows a sibling key that is not being deleted.
241241+ let parsed =
242242+ make_parsed_car(&["app.bsky.feed.post/abc123", "app.bsky.feed.post/def456"]).await;
243243+ let ops = [op_delete("app.bsky.feed.post/abc123")];
244244+ let (born, died) = extract(&ops, parsed).unwrap();
245245+ assert!(born.is_empty());
246246+ assert!(died.is_empty());
247247+ }
248248+249249+ // ---------------------------------------------------------------------------
250250+ // Multi-collection / multi-op
251251+ // ---------------------------------------------------------------------------
252252+253253+ #[tokio::test]
254254+ async fn birth_and_death_in_same_commit() {
255255+ let parsed = make_parsed_car(&[
256256+ "app.bsky.feed.post/abc123", // will be created (born)
257257+ "app.bsky.graph.follow/old", // will be deleted (died)
258258+ ])
259259+ .await;
260260+ let ops = [
261261+ op_create("app.bsky.feed.post/abc123"),
262262+ op_delete("app.bsky.graph.follow/old"),
263263+ ];
264264+ let (mut born, mut died) = extract(&ops, parsed).unwrap();
265265+ born.sort_unstable();
266266+ died.sort_unstable();
267267+ assert_eq!(born, vec![nsid("app.bsky.feed.post")]);
268268+ assert_eq!(died, vec![nsid("app.bsky.graph.follow")]);
269269+ }
270270+}
+7-129
src/sync/firehose/commit_event.rs
···2222use jacquard_api::com_atproto::sync::subscribe_repos::{Commit, RepoOp};
2323use jacquard_common::types::cid::CidLink;
2424use jacquard_common::types::{string::Did, string::Nsid, tid::Tid};
2525-use jacquard_repo::mst::{CursorPosition, MstCursor, VerifiedWriteOp};
2525+use jacquard_repo::mst::VerifiedWriteOp;
2626use jacquard_repo::{MemoryBlockStore, Mst};
2727use tracing::{debug, info, warn};
2828···8585 }
8686 };
87878888- // Build the block store once; the MST is cheap to clone (just a CID +
8989- // Arc to the store). Both step 5 and collection mortality share it.
8888+ // Clone the parsed CAR before consuming its blocks into the MST block store.
8989+ // not super-cheap, but the Bytes values are refcounted at least so whatever
9090+ let parsed_clone = parsed.clone();
9191+9092 let storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
9193 let new_mst = Mst::load(Arc::clone(&storage), mst_root_cid, None);
92949395 // ── Step 5: Inductive proof ───────────────────────────────────────────────
9494- // Invert the ops on a clone of the new MST; the resulting root must equal
9595- // prevData. Uses a clone so the original `new_mst` is untouched for the
9696- // collection mortality check below.
9797- if !verify_inductive_proof(new_mst.clone(), &commit.ops, commit.prev_data.as_ref()).await? {
9696+ if !verify_inductive_proof(new_mst, &commit.ops, commit.prev_data.as_ref()).await? {
9897 metrics::counter!("lightrail_commit_dropped_total", "reason" => "proof_failed")
9998 .increment(1);
10099 debug!(did = %did, "commit dropped: inductive proof failed");
···104103 metrics::histogram!("lightrail_commit_ops").record(commit.ops.len() as f64);
105104106105 // ── Collection birth/death detection ─────────────────────────────────────
107107- let (born, died) = extract_collection_mortality(&commit.ops, new_mst).await?;
106106+ let (born, died) = crate::mst::mortality::extract(&commit.ops, parsed_clone)?;
108107109108 // ── Steps 2, 6–9: Blocking storage checks + repo_prev update ────────────
110109 let db = db.clone();
···271270 }
272271 _ => None,
273272 }
274274-}
275275-276276-// ---------------------------------------------------------------------------
277277-// Collection birth/death detection (async, MST cursor)
278278-// ---------------------------------------------------------------------------
279279-280280-/// Walk the partial CAR's MST to detect which collections are newly added
281281-/// ("born") or fully removed ("died") by this commit.
282282-///
283283-/// ## How it works
284284-///
285285-/// ATProto firehose commit CARs include MST proof nodes for each changed key.
286286-/// Those proof nodes usually include the immediately adjacent keys (left and
287287-/// right neighbours in sorted order) for each change. By walking the partial
288288-/// CAR we collect all visible leaf keys: the changed keys themselves plus
289289-/// (usually) their in-collection neighbours.
290290-///
291291-/// A collection dies when every visible key in it is being deleted - all
292292-/// included adjacent keys are *not* in its collection. (immediately-adjacent
293293-/// keys are always included when a key is deleted... i think?????????)
294294-///
295295-/// A collection is born when every visible key in it is being created (no
296296-/// neighbours share its collection name). *rarely,* a neighbour in the same
297297-/// collection might be present in the repo but not in the proof, and we end up
298298-/// re-birthing (ew) a collection / adding it to the index twice. this is
299299-/// harmless. we still get to avoid almost all redundant inserts.
300300-///
301301-/// Multi-op commits are handled because we consider all ops together: a key is
302302-/// a "survivor" only if it is visible AND not in the deleted set.
303303-async fn extract_collection_mortality(
304304- ops: &[RepoOp<'_>],
305305- mst: Mst<MemoryBlockStore>,
306306-) -> crate::error::Result<(Vec<Nsid<'static>>, Vec<Nsid<'static>>)> {
307307- use std::collections::HashSet;
308308-309309- // ── Build create/delete path sets ────────────────────────────────────────
310310- let mut created: HashSet<String> = HashSet::new();
311311- let mut deleted: HashSet<String> = HashSet::new();
312312- for op in ops {
313313- match op.action.as_ref() {
314314- "create" => {
315315- created.insert(op.path.to_string());
316316- }
317317- "delete" => {
318318- deleted.insert(op.path.to_string());
319319- }
320320- _ => {} // updates don't affect collection mortality
321321- }
322322- }
323323-324324- if created.is_empty() && deleted.is_empty() {
325325- return Ok((vec![], vec![]));
326326- }
327327-328328- // ── Walk the partial CAR's MST to collect visible leaf keys ──────────────
329329- //
330330- // Strategy: try to descend into each Tree node (which succeeds if the
331331- // block is in the CAR). On error (block absent), skip the subtree instead.
332332- // This gives us all leaves reachable through blocks that ARE in the CAR —
333333- // exactly the proof nodes for keys adjacent to the changes.
334334- let mut cursor = MstCursor::new(mst);
335335- let mut visible: Vec<String> = Vec::new();
336336-337337- while !cursor.is_end() {
338338- match cursor.current() {
339339- CursorPosition::Leaf { key, .. } => {
340340- visible.push(key.to_string());
341341- cursor
342342- .advance()
343343- .await
344344- .map_err(|e| crate::error::Error::Other(e.to_string()))?;
345345- }
346346- CursorPosition::Tree { .. } => {
347347- // Try to descend; fall back to skip if the block is absent.
348348- match cursor.advance().await {
349349- Ok(()) => {}
350350- Err(_) => cursor
351351- .skip_subtree()
352352- .await
353353- .map_err(|e| crate::error::Error::Other(e.to_string()))?,
354354- }
355355- }
356356- CursorPosition::End => break,
357357- }
358358- }
359359-360360- // ── Check collection death (all visible keys in C are being deleted) ──────
361361- let deleted_collections: HashSet<&str> = deleted
362362- .iter()
363363- .filter_map(|p| p.split_once('/').map(|(c, _)| c))
364364- .collect();
365365-366366- let mut died: Vec<Nsid<'static>> = Vec::new();
367367- for coll in deleted_collections {
368368- let prefix = format!("{coll}/");
369369- let has_survivor = visible
370370- .iter()
371371- .any(|k| k.starts_with(&prefix) && !deleted.contains(k.as_str()));
372372- if !has_survivor && let Ok(nsid) = Nsid::new_owned(coll) {
373373- died.push(nsid);
374374- }
375375- }
376376-377377- // ── Check collection birth (all visible keys in C are being created) ──────
378378- let created_collections: HashSet<&str> = created
379379- .iter()
380380- .filter_map(|p| p.split_once('/').map(|(c, _)| c))
381381- .collect();
382382-383383- let mut born: Vec<Nsid<'static>> = Vec::new();
384384- for coll in created_collections {
385385- let prefix = format!("{coll}/");
386386- let has_preexisting = visible
387387- .iter()
388388- .any(|k| k.starts_with(&prefix) && !created.contains(k.as_str()));
389389- if !has_preexisting && let Ok(nsid) = Nsid::new_owned(coll) {
390390- born.push(nsid);
391391- }
392392- }
393393-394394- Ok((born, died))
395273}
396274397275// ---------------------------------------------------------------------------