lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

scan_repos: snapshot + better prefix scan

plus some types cleanup for clippy

phil 10a832f8 9deee8c1

+57 -53
+6 -6
src/main.rs
··· 191 191 let client = client.clone(); 192 192 let resolver = resolver.clone(); 193 193 async move { 194 - resync::dispatcher::run( 194 + resync::dispatcher::run(resync::DispatcherConfig { 195 195 resolver, 196 196 db, 197 197 client, 198 - args.max_resync_workers, 199 - Duration::from_secs(args.describe_repo_fetch_timeout_secs), 200 - Duration::from_secs(args.get_repo_fetch_timeout_secs), 198 + max_concurrent: args.max_resync_workers, 199 + describe_timeout: Duration::from_secs(args.describe_repo_fetch_timeout_secs), 200 + get_repo_timeout: Duration::from_secs(args.get_repo_fetch_timeout_secs), 201 201 token, 202 - args.heavy, 203 - ) 202 + force_get_repo: args.heavy, 203 + }) 204 204 .await 205 205 .inspect_err(|e| warn!(error = %e, "resync exited")) 206 206 }
+3 -3
src/server/list_repos.rs
··· 78 78 .transpose() 79 79 .map_err(|_| ListReposError::BadCursor)?; 80 80 81 - let (entries, next) = tokio::task::spawn_blocking({ 81 + let (account, next) = tokio::task::spawn_blocking({ 82 82 let db = db.clone(); 83 83 move || crate::storage::repo::scan_repos(&db, cursor.as_ref(), limit) 84 84 }) 85 85 .await 86 86 .map_err(|_| ListReposError::StorageError)??; 87 87 88 - let repos: Vec<Repo<'static>> = entries 88 + let repos: Vec<Repo<'static>> = account 89 89 .into_iter() 90 - .filter_map(|(did, info, prev)| { 90 + .filter_map(|crate::storage::Account { did, info, prev }| { 91 91 let prev = prev?; 92 92 // Parse the stored MST-root CID bytes back into a typed CID. 93 93 // Use the Str variant explicitly: jacquard-common's Cid::Ipld
+11 -10
src/storage/mod.rs
··· 9 9 pub mod resync_queue; 10 10 11 11 pub(crate) use error::{StorageError, StorageResult}; 12 + pub(crate) use repo::Account; 12 13 13 14 // --------------------------------------------------------------------------- 14 15 // key prefixes ··· 22 23 type KeyPrefix = [u8; 3]; 23 24 24 25 /// Main collection index (collection → did). See [`collection_index`]. 25 - pub(crate) const PREFIX_RBC: KeyPrefix = *b"rbc"; 26 + pub(super) const PREFIX_RBC: KeyPrefix = *b"rbc"; 26 27 /// Reversed collection index (did → collection). See [`collection_index`]. 27 - pub(crate) const PREFIX_CBR: KeyPrefix = *b"cbr"; 28 + pub(super) const PREFIX_CBR: KeyPrefix = *b"cbr"; 28 29 /// Per-repo state and account status. See [`repo`]. 29 - pub(crate) const PREFIX_REPO: KeyPrefix = *b"rep"; 30 + pub(super) const PREFIX_REPO: KeyPrefix = *b"rep"; 30 31 /// Per-repo transient sync state (rev + prevData CID). See [`repo`]. 31 - pub(crate) const PREFIX_REPO_PREV: KeyPrefix = *b"rev"; 32 + pub(super) const PREFIX_REPO_PREV: KeyPrefix = *b"rev"; 32 33 /// Firehose subscription cursor (per relay host). See [`firehose_cursor`]. 33 - pub(crate) const PREFIX_SUBSCRIBE_REPOS: KeyPrefix = *b"sub"; 34 + pub(super) const PREFIX_SUBSCRIBE_REPOS: KeyPrefix = *b"sub"; 34 35 /// listRepos backfill walk progress (per relay host). See [`backfill_progress`]. 35 - pub(crate) const PREFIX_LIST_REPOS: KeyPrefix = *b"lsr"; 36 + pub(super) const PREFIX_LIST_REPOS: KeyPrefix = *b"lsr"; 36 37 /// Timestamp-ordered resync work queue. See [`resync_queue`]. 37 - pub(crate) const PREFIX_RESYNC_QUEUE: KeyPrefix = *b"rsq"; 38 + pub(super) const PREFIX_RESYNC_QUEUE: KeyPrefix = *b"rsq"; 38 39 /// Per-repo buffered firehose events during resync. See [`resync_buffer`]. 39 - pub(crate) const PREFIX_RESYNC_BUFFER: KeyPrefix = *b"rsb"; 40 + pub(super) const PREFIX_RESYNC_BUFFER: KeyPrefix = *b"rsb"; 40 41 /// Per-PDS host state (sync1.1 mode, trust, listRepos cursor/done). See [`pds_host`]. 41 - pub(crate) const PREFIX_PDS_HOST: KeyPrefix = *b"pdh"; 42 + pub(super) const PREFIX_PDS_HOST: KeyPrefix = *b"pdh"; 42 43 /// listHosts walk cursor (per upstream relay host). See [`list_hosts_cursor`]. 43 - pub(crate) const PREFIX_LIST_HOSTS: KeyPrefix = *b"lhs"; 44 + pub(super) const PREFIX_LIST_HOSTS: KeyPrefix = *b"lhs"; 44 45 45 46 use std::path::Path; 46 47 use std::sync::Arc;
+14 -26
src/storage/repo.rs
··· 400 400 Ok((dids, next)) 401 401 } 402 402 403 + /// full combined details about an account 404 + pub struct Account { 405 + pub did: Did<'static>, 406 + pub info: RepoInfo, 407 + pub prev: Option<RepoPrev>, 408 + } 409 + 403 410 /// Iterate over repos in the `rep` keyspace, starting at `cursor` (inclusive). 404 411 /// 405 412 /// Returns at most `limit` entries. `next` is the first DID of the next page, ··· 411 418 db: &DbRef, 412 419 cursor: Option<&Did<'_>>, 413 420 limit: usize, 414 - ) -> StorageResult<( 415 - Vec<(Did<'static>, RepoInfo, Option<RepoPrev>)>, 416 - Option<Did<'static>>, 417 - )> { 418 - // TODO: use fjall prefix_range 419 - // TODO: probably snapshot so we get a consistent account view? 420 - 421 - let prefix_len = PREFIX_REPO.len(); 421 + ) -> StorageResult<(Vec<Account>, Option<Did<'static>>)> { 422 + let snapshot = db.database.snapshot(); 422 423 423 - let start_key: Vec<u8> = { 424 - let mut k = PREFIX_REPO.to_vec(); 425 - if let Some(did) = cursor { 426 - k.extend_from_slice(did.as_str().as_bytes()); 427 - } 428 - k 429 - }; 424 + let start_suffix: Vec<u8> = cursor.map(|did| did.as_bytes().to_vec()).unwrap_or(vec![]); 425 + let mut ranger = snapshot.range(&db.ks, prefixed_range(PREFIX_REPO, start_suffix..)); 430 426 431 - let mut ranger = db.ks.range(start_key..); 432 427 let mut entries = Vec::with_capacity(limit); 433 - 434 428 for guard in ranger.by_ref() { 435 429 let (k, v) = guard.into_inner()?; 436 - if !k.starts_with(&PREFIX_REPO) { 437 - break; 438 - } 439 - let did_str = std::str::from_utf8(&k[prefix_len..]).map_err(|_| StorageError::Corrupt { 430 + let did_str = std::str::from_utf8(&k[3..]).map_err(|_| StorageError::Corrupt { 440 431 key: String::from_utf8_lossy(&k).to_string(), 441 432 reason: "non-UTF-8 DID in repo key", 442 433 })?; ··· 455 446 decode_repo_prev(&b, &pk_str) 456 447 }) 457 448 .transpose()?; 458 - entries.push((did, info, prev)); 449 + entries.push(Account { did, info, prev }); 459 450 if entries.len() >= limit { 460 451 break; 461 452 } ··· 466 457 break None; 467 458 }; 468 459 let key = guard.key()?; 469 - if !key.starts_with(&PREFIX_REPO) { 470 - break None; 471 - } 472 - let did_str = match std::str::from_utf8(&key[prefix_len..]) { 460 + let did_str = match std::str::from_utf8(&key[3..]) { 473 461 Ok(s) => s, 474 462 Err(_) => continue, 475 463 };
+21 -8
src/sync/resync/dispatcher.rs
··· 37 37 /// the dispatcher will not dispatch new work to that host. 38 38 const RATE_LIMIT_COOLDOWN_SECS: u64 = 60; 39 39 40 + pub struct DispatcherConfig { 41 + pub resolver: std::sync::Arc<crate::identity::Resolver>, 42 + pub db: DbRef, 43 + pub client: crate::http::ThrottledClient, 44 + pub max_concurrent: usize, 45 + pub describe_timeout: std::time::Duration, 46 + pub get_repo_timeout: std::time::Duration, 47 + pub token: tokio_util::sync::CancellationToken, 48 + pub force_get_repo: bool, 49 + } 50 + 40 51 /// Run the resync dispatcher until the future is cancelled. 41 52 /// 42 53 /// Polls the resync queue, spawning up to `max_concurrent` worker tasks. 43 54 /// Workers report their outcome back to the dispatcher, which handles state 44 55 /// transitions and re-enqueues failed jobs with exponential backoff. 45 56 pub async fn run( 46 - resolver: std::sync::Arc<crate::identity::Resolver>, 47 - db: DbRef, 48 - client: crate::http::ThrottledClient, 49 - max_concurrent: usize, 50 - describe_timeout: std::time::Duration, 51 - get_repo_timeout: std::time::Duration, 52 - token: tokio_util::sync::CancellationToken, 53 - force_get_repo: bool, 57 + DispatcherConfig { 58 + resolver, 59 + db, 60 + client, 61 + max_concurrent, 62 + describe_timeout, 63 + get_repo_timeout, 64 + token, 65 + force_get_repo, 66 + }: DispatcherConfig, 54 67 ) -> Result<()> { 55 68 let mut busy: HashSet<Did<'static>> = HashSet::new(); 56 69 let mut task_dids: HashMap<TaskId, Did<'static>> = HashMap::new();
+2
src/sync/resync/mod.rs
··· 34 34 repo::{AccountStatus, RepoInfo, RepoPrev, RepoState}, 35 35 }; 36 36 37 + pub use dispatcher::DispatcherConfig; 38 + 37 39 type Result<T> = std::result::Result<T, ResyncError>; 38 40 39 41 /// Errors that can occur during a resync operation.