lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

wip slice tricks for getRecord before describeRepo

phil e140bc9c 4cab27ec

+139 -39
+78
src/mst/slice_tricks.rs
··· 356 356 } 357 357 358 358 // ========================================================================== 359 + // complete() 360 + // ========================================================================== 361 + 362 + #[test] 363 + fn complete_empty_span_no_gap() { 364 + assert_eq!(make_span(false, &[]).complete(), Some(BTreeSet::new())); 365 + } 366 + 367 + #[test] 368 + fn complete_empty_span_with_gap_is_none() { 369 + // gap_before with no keys = "could be anything" 370 + assert_eq!(make_span(true, &[]).complete(), None); 371 + } 372 + 373 + #[test] 374 + fn complete_single_key_no_gaps() { 375 + let s = make_span(false, &[("a.b.c", false)]); 376 + assert_eq!(s.complete(), Some(BTreeSet::from([nsid("a.b.c")]))); 377 + } 378 + 379 + #[test] 380 + fn complete_gap_before_is_none() { 381 + let s = make_span(true, &[("a.b.c", false)]); 382 + assert_eq!(s.complete(), None); 383 + } 384 + 385 + #[test] 386 + fn complete_gap_after_is_none() { 387 + let s = make_span(false, &[("a.b.c", true)]); 388 + assert_eq!(s.complete(), None); 389 + } 390 + 391 + #[test] 392 + fn complete_multiple_keys_no_gaps() { 393 + let s = make_span( 394 + false, 395 + &[("a.b.c", false), ("a.b.d", false), ("a.b.e", false)], 396 + ); 397 + assert_eq!( 398 + s.complete(), 399 + Some(BTreeSet::from([ 400 + nsid("a.b.c"), 401 + nsid("a.b.d"), 402 + nsid("a.b.e") 403 + ])) 404 + ); 405 + } 406 + 407 + #[test] 408 + fn complete_gap_between_keys_is_none() { 409 + let s = make_span(false, &[("a.b.c", true), ("a.b.d", false)]); 410 + assert_eq!(s.complete(), None); 411 + } 412 + 413 + #[test] 414 + fn complete_gap_before_and_after_is_none() { 415 + let s = make_span(true, &[("a.b.c", true)]); 416 + assert_eq!(s.complete(), None); 417 + } 418 + 419 + #[test] 420 + fn complete_sub_namespace_and_parent_both_present() { 421 + // Both collections are known; complete() returns both regardless of 422 + // whether the caller orders by MST order or lex order. 423 + let s = make_span( 424 + false, 425 + &[("sh.tangled.repo.issue", false), ("sh.tangled.repo", false)], 426 + ); 427 + assert_eq!( 428 + s.complete(), 429 + Some(BTreeSet::from([ 430 + nsid("sh.tangled.repo"), 431 + nsid("sh.tangled.repo.issue"), 432 + ])) 433 + ); 434 + } 435 + 436 + // ========================================================================== 359 437 // span_from_slice 360 438 // ========================================================================== 361 439
+9 -8
src/storage/collection_index.rs
··· 9 9 //! both sides must always be written and removed together; `insert` and 10 10 //! `remove` functions do the write thing, with `_into` variants for batching. 11 11 use std::cmp::Ordering; 12 + use std::collections::BTreeSet; 12 13 13 14 use fjall::util::prefixed_range; 14 15 use jacquard_common::types::nsid::Nsid; ··· 365 366 pub fn sync_collections( 366 367 db: &DbRef, 367 368 did: &Did<'_>, 368 - collections: &[Nsid<'static>], 369 + collections: &BTreeSet<Nsid<'_>>, 369 370 ) -> StorageResult<(usize, usize)> { 370 371 // Read the current set from the cbr index. Fjall iterates in key order, and 371 372 // the cbr prefix puts the collection NSID last, so the suffix scan is already ··· 563 564 sync_collections( 564 565 &db, 565 566 &d, 566 - &[nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post")], 567 + &BTreeSet::from_iter([nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post")]), 567 568 ) 568 569 .unwrap(); 569 570 ··· 582 583 insert(&db, &d, nsid("app.bsky.feed.post")).unwrap(); 583 584 insert(&db, &d, nsid("app.bsky.graph.follow")).unwrap(); 584 585 585 - sync_collections(&db, &d, &[]).unwrap(); 586 + sync_collections(&db, &d, &Default::default()).unwrap(); 586 587 587 588 assert_eq!(collections_for(&db, &d), vec![]); 588 589 } ··· 592 593 let db = open_temporary().unwrap(); 593 594 let d = did("did:web:alice.com"); 594 595 595 - let cols = [nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")]; 596 + let cols = BTreeSet::from_iter([nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")]); 596 597 for col in &cols { 597 598 insert(&db, &d, col.clone()).unwrap(); 598 599 } 599 600 600 601 sync_collections(&db, &d, &cols).unwrap(); 601 602 602 - assert_eq!(collections_for(&db, &d), cols.to_vec()); 603 + assert_eq!(collections_for(&db, &d), Vec::from_iter(cols)); 603 604 } 604 605 605 606 #[test] ··· 613 614 sync_collections( 614 615 &db, 615 616 &d, 616 - &[nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")], 617 + &BTreeSet::from_iter([nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")]), 617 618 ) 618 619 .unwrap(); 619 620 ··· 634 635 sync_collections( 635 636 &db, 636 637 &d, 637 - &[nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")], 638 + &BTreeSet::from_iter([nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")]), 638 639 ) 639 640 .unwrap(); 640 641 ··· 827 828 insert(&db, &alice, nsid("app.bsky.feed.post")).unwrap(); 828 829 insert(&db, &bob, nsid("app.bsky.feed.post")).unwrap(); 829 830 830 - sync_collections(&db, &alice, &[]).unwrap(); 831 + sync_collections(&db, &alice, &Default::default()).unwrap(); 831 832 832 833 assert_eq!(collections_for(&db, &alice), vec![]); 833 834 assert_eq!(collections_for(&db, &bob), vec![nsid("app.bsky.feed.post")]);
+39 -23
src/sync/resync/describe_repo.rs
··· 38 38 IntoStatic, 39 39 error::ClientErrorKind, 40 40 http_client::HttpClient, 41 - types::string::{Did, Nsid, RecordKey, Rkey, Tid}, 41 + types::string::{Did, Nsid, RecordKey, Rkey}, 42 42 xrpc::{XrpcError, XrpcExt}, 43 43 }; 44 44 use repo_stream::{DriverBuilder, LoadError}; 45 - use std::collections::HashSet; 45 + use std::collections::BTreeSet; 46 46 47 47 use super::{GetCollectionsError, RepoGoneReason, RepoSnapshot}; 48 + use crate::mst::slice_tricks::{self, RepoCollections}; 48 49 49 50 /// for `getRecord` CAR slice (commit block + small MST proof path) 50 51 const GET_RECORD_MEM_LIMIT_MB: usize = 1; ··· 91 92 let reader = tokio::io::BufReader::new(std::io::Cursor::new(output.body)); 92 93 let mut mem_car = DriverBuilder::new() 93 94 .with_mem_limit_mb(GET_RECORD_MEM_LIMIT_MB) 95 + .with_block_processor(|_| vec![]) // discard record blocks as much as possible 94 96 .load_car(reader) 95 97 .await 96 98 .map_err(|e| match e { ··· 100 102 e => GetCollectionsError::InvalidData(format!("getRecord CAR error: {e}")), 101 103 })?; 102 104 103 - let rev = Tid::new(&mem_car.commit.rev) 104 - .map_err(|e| GetCollectionsError::InvalidData(format!("bad rev in commit: {e}")))?; 105 + let report = slice_tricks::report(&mut mem_car) 106 + .map_err(|e| GetCollectionsError::InvalidData(format!("car report failed: {e}")))?; 105 107 106 - // shuffle repo-stream raw cid => jacquard cid wrapper 107 - let data = mem_car.commit.data.into(); 108 - 109 - let proven_collections = 110 - crate::mst::collections::extract_from_slice(&mut mem_car).map_err(|e| { 111 - GetCollectionsError::InvalidData(format!("failed to extract collections from CAR: {e}")) 112 - })?; 108 + let collection_span = match report.collections { 109 + RepoCollections::Complete(collections) => { 110 + // short-circuit -- we got everything just from getRecord!!! 111 + match collections.len() { 112 + 0 => metrics::counter!("lightrail_resync_describe_repo_avoided", "reason" => "slice_complete", 113 + "collections" => "empty").increment(1), 114 + 1 => metrics::counter!("lightrail_resync_describe_repo_avoided", "reason" => "slice_complete", 115 + "collections" => "single").increment(1), 116 + _ => metrics::counter!("lightrail_resync_describe_repo_avoided", "reason" => "slice_complete", 117 + "collections" => "multi").increment(1), 118 + }; 119 + return Ok(RepoSnapshot { 120 + collections, 121 + rev: report.rev, 122 + data: report.data, 123 + }); 124 + } 125 + RepoCollections::Tiny(_) => { 126 + metrics::counter!("lightrail_resync_describe_repo_avoided", "reason" => "tiny_repo") 127 + .increment(1); 128 + // intentionally fall through to sync.getRepo instead of using describeRepo 129 + return Err(GetCollectionsError::GetSmallRepo); 130 + } 131 + RepoCollections::Otherwise(span) => span, 132 + }; 113 133 114 134 // step 2: collection list from describeRepo. 115 - let collections: HashSet<Nsid<'_>> = client 135 + let collections: BTreeSet<Nsid<'_>> = client 116 136 .xrpc(base.clone()) 117 137 .send(&DescribeRepo { 118 138 repo: did.clone().into(), ··· 127 147 .collect(); 128 148 129 149 // post-processing check: collections we know from getRecord present 130 - if !collections.is_superset(&proven_collections) { 150 + if !collection_span.could_cover(&collections) { 131 151 return Err(GetCollectionsError::InvalidData( 132 152 "describeRepo inconsistent collection set".to_string(), 133 153 )); 134 154 } 135 155 136 - // final shape and order 137 - let mut collections = Vec::from_iter(collections); 138 - collections.sort_unstable(); 139 - 140 156 Ok(RepoSnapshot { 141 157 collections, 142 - rev, 143 - data, 158 + rev: report.rev, 159 + data: report.data, 144 160 }) 145 161 } 146 162 ··· 282 298 // Sorted regardless of the order the server returned them in. 283 299 assert_eq!( 284 300 snapshot.collections, 285 - vec![nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post")] 301 + BTreeSet::from_iter([nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post")]) 286 302 ); 287 303 } 288 304 ··· 306 322 307 323 assert_eq!( 308 324 snapshot.collections, 309 - vec![ 325 + BTreeSet::from_iter([ 310 326 nsid("app.bsky.actor.profile"), 311 327 nsid("app.bsky.feed.post"), 312 328 nsid("app.bsky.graph.follow"), 313 - ] 329 + ]) 314 330 ); 315 331 } 316 332 ··· 327 343 let base = server.uri().parse().unwrap(); 328 344 let snapshot = fetch_collections(&client, &base, did()).await.unwrap(); 329 345 330 - assert_eq!(snapshot.collections, vec![] as Vec<Nsid<'static>>); 346 + assert!(snapshot.collections.is_empty()); 331 347 } 332 348 333 349 #[tokio::test]
+7 -6
src/sync/resync/get_repo.rs
··· 12 12 xrpc::XrpcExt, 13 13 }; 14 14 use repo_stream::{DriverBuilder, LoadError}; 15 + use std::collections::BTreeSet; 15 16 16 17 use super::{GetCollectionsError, RepoGoneReason, RepoSnapshot}; 17 18 ··· 77 78 .map_err(|e| GetCollectionsError::InvalidData(format!("bad CAR walk: {e}")))?; 78 79 79 80 Ok(RepoSnapshot { 80 - collections, 81 + collections: BTreeSet::from_iter(collections), 81 82 rev, 82 83 data, 83 84 }) ··· 206 207 207 208 assert_eq!( 208 209 snapshot.collections, 209 - vec![ 210 + BTreeSet::from_iter([ 210 211 nsid("app.bsky.actor.profile"), 211 212 nsid("app.bsky.feed.post"), 212 213 nsid("app.bsky.graph.follow"), 213 - ] 214 + ]) 214 215 ); 215 216 } 216 217 ··· 233 234 234 235 assert_eq!( 235 236 snapshot.collections, 236 - vec![ 237 + BTreeSet::from_iter([ 237 238 nsid("sh.tangled.repo"), 238 239 nsid("sh.tangled.repo.issue"), 239 240 nsid("sh.tangled.repo.pull"), 240 - ] 241 + ]) 241 242 ); 242 243 } 243 244 ··· 250 251 let base = server.uri().parse().unwrap(); 251 252 let snapshot = fetch_collections(&client, &base, did()).await.unwrap(); 252 253 253 - assert_eq!(snapshot.collections, vec![] as Vec<Nsid<'static>>); 254 + assert!(snapshot.collections.is_empty()); 254 255 } 255 256 256 257 #[tokio::test]
+6 -2
src/sync/resync/mod.rs
··· 19 19 pub mod dispatcher; 20 20 pub mod get_repo; 21 21 22 + use std::collections::BTreeSet; 22 23 use std::time::Duration; 23 24 24 25 use cid::Cid as RawCid; ··· 61 62 #[derive(Debug, PartialEq)] 62 63 pub struct RepoSnapshot { 63 64 /// Sorted list of collection NSIDs present in the repository. 64 - pub collections: Vec<Nsid<'static>>, 65 + pub collections: BTreeSet<Nsid<'static>>, // TODO btreeset 65 66 /// Revision TID of the latest commit. 66 67 pub rev: Tid, 67 68 /// MST root CID ··· 89 90 /// The server returned an unrecognised XRPC error code. 90 91 #[error("unexpected XRPC error: {0}")] 91 92 UnexpectedXrpc(String), 93 + /// The repo is likely tiny, intentionally fall through to sync.getRepo 94 + #[error("should getRepo because it's likely tiny")] 95 + GetSmallRepo, 92 96 } 93 97 94 98 /// The specific reason a repository is inaccessible. ··· 275 279 } 276 280 // Any other failure or timeout: fall through. The PDS may not implement 277 281 // describeRepo, or may have a bug this endpoint doesn't hit. 278 - Ok(Err(_)) | Err(_) => {} 282 + Ok(Err(GetCollectionsError::GetSmallRepo)) | Ok(Err(_)) | Err(_) => {} 279 283 } 280 284 } 281 285