lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

PDS resolution hack (I don't like this)

phil f98d2b6c ae34c49d

+57 -17
+14
src/identity.rs
··· 330 330 self.cache.get(did) 331 331 } 332 332 333 + /// Insert a resolved identity into the cache without a network call. 334 + /// 335 + /// Used by the dispatcher to warm the cache for discovery-queue items 336 + /// whose identity was already resolved during backfill. This avoids a 337 + /// redundant network resolution in the resync worker. 338 + pub fn insert_cached( 339 + &self, 340 + did: Did<'static>, 341 + pds: Url, 342 + pubkey: PublicKey<'static>, 343 + ) -> Arc<CachedIdentity> { 344 + self.cache.insert(did, pds, pubkey) 345 + } 346 + 333 347 /// Evict `did` from the identity cache. 334 348 /// 335 349 /// Called when a `#identity` firehose event is received, after all
+16 -11
src/sync/backfill.rs
··· 15 15 use jacquard_api::com_atproto::sync::list_repos::{ListRepos, RepoStatus}; 16 16 use jacquard_common::{ 17 17 error::ClientErrorKind, 18 - types::string::Did, 18 + types::{crypto::PublicKey, string::Did}, 19 19 url::{Host, Url}, 20 20 {IntoStatic, xrpc::XrpcExt}, 21 21 }; ··· 137 137 // Cache hits are free; misses fall back to the listed host. 138 138 // 139 139 // TODO: ...this is basically redundant with validate_dids now? 140 - let dids_with_hosts: Vec<(Did<'static>, Arc<Url>, RepoState)> = { 140 + let dids_with_hosts: Vec<(Did<'static>, Arc<Url>, PublicKey<'static>, RepoState)> = { 141 141 let mut out = Vec::with_capacity(dids.len()); 142 142 for (did, account_state) in dids { 143 143 let Some(res) = token.run(resolver.resolve(&did)).await else { 144 144 return Ok(false); // cancelled 145 145 }; 146 - let pds_host = match res { 147 - Ok(resolved) => resolved.pds.clone(), 146 + let resolved = match res { 147 + Ok(resolved) => resolved, 148 148 Err(e) => { 149 149 error!(did = %did, error = %e, "failed to resolve host for validated did; skipping"); 150 150 continue; 151 151 } 152 152 }; 153 - out.push((did, pds_host, account_state)); 153 + out.push(( 154 + did, 155 + resolved.pds.clone(), 156 + resolved.pubkey.clone(), 157 + account_state, 158 + )); 154 159 } 155 160 out 156 161 }; ··· 167 172 168 173 // Push newly-discovered repos into the in-memory discovery queue. 169 174 // Each push may block if the queue is full (backpressure). 
170 - for (did, pds) in new_items { 175 + for (did, pds, pubkey) in new_items { 171 176 let Some(_) = token 172 - .run(discovery_queue.push(DiscoveryItem { did, pds })) 177 + .run(discovery_queue.push(DiscoveryItem { did, pds, pubkey })) 173 178 .await 174 179 else { 175 180 return Ok(false); ··· 333 338 // Storage commit 334 339 // --------------------------------------------------------------------------- 335 340 336 - type DidWithPds = (Did<'static>, Arc<Url>); 341 + type DidWithPds = (Did<'static>, Arc<Url>, PublicKey<'static>); 337 342 338 343 /// Insert newly-seen DIDs into the repo table and persist the backfill cursor. 339 344 /// ··· 347 352 fn persist_page( 348 353 db: &DbRef, 349 354 host: &Host, 350 - items: Vec<(Did<'static>, Arc<Url>, RepoState)>, 355 + items: Vec<(Did<'static>, Arc<Url>, PublicKey<'static>, RepoState)>, 351 356 progress_cursor: String, 352 357 authoritative: bool, 353 358 ) -> Result<(u64, u64, Vec<DidWithPds>)> { 354 359 let mut new_active: Vec<DidWithPds> = Vec::new(); 355 360 let mut inactive_count: u64 = 0; 356 - for (did, pds, repo_state) in items { 361 + for (did, pds, pubkey, repo_state) in items { 357 362 if let Some(deactiveated_account_status) = repo_state.to_account_inactive() { 358 363 if write_inactive_status( 359 364 db, ··· 368 373 let newly_inserted = storage::repo::ensure_repo(db, &did)?; 369 374 if newly_inserted { 370 375 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); 371 - new_active.push((did, pds)); 376 + new_active.push((did, pds, pubkey)); 372 377 } 373 378 } 374 379 }
+21 -4
src/sync/discovery_queue.rs
··· 24 24 use std::sync::atomic::{AtomicUsize, Ordering}; 25 25 use std::sync::{Arc, Mutex}; 26 26 27 + use jacquard_common::types::crypto::PublicKey; 27 28 use jacquard_common::types::string::Did; 28 29 use jacquard_common::url::Url; 29 30 30 31 /// A newly-discovered repo awaiting its initial resync. 32 + /// 33 + /// Carries the resolved identity (PDS + pubkey) from backfill so the 34 + /// dispatcher can warm the identity cache without a redundant network call. 31 35 pub struct DiscoveryItem { 32 36 pub did: Did<'static>, 33 37 pub pds: Arc<Url>, 38 + pub pubkey: PublicKey<'static>, 34 39 } 35 40 36 41 pub struct DiscoveryQueue { ··· 51 56 /// Ordered list of PDS hosts for round-robin traversal. 52 57 hosts: Vec<Arc<Url>>, 53 58 /// Per-host FIFO queues of DIDs awaiting resync. 54 - queues: HashMap<Arc<Url>, VecDeque<Did<'static>>>, 59 + queues: HashMap<Arc<Url>, VecDeque<(Did<'static>, PublicKey<'static>)>>, 55 60 /// Round-robin cursor: index into `hosts` for the next pop. 56 61 cursor: usize, 57 62 /// Total items across all queues. 
··· 72 77 self.hosts.push(item.pds.clone()); 73 78 self.queues.insert(item.pds.clone(), VecDeque::new()); 74 79 } 75 - self.queues.get_mut(&item.pds).unwrap().push_back(item.did); 80 + self.queues 81 + .get_mut(&item.pds) 82 + .unwrap() 83 + .push_back((item.did, item.pubkey)); 76 84 self.len += 1; 77 85 } 78 86 } ··· 151 159 .queues 152 160 .get_mut(&host) 153 161 .expect("host in ring must have a queue"); 154 - let did = queue.pop_front().expect("host in ring must have items"); 162 + let (did, pubkey) = queue.pop_front().expect("host in ring must have items"); 155 163 let host_empty = queue.is_empty(); 156 164 inner.len -= 1; 157 165 consecutive_skips = 0; ··· 167 175 inner.cursor = idx + 1; 168 176 } 169 177 170 - result.push(DiscoveryItem { did, pds: host }); 178 + result.push(DiscoveryItem { 179 + did, 180 + pds: host, 181 + pubkey, 182 + }); 171 183 } 172 184 173 185 let popped = result.len(); ··· 212 224 } 213 225 214 226 fn item(did_str: &str, pds_str: &str) -> DiscoveryItem { 227 + use jacquard_common::types::crypto::KeyCodec; 215 228 DiscoveryItem { 216 229 did: did(did_str), 217 230 pds: url(pds_str), 231 + pubkey: PublicKey { 232 + codec: KeyCodec::Ed25519, 233 + bytes: std::borrow::Cow::Owned(vec![0u8; 32]), 234 + }, 218 235 } 219 236 } 220 237
+6 -2
src/sync/resync/dispatcher.rs
··· 239 239 continue; 240 240 } 241 241 242 + // Warm the identity cache so the worker gets a cache hit 243 + // instead of a redundant network resolution. 244 + resolver.insert_cached(did.clone(), (*dq_item.pds).clone(), dq_item.pubkey); 245 + 242 246 busy.insert(did.clone()); 243 247 let item = ResyncItem { 244 248 did: dq_item.did, ··· 366 370 let host = resolver 367 371 .resolve_cached(did) 368 372 .and_then(|r| r.pds.host_str().map(str::to_owned)) 369 - .unwrap_or_default(); 373 + .unwrap_or_else(|| "(resolving)".to_owned()); 370 374 BusyEntry { 371 375 did: did.to_string(), 372 376 host, ··· 390 394 let host = resolver 391 395 .resolve_cached(did) 392 396 .and_then(|r| r.pds.host_str().map(str::to_owned)) 393 - .unwrap_or_default(); 397 + .unwrap_or_else(|| "(resolving)".to_owned()); 394 398 *counts.entry(host).or_default() += 1; 395 399 } 396 400 let mut entries: Vec<_> = counts