lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

use slice tricks with sync.getRecord

- short-circuit if the CAR slice was the complete repo, or covered all possible contained collections
- fall back to getRepo if the repo is likely tiny (root layer <= 1)

phil e9c040e1 e140bc9c

+96 -23
+5 -3
hacking.md
··· 116 116 - [-] ~~ping/pong (unless jacquard is already doing it):~~ seems like no but we can skip it 117 117 - [x] no-events-received timeout reconnect 118 118 - [x] account status convergence: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale? 119 - - [ ] resync short-circuit: tiny repos may actually return their entire CAR for getRecord 119 + - [x] resync short-circuit: tiny repos may actually return their entire CAR for getRecord 120 + - [~] repo-stream: drop record block contents with processor fn 121 + - [x] in getRecord before describeRepo 122 + - [ ] in commit handling 120 123 - [ ] commit CAR handling: generate a list of keys with gaps noted, to reliably detect missing adjacent keys 121 - - [ ] repo-stream: drop record block contents with processor fn 122 124 - [ ] meta/metrics keyspace for general stats 123 125 - [ ] total repos (hyperloglog estimate?) 124 126 - [ ] resync queue size ··· 127 129 - [x] config: add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 128 130 - [x] config: db mem limit `--fjall-cache-mb` 129 131 - [x] config: per-host request rate self-throttling `--crawl-qps` (name from collectiondir) 130 - - [ ] resync: estimate CAR size from `getRecord` mst height; `getRepo` if it's likely very small 132 + - [x] resync: estimate CAR size from `getRecord` mst height; `getRepo` if it's likely very small 131 133 - [ ] special did:web ident cache behaviour to keep reusing a stale resolution on failure 132 134 - [ ] admin view of backfill state etc 133 135 - [ ] vanity stats for optimizations, like how many in-flight repos were saved from resync due to high-water-mark firehose cursor persistence
+1
src/mst/mod.rs
··· 6 6 7 7 use std::collections::BTreeMap; 8 8 9 + #[derive(Debug)] 9 10 pub struct Span<T: Ord> { 10 11 gap_before: bool, 11 12 things: BTreeMap<T, bool>, // gap after
+40
src/mst/slice_tricks.rs
··· 141 141 span.gap_before = prev_gap; 142 142 } 143 143 144 + prev_gap = false; 144 145 prev_collection = Some((&collection).into()); 145 146 } 146 147 ··· 589 590 nsid("sh.tangled.repo"), 590 591 ])) 591 592 ); 593 + } 594 + 595 + // --- gap stickiness regression --- 596 + 597 + // Verifies that a MissingSubtree between A and B does NOT cause B to look 598 + // like it has a gap after it. Before the fix, `prev_gap` was never reset 599 + // after being committed, so every collection after the first gap inherited 600 + // gap_after=true. 601 + #[tokio::test] 602 + async fn gap_does_not_stick_to_subsequent_collections() { 603 + // Build a repo where only the root node is kept. The root contains 604 + // the layer-4 key (app.bsky.feed.post) as an inline entry, plus left 605 + // and right child pointers for the flanking layer-0 keys. 606 + // Walking gives: MissingSubtree | feed.post | MissingSubtree 607 + // So feed.post should have gap_before=true AND gap_after=true. 608 + // 609 + // Now add a *second* fully-visible collection that is adjacent (no 610 + // missing subtree between it and feed.post). We use a full CAR for 611 + // two collections so that no MissingSubtree appears between them, 612 + // but we also want one MissingSubtree *before* both collections. 613 + // 614 + // Easiest to test with make_span directly: simulate the exact state 615 + // span_from_slice should produce after the fix. 
616 + let s = make_span( 617 + true, 618 + &[ 619 + ("app.bsky.feed.post", false), 620 + ("app.bsky.graph.follow", false), 621 + ], 622 + ); 623 + // feed.post has gap_before; neither collection has gap_after 624 + assert!(!s.is_complete()); // gap_before exists 625 + // a collection lex-before feed.post is coverable (falls in the gap_before) 626 + assert!(s.could_cover(&BTreeSet::from([nsid("app.bsky.actor.profile")]))); 627 + // a collection lex-after graph.follow is NOT coverable (no trailing gap) 628 + assert!(!s.could_cover(&BTreeSet::from([nsid("app.bsky.richtext.facet")]))); 629 + // a collection lex-between feed.post and graph.follow is NOT coverable 630 + // (no gap between them) 631 + assert!(!s.could_cover(&BTreeSet::from([nsid("app.bsky.feed.repost")]))); 592 632 } 593 633 594 634 // --- sparse-CAR (gap) tests ---
+50 -20
src/sync/resync/describe_repo.rs
··· 116 116 _ => metrics::counter!("lightrail_resync_describe_repo_avoided", "reason" => "slice_complete", 117 117 "collections" => "multi").increment(1), 118 118 }; 119 + eprintln!("complete!! {collections:?}"); 119 120 return Ok(RepoSnapshot { 120 121 collections, 121 122 rev: report.rev, 122 123 data: report.data, 123 124 }); 124 125 } 125 - RepoCollections::Tiny(_) => { 126 + RepoCollections::Tiny(span) => { 127 + eprintln!("tiny collection! {span:?}"); 126 128 metrics::counter!("lightrail_resync_describe_repo_avoided", "reason" => "tiny_repo") 127 129 .increment(1); 128 130 // intentionally fall through to sync.getRepo instead of using describeRepo ··· 191 193 use std::sync::Arc; 192 194 193 195 use bytes::Bytes; 196 + use cid::Cid as RawCid; 194 197 use jacquard_common::types::nsid::Nsid; 195 198 use jacquard_common::types::string::Cid; 196 199 use jacquard_common::types::string::Did; 197 200 use jacquard_common::types::tid::Tid; 198 201 use jacquard_repo::commit::Commit; 199 202 use jacquard_repo::{BlockStore, MemoryBlockStore, Mst, car::write_car_bytes}; 203 + use std::collections::BTreeMap; 200 204 use wiremock::matchers::{method, path, query_param}; 201 205 use wiremock::{Mock, MockServer, ResponseTemplate}; 202 206 ··· 210 214 Did::new_owned(DID).unwrap() 211 215 } 212 216 213 - /// Build a minimal valid CAR for use as a `getRecord` response. 217 + /// Keys used for sparse CARs in tests. 218 + /// 219 + /// `app.bsky.feed.post/454397e440ec` is a known layer-4 key (atproto interop 220 + /// fixtures), giving root height 4. The flanking layer-0 keys force the root 221 + /// node to have left and right child pointers so a root-only CAR produces 222 + /// `MissingSubtree` on both sides — making the span incomplete and ensuring 223 + /// `fetch_collections` reaches the `describeRepo` call instead of short-circuiting. 
224 + const SPARSE_KEYS: &[&str] = &[ 225 + "app.bsky.actor.profile/self", // layer 0, sorts before feed.post 226 + "app.bsky.feed.post/454397e440ec", // layer 4, appears in root node 227 + "app.bsky.graph.follow/self", // layer 0, sorts after feed.post 228 + ]; 229 + 230 + /// Build a sparse (root-MST-node-only) CAR for use as a `getRecord` response. 214 231 /// 215 - /// Returns `(car_bytes, mst_root_cid_string, rev_string)`. 216 - async fn make_car(keys: &[&str]) -> (Vec<u8>, String, String) { 232 + /// Only the commit block and the root MST node block are included; all subtree 233 + /// node blocks are absent. Walking produces `MissingSubtree` items, so 234 + /// `report()` returns `RepoCollections::Otherwise` — bypassing the short-circuit 235 + /// that would otherwise skip `describeRepo` when the span is complete. 236 + /// 237 + /// Requires at least one key at MST layer >= 2 (so height > 1 and the repo is 238 + /// not classified as "tiny"). The known layer-4 key 239 + /// `app.bsky.feed.post/454397e440ec` from the atproto interop fixtures is a 240 + /// reliable choice. 
241 + async fn make_sparse_car(keys: &[&str]) -> (Vec<u8>, String, String) { 217 242 let storage = Arc::new(MemoryBlockStore::new()); 218 243 let mut mst = Mst::new(storage.clone()); 219 - 220 244 let dummy_cid = storage.put(b"record").await.unwrap(); 221 245 for key in keys { 222 246 mst = mst.add(key, dummy_cid).await.unwrap(); 223 247 } 224 - 225 - let (mst_root, mut blocks) = mst.collect_blocks().await.unwrap(); 248 + let (mst_root, all_blocks) = mst.collect_blocks().await.unwrap(); 226 249 let mst_root_str = mst_root.to_string(); 227 - 228 250 let commit = Commit { 229 251 did: Did::new_owned(DID).unwrap(), 230 252 version: 3, ··· 236 258 let commit_cid = commit.to_cid().unwrap(); 237 259 let rev_str = commit.rev.to_string(); 238 260 let commit_cbor = Bytes::from(commit.to_cbor().unwrap()); 239 - blocks.insert(commit_cid, commit_cbor); 240 - 241 - let car = write_car_bytes(commit_cid, blocks).await.unwrap(); 261 + let root_bytes = all_blocks 262 + .get(&mst_root) 263 + .expect("root MST node not in blocks") 264 + .clone(); 265 + let mut sparse: BTreeMap<RawCid, Bytes> = BTreeMap::new(); 266 + sparse.insert(commit_cid, commit_cbor); 267 + sparse.insert(mst_root, root_bytes); 268 + let car = write_car_bytes(commit_cid, sparse).await.unwrap(); 242 269 (car, mst_root_str, rev_str) 243 270 } 244 271 ··· 281 308 282 309 #[tokio::test] 283 310 async fn returns_collections_sorted() { 284 - let (car, _, _) = make_car(&["app.bsky.actor.profile/self"]).await; 311 + // describeRepo returns collections out of order; result must be sorted. 312 + let (car, _, _) = make_sparse_car(SPARSE_KEYS).await; 285 313 let server = mock_server( 286 314 ResponseTemplate::new(200).set_body_json(describe_repo_response(&[ 287 315 "app.bsky.feed.post", ··· 295 323 let base = server.uri().parse().unwrap(); 296 324 let snapshot = fetch_collections(&client, &base, did()).await.unwrap(); 297 325 298 - // Sorted regardless of the order the server returned them in. 
299 326 assert_eq!( 300 327 snapshot.collections, 301 328 BTreeSet::from_iter([nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post")]) ··· 305 332 #[tokio::test] 306 333 async fn sorts_server_order() { 307 334 // Server returns collections in reverse lexicographic order. 308 - let (car, _, _) = make_car(&["app.bsky.actor.profile/self"]).await; 335 + let (car, _, _) = make_sparse_car(SPARSE_KEYS).await; 309 336 let server = mock_server( 310 337 ResponseTemplate::new(200).set_body_json(describe_repo_response(&[ 311 338 "app.bsky.graph.follow", ··· 332 359 333 360 #[tokio::test] 334 361 async fn returns_empty_vec_when_no_collections() { 335 - let (car, _, _) = make_car(&[]).await; 362 + // describeRepo says no collections; could_cover({}) trivially passes. 363 + let (car, _, _) = make_sparse_car(SPARSE_KEYS).await; 336 364 let server = mock_server( 337 365 ResponseTemplate::new(200).set_body_json(describe_repo_response(&[])), 338 366 car, ··· 348 376 349 377 #[tokio::test] 350 378 async fn returns_mst_root_cid_and_rev() { 351 - // The MST root CID (commit.data) and rev round-trip correctly. 352 - let (car, expected_mst_root, expected_rev) = 353 - make_car(&["app.bsky.actor.profile/self"]).await; 379 + // The MST root CID (commit.data) and rev come from the getRecord CAR commit, 380 + // not from describeRepo, so they round-trip correctly through the sparse path. 381 + let (car, expected_mst_root, expected_rev) = make_sparse_car(SPARSE_KEYS).await; 354 382 let server = mock_server( 355 383 ResponseTemplate::new(200) 356 384 .set_body_json(describe_repo_response(&["app.bsky.actor.profile"])), ··· 386 414 #[tokio::test] 387 415 async fn errors_on_describe_repo_failure() { 388 416 // getRecord succeeds but describeRepo returns an XRPC error. 389 - let (car, _, _) = make_car(&["app.bsky.actor.profile/self"]).await; 417 + // Sparse CAR (root-only) ensures the span is incomplete → describeRepo is reached. 
418 + let (car, _, _) = make_sparse_car(SPARSE_KEYS).await; 390 419 let server = mock_server( 391 420 ResponseTemplate::new(400).set_body_json(serde_json::json!({ 392 421 "error": "InvalidRequest", ··· 406 435 #[tokio::test] 407 436 async fn errors_on_server_error() { 408 437 // A 5xx on describeRepo is a transient failure; surfaces as Err. 409 - let (car, _, _) = make_car(&["app.bsky.actor.profile/self"]).await; 438 + // Sparse CAR (root-only) ensures the span is incomplete → describeRepo is reached. 439 + let (car, _, _) = make_sparse_car(SPARSE_KEYS).await; 410 440 let server = mock_server(ResponseTemplate::new(500), car).await; 411 441 412 442 let client = reqwest::Client::new();