lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

heavy mode: always a full `getRepo`

if you really really care about accuracy and don't care about PDS peoples' egress

phil 558e204c 94a1740d

+37 -23
+1 -1
hacking.md
··· 106 106 107 107 108 108 very much still todo but i'm getting tired 109 - - [ ] add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 109 + - [x] add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 110 110 - [ ] resync: estimate CAR size from `getRecord` mst height; `getRepo` if it's likely very small 111 111 - [ ] multi-relay subscriber 112 112 - [ ] special did:web behaviour to keep reusing a stale resolution on failure
+6
src/main.rs
··· 85 85 #[arg(long, action, env = "LIGHTRAIL_DEEP_CRAWL")] 86 86 deep_crawl: bool, 87 87 88 + /// Heavy mode: always fetch the full repo CAR via getRepo for resync, 89 + /// skipping the cheaper describeRepo fast path. 90 + #[arg(long, action, env = "LIGHTRAIL_HEAVY")] 91 + heavy: bool, 92 + 88 93 /// Max concurrent per-PDS listRepos workers during deep crawl. 89 94 #[arg( 90 95 long, ··· 186 191 Duration::from_secs(args.describe_repo_fetch_timeout_secs), 187 192 Duration::from_secs(args.get_repo_fetch_timeout_secs), 188 193 token, 194 + args.heavy, 189 195 ) 190 196 .await 191 197 .inspect_err(|e| warn!(error = %e, "resync exited"))
+2
src/sync/resync/describe_repo.rs
··· 86 86 e => GetCollectionsError::Request(e.to_string()), 87 87 })?; 88 88 89 + // TODO: extract keys from getRecord that we can check against describeRepo 90 + 89 91 let reader = tokio::io::BufReader::new(std::io::Cursor::new(output.body)); 90 92 let mut mem_car = DriverBuilder::new() 91 93 .with_mem_limit_mb(GET_RECORD_MEM_LIMIT_MB)
+4
src/sync/resync/dispatcher.rs
··· 50 50 describe_timeout: std::time::Duration, 51 51 get_repo_timeout: std::time::Duration, 52 52 token: tokio_util::sync::CancellationToken, 53 + force_get_repo: bool, 53 54 ) -> Result<()> { 54 55 let mut busy: HashSet<Did<'static>> = HashSet::new(); 55 56 let mut task_dids: HashMap<TaskId, Did<'static>> = HashMap::new(); ··· 129 130 &db, 130 131 describe_timeout, 131 132 get_repo_timeout, 133 + force_get_repo, 132 134 ) 133 135 .await 134 136 }); ··· 261 263 db: &DbRef, 262 264 describe_timeout: std::time::Duration, 263 265 get_repo_timeout: std::time::Duration, 266 + force_get_repo: bool, 264 267 ) -> WorkerOutcome { 265 268 let did = item.did.clone(); 266 269 match super::index_repo( ··· 270 273 db, 271 274 describe_timeout, 272 275 get_repo_timeout, 276 + force_get_repo, 273 277 ) 274 278 .await 275 279 {
+24 -22
src/sync/resync/mod.rs
··· 114 114 db: &DbRef, 115 115 describe_timeout: Duration, 116 116 get_repo_timeout: Duration, 117 + force_get_repo: bool, 117 118 ) -> Result<()> 118 119 where 119 120 C: HttpClient + HttpClientExt + Sync, ··· 131 132 did.clone(), 132 133 describe_timeout, 133 134 get_repo_timeout, 135 + force_get_repo, 134 136 ) 135 137 .await 136 138 { ··· 245 247 did: Did<'_>, 246 248 describe_timeout: Duration, 247 249 get_repo_timeout: Duration, 250 + force_get_repo: bool, 248 251 ) -> std::result::Result<RepoSnapshot, GetCollectionsError> 249 252 where 250 253 C: HttpClient + HttpClientExt + Sync, 251 254 { 252 - let describe_result = tokio::time::timeout( 253 - describe_timeout, 254 - describe_repo::fetch_collections(client, base, did.clone()), 255 - ) 256 - .await; 255 + if !force_get_repo { 256 + let describe_result = tokio::time::timeout( 257 + describe_timeout, 258 + describe_repo::fetch_collections(client, base, did.clone()), 259 + ) 260 + .await; 257 261 258 - match describe_result { 259 - Ok(Ok(snapshot)) if !snapshot.collections.is_empty() => { 260 - metrics::counter!("lightrail_resync_fetch_total", "source" => "describe_repo") 261 - .increment(1); 262 - return Ok(snapshot); 263 - } 264 - // Empty list: PDS may have a bug or not paginate large collection sets. 265 - // Fall through to the full CAR walk. 266 - Ok(Ok(_)) => {} 267 - // Rate-limited: don't escalate to a heavier getRepo request. 268 - Ok(Err(e @ GetCollectionsError::RateLimited(_))) => return Err(e), 269 - // Definitively gone: getRepo would return the same answer. 270 - Ok(Err(e @ (GetCollectionsError::RepoNotFound | GetCollectionsError::RepoGone(_)))) => { 271 - return Err(e); 262 + match describe_result { 263 + Ok(Ok(snapshot)) => { 264 + metrics::counter!("lightrail_resync_fetch_total", "source" => "describe_repo") 265 + .increment(1); 266 + return Ok(snapshot); 267 + } 268 + // Rate-limited: don't escalate to a heavier getRepo request. 269 + Ok(Err(e @ GetCollectionsError::RateLimited(_))) => return Err(e), 270 + // Definitively gone: getRepo would return the same answer. 271 + Ok(Err(e @ (GetCollectionsError::RepoNotFound | GetCollectionsError::RepoGone(_)))) => { 272 + return Err(e); 273 + } 274 + // Any other failure or timeout: fall through. The PDS may not implement 275 + // describeRepo, or may have a bug this endpoint doesn't hit. 276 + Ok(Err(_)) | Err(_) => {} 272 277 } 273 - // Any other failure or timeout: fall through. The PDS may not implement 274 - // describeRepo, or may have a bug this endpoint doesn't hit. 275 - Ok(Err(_)) | Err(_) => {} 276 278 } 277 279 278 280 let result = tokio::time::timeout(