lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

clippy long arms

phil bb0e7e45 c7a1adf0

+31 -28
+11 -18
src/sync/resync/dispatcher.rs
··· 16 16 use std::sync::{Arc, Mutex, atomic::Ordering}; 17 17 use std::time::{Duration, Instant, SystemTime}; 18 18 19 - use super::ResyncError; 19 + use super::{AppStuff, ResyncError}; 20 20 21 21 use tokio::task::{Id as TaskId, JoinSet}; 22 22 use tracing::{debug, error, info, trace, warn}; ··· 182 182 } 183 183 } 184 184 185 - let client = client.clone(); 186 - let resolver = resolver.clone(); 187 - let db = db.clone(); 188 - let token = token.clone(); 185 + let worker_stuff = AppStuff { 186 + resolver: resolver.clone(), 187 + client: client.clone(), 188 + db: db.clone(), 189 + token: token.clone(), 190 + }; 189 191 let handle = workers.spawn(async move { 190 192 run_worker( 191 193 item, 192 - &resolver, 193 - &client, 194 - &db, 195 - token, 194 + worker_stuff, 196 195 describe_timeout, 197 196 get_repo_timeout, 198 197 force_get_repo, ··· 371 370 372 371 async fn run_worker( 373 372 item: ResyncItem, 374 - resolver: &crate::identity::Resolver, 375 - client: &crate::http::ThrottledClient, 376 - db: &DbRef, 377 - token: tokio_util::sync::CancellationToken, 373 + app_stuff: AppStuff, 378 374 describe_timeout: std::time::Duration, 379 375 get_repo_timeout: std::time::Duration, 380 376 force_get_repo: bool, 381 377 ) -> WorkerOutcome { 382 378 let did = item.did.clone(); 383 379 match super::index_repo( 384 - client, 385 - resolver, 380 + app_stuff.clone(), 386 381 item.did, 387 - db, 388 - token, 389 382 describe_timeout, 390 383 get_repo_timeout, 391 384 force_get_repo, ··· 406 399 Err(ResyncError::RepoNotFound) => { 407 400 // Invalidate the cached DID resolution: the PDS pointer may be 408 401 // stale (e.g. mid-migration), so the next attempt will re-resolve. 409 - resolver.invalidate_did(&did); 402 + app_stuff.resolver.invalidate_did(&did); 410 403 info!(did = %did, "repo not found on PDS; invalidated DID cache"); 411 404 WorkerOutcome::NotFound { 412 405 retry_count: item.retry_count,
+20 -10
src/sync/resync/mod.rs
··· 113 113 Deactivated, 114 114 } 115 115 116 + /// Bundle of useful stuff from the app 117 + /// 118 + /// should probably become a proper AppState 119 + #[derive(Clone)] 120 + pub struct AppStuff { 121 + resolver: Arc<crate::identity::Resolver>, 122 + client: crate::http::ThrottledClient, 123 + db: DbRef, 124 + token: tokio_util::sync::CancellationToken, 125 + } 126 + 116 127 /// Establish the current collection set for `did` and write it to the index. 117 128 /// 118 129 /// Tries `describeRepo` first (cheap). Falls back to a full `getRepo` CAR walk ··· 120 131 /// 121 132 /// Returns `Ok(())` silently if the repo is not found or inaccessible; 122 133 /// the caller decides whether to retry on transient errors. 123 - pub async fn index_repo<C>( 124 - client: &C, 125 - resolver: &crate::identity::Resolver, 134 + pub async fn index_repo( 135 + AppStuff { 136 + resolver, 137 + client, 138 + db, 139 + token, 140 + }: AppStuff, 126 141 did: Did<'_>, 127 - db: &DbRef, 128 - token: tokio_util::sync::CancellationToken, 129 142 describe_timeout: Duration, 130 143 get_repo_timeout: Duration, 131 144 force_get_repo: bool, 132 - ) -> Result<()> 133 - where 134 - C: HttpClient + HttpClientExt + Sync, 135 - { 145 + ) -> Result<()> { 136 146 // Own the DID for the duration of the function so we can move it into 137 147 // spawn_blocking closures (which require 'static captures). 138 148 let did: Did<'static> = did.into_static(); ··· 141 151 let base = &*resolved.pds; 142 152 143 153 let repo_snapshot = match fetch_collections( 144 - client, 154 + &client, 145 155 base, 146 156 did.clone(), 147 157 token,