lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

handle repo not found and fix some cow stuff

phil 20eafce2 cfdabe16

+329 -208
+7
hacking.md
··· 15 15 $ cargo test && cargo fmt && cargo clippy 16 16 ``` 17 17 18 + - prefer local error enums over generic string-y errors 19 + 20 + - for jacquard's CoW-wrapped types: 21 + - accept borrows like `fn do_thing(did: &Did<'_>)`. 22 + - accept possibly-needing ownership like `fn do_thing(did: Did<'_>)`. 23 + - accept full ownership like `fn do_thing(did: Did<'static>)`. 24 + 18 25 - TODO: configure tangled CI to run these on PR 19 26 20 27
+1 -1
src/examples/enqueue_resync.rs
··· 35 35 36 36 for raw in &args.dids { 37 37 let did = Did::new_owned(raw.clone())?; 38 - let inserted = storage::repo::ensure_repo(&db, did.clone())?; 38 + let inserted = storage::repo::ensure_repo(&db, &did)?; 39 39 let item = ResyncItem { 40 40 did: did.clone(), 41 41 retry_count: 0,
+16 -12
src/identity.rs
··· 15 15 //! fallback path built into jacquard-identity). When a PLC fallback is also 16 16 //! configured, any error from the primary resolver retries against it. 17 17 18 + use jacquard_common::IntoStatic; 18 19 use std::sync::Arc; 19 20 use std::time::Duration; 20 21 ··· 112 113 113 114 /// Capacity-bounded, TTL-expiring cache from DID strings to resolved identities. 114 115 struct IdentityCache { 115 - entries: Cache<String, Arc<CachedIdentity>>, 116 + entries: Cache<Did<'static>, Arc<CachedIdentity>>, 116 117 interner: UrlInterner, 117 118 } 118 119 ··· 127 128 } 128 129 } 129 130 130 - fn get(&self, did: &str) -> Option<Arc<CachedIdentity>> { 131 - self.entries.get(&did.to_owned()) 131 + fn get(&self, did: &Did<'_>) -> Option<Arc<CachedIdentity>> { 132 + self.entries.get(&did.clone().into_static()) 132 133 } 133 134 134 - fn insert(&self, did: &str, pds: Url, pubkey: PublicKey<'static>) -> Arc<CachedIdentity> { 135 + fn insert( 136 + &self, 137 + did: Did<'static>, 138 + pds: Url, 139 + pubkey: PublicKey<'static>, 140 + ) -> Arc<CachedIdentity> { 135 141 let pds = self.interner.intern(pds); 136 142 let identity = Arc::new(CachedIdentity { pds, pubkey }); 137 - self.entries.insert(did.to_owned(), Arc::clone(&identity)); 143 + self.entries.insert(did, Arc::clone(&identity)); 138 144 identity 139 145 } 140 146 141 - fn invalidate(&self, did: &str) { 142 - self.entries.invalidate(&did.to_owned()); 147 + fn invalidate(&self, did: &Did<'_>) { 148 + self.entries.invalidate(&did.clone().into_static()); 143 149 } 144 150 } 145 151 ··· 163 169 /// Returns a cached [`CachedIdentity`] on cache hit; otherwise resolves 164 170 /// via the primary (and optionally fallback) resolver and caches the result. 165 171 pub async fn resolve(&self, did: &Did<'_>) -> Result<Arc<CachedIdentity>, IdentityError> { 166 - let did_str = did.as_str(); 167 - 168 - if let Some(cached) = self.cache.get(did_str) { 172 + if let Some(cached) = self.cache.get(did) { 169 173 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "hit") 170 174 .increment(1); 171 175 return Ok(cached); ··· 175 179 Ok((pds, pubkey)) => { 176 180 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "miss") 177 181 .increment(1); 178 - Ok(self.cache.insert(did_str, pds, pubkey)) 182 + Ok(self.cache.insert(did.clone().into_static(), pds, pubkey)) 179 183 } 180 184 Err(e) => { 181 185 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "error") ··· 191 195 /// preceding commits for the same DID have been processed, so the next 192 196 /// resolution fetches the updated key and PDS endpoint. 193 197 pub fn invalidate_did(&self, did: &Did<'_>) { 194 - self.cache.invalidate(did.as_str()); 198 + self.cache.invalidate(did); 195 199 } 196 200 197 201 async fn resolve_uncached(
+1 -1
src/server/get_repo_status.rs
··· 66 66 let (info, prev) = tokio::task::spawn_blocking({ 67 67 let db = db.clone(); 68 68 let did = did.clone(); 69 - move || crate::storage::repo::get(&db, did) 69 + move || crate::storage::repo::get(&db, &did) 70 70 }) 71 71 .await 72 72 .map_err(|_| GetRepoStatusError::StorageError)??
+1 -1
src/server/list_repos.rs
··· 80 80 81 81 let (entries, next) = tokio::task::spawn_blocking({ 82 82 let db = db.clone(); 83 - move || crate::storage::repo::scan_repos(&db, cursor, limit) 83 + move || crate::storage::repo::scan_repos(&db, cursor.as_ref(), limit) 84 84 }) 85 85 .await 86 86 .map_err(|_| ListReposError::StorageError)??;
+8 -1
src/server/list_repos_by_collection.rs
··· 80 80 let collection = req.collection.into_static(); 81 81 let (dids, next) = tokio::task::spawn_blocking({ 82 82 let db = db.clone(); 83 - move || crate::storage::collection_index::scan_rbc_active(&db, collection, cursor, limit) 83 + move || { 84 + crate::storage::collection_index::scan_rbc_active( 85 + &db, 86 + collection, 87 + cursor.as_ref(), 88 + limit, 89 + ) 90 + } 84 91 }) 85 92 .await 86 93 .map_err(|_| ListReposByCollectionError::StorageError)??;
+63 -63
src/storage/collection_index.rs
··· 23 23 // --------------------------------------------------------------------------- 24 24 25 25 /// `"rbc"<collection>\0<did>` — main collection index. 26 - fn rbc(collection: Nsid<'_>, did: Did<'_>) -> Vec<u8> { 26 + fn rbc(collection: Nsid<'_>, did: &Did<'_>) -> Vec<u8> { 27 27 let col = collection.as_str(); 28 28 let d = did.as_str(); 29 29 let mut key = Vec::with_capacity(PREFIX_RBC.len() + col.len() + 1 + d.len()); ··· 45 45 } 46 46 47 47 /// The DID bytes, suitable for appending to [`rbc_prefix`] to form a full key. 48 - fn rbc_suffix(did: Did<'_>) -> Vec<u8> { 48 + fn rbc_suffix(did: &Did<'_>) -> Vec<u8> { 49 49 did.as_str().as_bytes().to_vec() 50 50 } 51 51 ··· 67 67 } 68 68 69 69 /// `"cbr"<did>\0<collection>` — reversed index for per-repo collection lookup. 70 - fn cbr(did: Did<'_>, collection: Nsid<'_>) -> Vec<u8> { 70 + fn cbr(did: &Did<'_>, collection: Nsid<'_>) -> Vec<u8> { 71 71 let d = did.as_str(); 72 72 let col = collection.as_str(); 73 73 let mut key = Vec::with_capacity(PREFIX_CBR.len() + d.len() + 1 + col.len()); ··· 79 79 } 80 80 81 81 /// `"cbr"<did>\0` — prefix for scanning all collections for a repo. 82 - fn cbr_prefix(did: Did<'_>) -> Vec<u8> { 82 + fn cbr_prefix(did: &Did<'_>) -> Vec<u8> { 83 83 let d = did.as_str(); 84 84 let mut key = Vec::with_capacity(PREFIX_CBR.len() + d.len() + 1); 85 85 key.extend_from_slice(&PREFIX_CBR); ··· 105 105 pub fn scan_rbc( 106 106 db: &DbRef, 107 107 collection: Nsid<'_>, 108 - cursor: Option<Did<'_>>, 108 + cursor: Option<&Did<'_>>, 109 109 limit: usize, 110 110 ) -> StorageResult<(Vec<Did<'static>>, Option<Did<'static>>)> { 111 111 let prefix = rbc_prefix(collection.clone()); 112 112 let prefix_len = prefix.len(); 113 113 114 - let lower_did = cursor.clone().map(rbc_suffix).unwrap_or(vec![]); 114 + let lower_did = cursor.map(rbc_suffix).unwrap_or(vec![]); 115 115 let mut ranger = db.ks.range(prefixed_range(prefix.clone(), lower_did..)); 116 116 117 117 let mut dids = Vec::with_capacity(limit); ··· 144 144 /// exposed now?) or maybe use prefix + seek for the start? 145 145 pub fn scan_cbr( 146 146 db: &DbRef, 147 - did: Did<'_>, 147 + did: &Did<'_>, 148 148 cursor: Option<Nsid<'_>>, 149 149 limit: usize, 150 150 ) -> StorageResult<Vec<Nsid<'static>>> { 151 - let prefix = cbr_prefix(did.clone()); 151 + let prefix = cbr_prefix(did); 152 152 let prefix_len = prefix.len(); 153 153 154 154 let start_key: Vec<u8> = match cursor { 155 155 None => prefix.clone(), 156 - Some(ref col) => cbr(did.clone(), col.clone()), 156 + Some(ref col) => cbr(did, col.clone()), 157 157 }; 158 158 159 159 let mut cols = Vec::with_capacity(limit); ··· 168 168 // Skip the cursor key itself. 169 169 if cursor 170 170 .clone() 171 - .is_some_and(|c| k.as_ref() == cbr(did.clone(), c).as_slice()) 171 + .is_some_and(|c| k.as_ref() == cbr(did, c).as_slice()) 172 172 { 173 173 continue; 174 174 } ··· 186 186 pub fn insert_into( 187 187 batch: &mut fjall::OwnedWriteBatch, 188 188 db: &DbRef, 189 - did: Did<'_>, 189 + did: &Did<'_>, 190 190 collection: Nsid<'_>, 191 191 ) { 192 - batch.insert(&db.ks, rbc(collection.clone(), did.clone()), b""); 192 + batch.insert(&db.ks, rbc(collection.clone(), did), b""); 193 193 batch.insert(&db.ks, cbr(did, collection), b""); 194 194 } 195 195 ··· 200 200 pub fn remove_into( 201 201 batch: &mut fjall::OwnedWriteBatch, 202 202 db: &DbRef, 203 - did: Did<'_>, 203 + did: &Did<'_>, 204 204 collection: Nsid<'_>, 205 205 ) { 206 - batch.remove(&db.ks, rbc(collection.clone(), did.clone())); 206 + batch.remove(&db.ks, rbc(collection.clone(), did)); 207 207 batch.remove(&db.ks, cbr(did, collection)); 208 208 } 209 209 210 210 /// Return `true` if `did` has `collection` in the cbr index. 211 - pub fn has_collection(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> StorageResult<bool> { 211 + pub fn has_collection(db: &DbRef, did: &Did<'_>, collection: Nsid<'_>) -> StorageResult<bool> { 212 212 Ok(db.ks.get(cbr(did, collection))?.is_some()) 213 213 } 214 214 ··· 221 221 pub fn scan_rbc_active( 222 222 db: &DbRef, 223 223 collection: Nsid<'_>, 224 - cursor: Option<Did<'_>>, 224 + cursor: Option<&Did<'_>>, 225 225 limit: usize, 226 226 ) -> StorageResult<(Vec<Did<'static>>, Option<Did<'static>>)> { 227 227 use crate::storage::repo::{AccountStatus, get_status}; 228 228 229 229 let prefix = rbc_prefix(collection.clone()); 230 230 let prefix_len = prefix.len(); 231 - let lower_did = cursor.map(rbc_suffix).unwrap_or_default(); 231 + let lower_did = cursor.map(rbc_suffix).unwrap_or(vec![]); 232 232 let mut ranger = db.ks.range(prefixed_range(prefix.clone(), lower_did..)); 233 233 234 234 let mut dids = Vec::with_capacity(limit); ··· 236 236 let (k, _v) = guard.into_inner()?; 237 237 let did = rbc_parse_did(&k, prefix_len)?; 238 238 239 - if !matches!(get_status(db, did.clone())?, Some(AccountStatus::Active)) { 239 + if !matches!(get_status(db, &did)?, Some(AccountStatus::Active)) { 240 240 continue; 241 241 } 242 242 ··· 257 257 } 258 258 259 259 /// Insert a `(collection, did)` pair into both the rbc and cbr indexes. 260 - pub fn insert(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> StorageResult<()> { 260 + pub fn insert(db: &DbRef, did: &Did<'_>, collection: Nsid<'_>) -> StorageResult<()> { 261 261 let mut batch = db.database.batch(); 262 262 insert_into(&mut batch, db, did, collection); 263 263 batch.commit()?; ··· 265 265 } 266 266 267 267 /// Remove a `(collection, did)` pair from both indexes. 268 - pub fn remove(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> StorageResult<()> { 268 + pub fn remove(db: &DbRef, did: &Did<'_>, collection: Nsid<'_>) -> StorageResult<()> { 269 269 let mut batch = db.database.batch(); 270 270 remove_into(&mut batch, db, did, collection); 271 271 batch.commit()?; ··· 284 284 pub fn remove_all_into( 285 285 batch: &mut fjall::OwnedWriteBatch, 286 286 db: &DbRef, 287 - did: Did<'_>, 287 + did: &Did<'_>, 288 288 ) -> StorageResult<usize> { 289 - let prefix = cbr_prefix(did.clone()); 289 + let prefix = cbr_prefix(did); 290 290 let collections: Vec<Vec<u8>> = db 291 291 .ks 292 292 .prefix(prefix.clone()) ··· 295 295 let prefix_len = prefix.len(); 296 296 for cbr_key in &collections { 297 297 if let Some(col) = cbr_parse_collection(cbr_key, prefix_len) { 298 - batch.remove(&db.ks, rbc(col, did.clone())); 298 + batch.remove(&db.ks, rbc(col, did)); 299 299 } 300 300 batch.remove(&db.ks, cbr_key.as_slice()); 301 301 } 302 302 Ok(collections.len()) 303 303 } 304 304 305 - pub fn remove_all(db: &DbRef, did: Did<'_>) -> StorageResult<()> { 305 + pub fn remove_all(db: &DbRef, did: &Did<'_>) -> StorageResult<()> { 306 306 let mut batch = db.database.batch(); 307 - remove_all_into(&mut batch, db, did.clone())?; 307 + remove_all_into(&mut batch, db, did)?; 308 308 batch.commit()?; 309 309 Ok(()) 310 310 } ··· 322 322 /// Typically called as `sync_collections(db, did, &snapshot.collections)`. 323 323 pub fn sync_collections( 324 324 db: &DbRef, 325 - did: Did<'_>, 325 + did: &Did<'_>, 326 326 collections: &[Nsid<'static>], 327 327 ) -> StorageResult<(usize, usize)> { 328 328 // Read the current set from the cbr index. Fjall iterates in key order, and 329 329 // the cbr prefix puts the collection NSID last, so the suffix scan is already 330 330 // sorted lexicographically. 331 - let prefix = cbr_prefix(did.clone()); 331 + let prefix = cbr_prefix(did); 332 332 let prefix_len = prefix.len(); 333 333 let existing: Vec<Nsid<'static>> = db 334 334 .ks ··· 363 363 // Peek borrows are dropped here; safe to call next() below. 364 364 match ord { 365 365 Ordering::Less => { 366 - remove_into(&mut batch, db, did.clone(), ei.next().unwrap()); 366 + remove_into(&mut batch, db, did, ei.next().unwrap()); 367 367 n_removed += 1; 368 368 } 369 369 Ordering::Greater => { 370 - insert_into(&mut batch, db, did.clone(), si.next().unwrap()); 370 + insert_into(&mut batch, db, did, si.next().unwrap()); 371 371 n_inserted += 1; 372 372 } 373 373 Ordering::Equal => { ··· 399 399 } 400 400 401 401 fn collections_for(db: &crate::storage::DbRef, d: &Did<'static>) -> Vec<Nsid<'static>> { 402 - scan_cbr(db, d.clone(), None, 1000).unwrap() 402 + scan_cbr(db, d, None, 1000).unwrap() 403 403 } 404 404 405 405 // --- key encoding --- 406 406 407 407 #[test] 408 408 fn rbc_structure() { 409 - let key = rbc(nsid("app.bsky.feed.post"), did("did:web:example.com")); 409 + let key = rbc(nsid("app.bsky.feed.post"), &did("did:web:example.com")); 410 410 assert_eq!(key, b"rbcapp.bsky.feed.post\0did:web:example.com"); 411 411 } 412 412 413 413 #[test] 414 414 fn rbc_prefix_matches_full_key() { 415 415 let prefix = rbc_prefix(nsid("app.bsky.feed.post")); 416 - let key = rbc(nsid("app.bsky.feed.post"), did("did:web:example.com")); 416 + let key = rbc(nsid("app.bsky.feed.post"), &did("did:web:example.com")); 417 417 assert!(key.starts_with(&prefix)); 418 418 } 419 419 420 420 #[test] 421 421 fn rbc_prefix_does_not_match_different_collection() { 422 422 let prefix = rbc_prefix(nsid("app.bsky.feed.post")); 423 - let key = rbc(nsid("app.bsky.feed.like"), did("did:web:example.com")); 423 + let key = rbc(nsid("app.bsky.feed.like"), &did("did:web:example.com")); 424 424 assert!(!key.starts_with(&prefix)); 425 425 } 426 426 ··· 428 428 #[test] 429 429 fn rbc_prefix_not_confused_by_collection_that_is_a_prefix() { 430 430 let prefix = rbc_prefix(nsid("app.bsky.feed")); 431 - let key = rbc(nsid("app.bsky.feed.post"), did("did:web:example.com")); 431 + let key = rbc(nsid("app.bsky.feed.post"), &did("did:web:example.com")); 432 432 assert!(!key.starts_with(&prefix)); 433 433 } 434 434 435 435 #[test] 436 436 fn rbc_sorts_by_collection_then_did() { 437 - let a = rbc(nsid("app.bsky.actor.profile"), did("did:web:a.com")); 438 - let b = rbc(nsid("app.bsky.feed.post"), did("did:web:a.com")); 439 - let c = rbc(nsid("app.bsky.feed.post"), did("did:web:b.com")); 437 + let a = rbc(nsid("app.bsky.actor.profile"), &did("did:web:a.com")); 438 + let b = rbc(nsid("app.bsky.feed.post"), &did("did:web:a.com")); 439 + let c = rbc(nsid("app.bsky.feed.post"), &did("did:web:b.com")); 440 440 assert!(a < b, "earlier collection should sort first"); 441 441 assert!(b < c, "same collection: earlier DID should sort first"); 442 442 } ··· 445 445 fn rbc_parse_did_roundtrips() { 446 446 let col = nsid("app.bsky.feed.post"); 447 447 let d = did("did:web:example.com"); 448 - let key = rbc(col.clone(), d.clone()); 448 + let key = rbc(col.clone(), &d); 449 449 let prefix_len = rbc_prefix(col).len(); 450 450 assert_eq!(rbc_parse_did(&key, prefix_len), Ok(d)); 451 451 } ··· 464 464 465 465 #[test] 466 466 fn cbr_structure() { 467 - let key = cbr(did("did:web:example.com"), nsid("app.bsky.feed.post")); 467 + let key = cbr(&did("did:web:example.com"), nsid("app.bsky.feed.post")); 468 468 assert_eq!(key, b"cbrdid:web:example.com\0app.bsky.feed.post"); 469 469 } 470 470 471 471 #[test] 472 472 fn cbr_prefix_matches_full_key() { 473 - let prefix = cbr_prefix(did("did:web:example.com")); 474 - let key = cbr(did("did:web:example.com"), nsid("app.bsky.feed.post")); 473 + let prefix = cbr_prefix(&did("did:web:example.com")); 474 + let key = cbr(&did("did:web:example.com"), nsid("app.bsky.feed.post")); 475 475 assert!(key.starts_with(&prefix)); 476 476 } 477 477 478 478 #[test] 479 479 fn cbr_prefix_does_not_match_different_did() { 480 - let prefix = cbr_prefix(did("did:web:example.com")); 481 - let key = cbr(did("did:web:other.com"), nsid("app.bsky.feed.post")); 480 + let prefix = cbr_prefix(&did("did:web:example.com")); 481 + let key = cbr(&did("did:web:other.com"), nsid("app.bsky.feed.post")); 482 482 assert!(!key.starts_with(&prefix)); 483 483 } 484 484 ··· 486 486 fn cbr_parse_collection_roundtrips() { 487 487 let d = did("did:web:example.com"); 488 488 let col = nsid("app.bsky.feed.post"); 489 - let key = cbr(d.clone(), col.clone()); 490 - let prefix_len = cbr_prefix(d).len(); 489 + let key = cbr(&d, col.clone()); 490 + let prefix_len = cbr_prefix(&d).len(); 491 491 assert_eq!(cbr_parse_collection(&key, prefix_len), Some(col)); 492 492 } 493 493 ··· 496 496 let col = nsid("app.bsky.feed.post"); 497 497 let d = did("did:web:example.com"); 498 498 let mut assembled = rbc_prefix(col.clone()); 499 - assembled.extend_from_slice(&rbc_suffix(d.clone())); 500 - assert_eq!(assembled, rbc(col, d)); 499 + assembled.extend_from_slice(&rbc_suffix(&d)); 500 + assert_eq!(assembled, rbc(col, &d)); 501 501 } 502 502 503 503 // --- CRUD --- ··· 509 509 510 510 sync_collections( 511 511 &db, 512 - d.clone(), 512 + &d, 513 513 &[nsid("app.bsky.actor.profile"), nsid("app.bsky.feed.post")], 514 514 ) 515 515 .unwrap(); ··· 525 525 let db = open_temporary().unwrap(); 526 526 let d = did("did:web:alice.com"); 527 527 528 - insert(&db, d.clone(), nsid("app.bsky.actor.profile")).unwrap(); 529 - insert(&db, d.clone(), nsid("app.bsky.feed.post")).unwrap(); 530 - insert(&db, d.clone(), nsid("app.bsky.graph.follow")).unwrap(); 528 + insert(&db, &d, nsid("app.bsky.actor.profile")).unwrap(); 529 + insert(&db, &d, nsid("app.bsky.feed.post")).unwrap(); 530 + insert(&db, &d, nsid("app.bsky.graph.follow")).unwrap(); 531 531 532 - sync_collections(&db, d.clone(), &[]).unwrap(); 532 + sync_collections(&db, &d, &[]).unwrap(); 533 533 534 534 assert_eq!(collections_for(&db, &d), vec![]); 535 535 } ··· 541 541 542 542 let cols = [nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")]; 543 543 for col in &cols { 544 - insert(&db, d.clone(), col.clone()).unwrap(); 544 + insert(&db, &d, col.clone()).unwrap(); 545 545 } 546 546 547 - sync_collections(&db, d.clone(), &cols).unwrap(); 547 + sync_collections(&db, &d, &cols).unwrap(); 548 548 549 549 assert_eq!(collections_for(&db, &d), cols.to_vec()); 550 550 } ··· 554 554 let db = open_temporary().unwrap(); 555 555 let d = did("did:web:alice.com"); 556 556 557 - insert(&db, d.clone(), nsid("app.bsky.actor.profile")).unwrap(); 558 - insert(&db, d.clone(), nsid("app.bsky.feed.post")).unwrap(); 557 + insert(&db, &d, nsid("app.bsky.actor.profile")).unwrap(); 558 + insert(&db, &d, nsid("app.bsky.feed.post")).unwrap(); 559 559 560 560 sync_collections( 561 561 &db, 562 - d.clone(), 562 + &d, 563 563 &[nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")], 564 564 ) 565 565 .unwrap(); ··· 575 575 let db = open_temporary().unwrap(); 576 576 let d = did("did:web:alice.com"); 577 577 578 - insert(&db, d.clone(), nsid("app.bsky.actor.profile")).unwrap(); 579 - insert(&db, d.clone(), nsid("app.bsky.feed.post")).unwrap(); 578 + insert(&db, &d, nsid("app.bsky.actor.profile")).unwrap(); 579 + insert(&db, &d, nsid("app.bsky.feed.post")).unwrap(); 580 580 581 581 sync_collections( 582 582 &db, 583 - d.clone(), 583 + &d, 584 584 &[nsid("app.bsky.feed.post"), nsid("app.bsky.graph.follow")], 585 585 ) 586 586 .unwrap(); ··· 608 608 let alice = did("did:web:alice.com"); 609 609 let bob = did("did:web:bob.com"); 610 610 611 - insert(&db, alice.clone(), nsid("app.bsky.feed.post")).unwrap(); 612 - insert(&db, bob.clone(), nsid("app.bsky.feed.post")).unwrap(); 611 + insert(&db, &alice, nsid("app.bsky.feed.post")).unwrap(); 612 + insert(&db, &bob, nsid("app.bsky.feed.post")).unwrap(); 613 613 614 - sync_collections(&db, alice.clone(), &[]).unwrap(); 614 + sync_collections(&db, &alice, &[]).unwrap(); 615 615 616 616 assert_eq!(collections_for(&db, &alice), vec![]); 617 617 assert_eq!(collections_for(&db, &bob), vec![nsid("app.bsky.feed.post")]);
+34 -27
src/storage/repo.rs
··· 19 19 /// `PREFIX_REPO<did>` — per-repo state entry. 20 20 /// 21 21 /// Exposed `pub(crate)` so `resync_queue` can use it in atomic batches. 22 - pub(crate) fn key(did: Did<'_>) -> Vec<u8> { 22 + pub(crate) fn key(did: &Did<'_>) -> Vec<u8> { 23 23 let d = did.as_str(); 24 24 let mut k = Vec::with_capacity(PREFIX_REPO.len() + d.len()); 25 25 k.extend_from_slice(&PREFIX_REPO); ··· 28 28 } 29 29 30 30 /// `PREFIX_REPO_PREV<did>` — per-repo transient sync state. 31 - fn prev_key(did: Did<'_>) -> Vec<u8> { 31 + fn prev_key(did: &Did<'_>) -> Vec<u8> { 32 32 let d = did.as_str(); 33 33 let mut k = Vec::with_capacity(PREFIX_REPO_PREV.len() + d.len()); 34 34 k.extend_from_slice(&PREFIX_REPO_PREV); ··· 47 47 Suspended, 48 48 Deactivated, 49 49 Error, 50 + /// The PDS returned a definitive "no repo for this DID" response. 51 + /// The DID identity exists but the PDS doesn't have a repo for it — 52 + /// either it was never created, the PDS pointer is stale, or the account 53 + /// is mid-migration. 54 + NotFound, 50 55 } 51 56 52 57 impl RepoState { ··· 60 65 RepoState::Suspended => "suspended", 61 66 RepoState::Deactivated => "deactivated", 62 67 RepoState::Error => "error", 68 + RepoState::NotFound => "notFound", 63 69 } 64 70 } 65 71 ··· 73 79 "suspended" => RepoState::Suspended, 74 80 "deactivated" => RepoState::Deactivated, 75 81 "error" => RepoState::Error, 82 + "notFound" => RepoState::NotFound, 76 83 _ => return None, 77 84 }) 78 85 } ··· 226 233 /// skips the `RepoPrev` read and doesn't take a snapshot. 227 234 /// 228 235 /// Returns `None` if the repo is not indexed. 229 - pub fn get_status(db: &DbRef, did: Did<'_>) -> StorageResult<Option<AccountStatus>> { 236 + pub fn get_status(db: &DbRef, did: &Did<'_>) -> StorageResult<Option<AccountStatus>> { 230 237 let k = key(did); 231 238 let Some(bytes) = db.ks.get(&k)? else { 232 239 return Ok(None); ··· 241 248 /// Returns `true` if a new record was inserted, `false` if one already existed. 242 249 /// Two concurrent callers may both see no record and both insert — the second 243 250 /// write is identical to the first, so this is safe without additional locking. 244 - pub fn ensure_repo(db: &DbRef, did: Did<'_>) -> StorageResult<bool> { 251 + pub fn ensure_repo(db: &DbRef, did: &Did<'_>) -> StorageResult<bool> { 245 252 let key = key(did); 246 253 if db.ks.get(&key)?.is_some() { 247 254 return Ok(false); ··· 258 265 /// Retrieve both [`RepoInfo`] and [`RepoPrev`] for a `did`. 259 266 /// 260 267 /// `None` if the repo is not indexed. 261 - pub fn get(db: &DbRef, did: Did<'_>) -> StorageResult<Option<(RepoInfo, Option<RepoPrev>)>> { 262 - let info_key = key(did.clone()); 268 + pub fn get(db: &DbRef, did: &Did<'_>) -> StorageResult<Option<(RepoInfo, Option<RepoPrev>)>> { 269 + let info_key = key(did); 263 270 let prev_key = prev_key(did); 264 271 let snapshot = db.database.snapshot(); 265 272 let Some(info_bytes) = snapshot.get(&db.ks, &info_key)? else { ··· 281 288 pub fn put_info_into( 282 289 batch: &mut fjall::OwnedWriteBatch, 283 290 db: &DbRef, 284 - did: Did<'_>, 291 + did: &Did<'_>, 285 292 info: &RepoInfo, 286 293 ) { 287 294 batch.insert(&db.ks, key(did), encode_repo_info(info)); 288 295 } 289 296 290 297 /// Write a [`RepoInfo`] for `did`. 291 - pub fn put_info(db: &DbRef, did: Did<'_>, info: &RepoInfo) -> StorageResult<()> { 298 + pub fn put_info(db: &DbRef, did: &Did<'_>, info: &RepoInfo) -> StorageResult<()> { 292 299 let mut batch = db.database.batch(); 293 300 put_info_into(&mut batch, db, did, info); 294 301 batch.commit()?; ··· 296 303 } 297 304 298 305 /// Read the transient [`RepoPrev`] for `did`. 299 - pub fn get_prev(db: &DbRef, did: Did<'_>) -> StorageResult<Option<RepoPrev>> { 306 + pub fn get_prev(db: &DbRef, did: &Did<'_>) -> StorageResult<Option<RepoPrev>> { 300 307 let key = prev_key(did); 301 308 db.ks 302 309 .get(&key)? ··· 311 318 pub fn put_prev_into( 312 319 batch: &mut fjall::OwnedWriteBatch, 313 320 db: &DbRef, 314 - did: Did<'_>, 321 + did: &Did<'_>, 315 322 prev: &RepoPrev, 316 323 ) { 317 324 batch.insert(&db.ks, prev_key(did), encode_repo_prev(prev)); 318 325 } 319 326 320 327 /// Write the transient [`RepoPrev`] for `did`. 321 - pub fn put_prev(db: &DbRef, did: Did<'_>, prev: &RepoPrev) -> StorageResult<()> { 328 + pub fn put_prev(db: &DbRef, did: &Did<'_>, prev: &RepoPrev) -> StorageResult<()> { 322 329 let key = prev_key(did); 323 330 db.ks.insert(key, encode_repo_prev(prev))?; 324 331 Ok(()) 325 332 } 326 333 327 334 /// Delete the transient [`RepoPrev`] for `did`. 328 - pub fn delete_prev(db: &DbRef, did: Did<'_>) -> StorageResult<()> { 335 + pub fn delete_prev(db: &DbRef, did: &Did<'_>) -> StorageResult<()> { 329 336 let key = prev_key(did); 330 337 db.ks.remove(key)?; 331 338 Ok(()) ··· 340 347 /// that have not yet been synced have `None` for the [`RepoPrev`] field. 341 348 pub fn scan_repos( 342 349 db: &DbRef, 343 - cursor: Option<Did<'_>>, 350 + cursor: Option<&Did<'_>>, 344 351 limit: usize, 345 352 ) -> StorageResult<( 346 353 Vec<(Did<'static>, RepoInfo, Option<RepoPrev>)>, ··· 353 360 354 361 let start_key: Vec<u8> = { 355 362 let mut k = PREFIX_REPO.to_vec(); 356 - if let Some(ref did) = cursor { 363 + if let Some(did) = cursor { 357 364 k.extend_from_slice(did.as_str().as_bytes()); 358 365 } 359 366 k ··· 377 384 })?; 378 385 let key_str = String::from_utf8_lossy(&k).into_owned(); 379 386 let info = decode_repo_info(&v, &key_str)?; 380 - let pk = prev_key(did.clone()); 387 + let pk = prev_key(&did); 381 388 let prev = db 382 389 .ks 383 390 .get(&pk)? ··· 473 480 fn ensure_repo_inserts_pending_when_absent() { 474 481 let db = open_temporary().unwrap(); 475 482 let d = did("did:web:example.com"); 476 - let inserted = ensure_repo(&db, d.clone()).unwrap(); 483 + let inserted = ensure_repo(&db, &d).unwrap(); 477 484 assert!(inserted); 478 - let (info, _) = get(&db, d.clone()).unwrap().unwrap(); 485 + let (info, _) = get(&db, &d).unwrap().unwrap(); 479 486 assert_eq!(info.state, RepoState::Pending); 480 487 assert_eq!(info.status, AccountStatus::Active); 481 488 } ··· 486 493 let d = did("did:web:example.com"); 487 494 put_info( 488 495 &db, 489 - d.clone(), 496 + &d, 490 497 &RepoInfo { 491 498 state: RepoState::Active, 492 499 status: AccountStatus::Suspended, ··· 494 501 }, 495 502 ) 496 503 .unwrap(); 497 - let inserted = ensure_repo(&db, d.clone()).unwrap(); 504 + let inserted = ensure_repo(&db, &d).unwrap(); 498 505 assert!(!inserted); 499 - let (info, _) = get(&db, d).unwrap().unwrap(); 506 + let (info, _) = get(&db, &d).unwrap().unwrap(); 500 507 assert_eq!(info.state, RepoState::Active); // unchanged 501 508 assert_eq!(info.status, AccountStatus::Suspended); // unchanged 502 509 } ··· 510 517 status: AccountStatus::Active, 511 518 error: None, 512 519 }; 513 - put_info(&db, d.clone(), &info).unwrap(); 514 - let (retrieved, prev) = get(&db, d).unwrap().unwrap(); 520 + put_info(&db, &d, &info).unwrap(); 521 + let (retrieved, prev) = get(&db, &d).unwrap().unwrap(); 515 522 assert_eq!(retrieved.state, RepoState::Active); 516 523 assert!(prev.is_none()); 517 524 } ··· 523 530 // Need a RepoInfo first for get() to return Some. 524 531 put_info( 525 532 &db, 526 - d.clone(), 533 + &d, 527 534 &RepoInfo { 528 535 state: RepoState::Active, 529 536 status: AccountStatus::Active, ··· 535 542 rev: tid("3lczouzaqmo2e"), 536 543 prev_data: vec![1, 2, 3, 4], 537 544 }; 538 - put_prev(&db, d.clone(), &prev).unwrap(); 539 - let (_, stored_prev) = get(&db, d.clone()).unwrap().unwrap(); 545 + put_prev(&db, &d, &prev).unwrap(); 546 + let (_, stored_prev) = get(&db, &d).unwrap().unwrap(); 540 547 let stored_prev = stored_prev.unwrap(); 541 548 assert_eq!(stored_prev.rev.as_str(), "3lczouzaqmo2e"); 542 549 assert_eq!(stored_prev.prev_data, vec![1, 2, 3, 4]); 543 550 544 - delete_prev(&db, d.clone()).unwrap(); 545 - let (_, no_prev) = get(&db, d).unwrap().unwrap(); 551 + delete_prev(&db, &d).unwrap(); 552 + let (_, no_prev) = get(&db, &d).unwrap().unwrap(); 546 553 assert!(no_prev.is_none()); 547 554 } 548 555 }
+22 -22
src/storage/resync_queue.rs
··· 24 24 /// `"rsq"<ts_be:u64>\0<did>` — timestamp-ordered resync queue. 25 25 /// 26 26 /// Big-endian timestamp gives natural chronological ordering. 27 - fn key(ts: u64, did: Did<'_>) -> Vec<u8> { 27 + fn key(ts: u64, did: &Did<'_>) -> Vec<u8> { 28 28 let d = did.as_str(); 29 29 let mut k = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 8 + 1 + d.len()); 30 30 k.extend_from_slice(&PREFIX_RESYNC_QUEUE); ··· 163 163 "enqueue resync to batch" 164 164 ); 165 165 } 166 - batch.insert(&db.ks, key(ts, item.did.clone()), encode(item)); 166 + batch.insert(&db.ks, key(ts, &item.did), encode(item)); 167 167 } 168 168 169 169 /// Enqueue a repo for resync at the given Unix timestamp (seconds). ··· 238 238 db: &DbRef, 239 239 now: u64, 240 240 since: Option<Vec<u8>>, 241 - busy: &HashSet<String>, 241 + busy: &HashSet<Did<'_>>, 242 242 ) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 243 243 let prefix = key_prefix_all(); 244 244 let lower_suffix = since.unwrap_or_default(); ··· 252 252 let key_bytes = key_slice.as_ref(); 253 253 let (_, did) = key_parse(key_bytes)?; 254 254 255 - if busy.contains(did.as_str()) { 255 + if busy.contains(&did) { 256 256 debug!(did = did.as_str(), "skip busy did in resync queue"); 257 257 continue; 258 258 } ··· 262 262 let next_since = key_bytes[prefix.len()..].to_vec(); 263 263 264 264 // Read current repo info to preserve account status across the transition. 265 - let repo_key = repo::key(did.clone()); 265 + let repo_key = repo::key(&did); 266 266 let new_info = match db.ks.get(&repo_key)? { 267 267 Some(b) => { 268 268 let rk = String::from_utf8_lossy(&repo_key).into_owned(); ··· 330 330 331 331 #[test] 332 332 fn key_structure() { 333 - let k = key(0x0102030405060708, did("did:web:example.com")); 333 + let k = key(0x0102030405060708, &did("did:web:example.com")); 334 334 let mut expected = b"rsq".to_vec(); 335 335 expected.extend_from_slice(&0x0102030405060708u64.to_be_bytes()); 336 336 expected.push(b'\0'); ··· 340 340 341 341 #[test] 342 342 fn key_sorts_by_timestamp() { 343 - let earlier = key(100, did("did:web:example.com")); 344 - let later = key(200, did("did:web:example.com")); 343 + let earlier = key(100, &did("did:web:example.com")); 344 + let later = key(200, &did("did:web:example.com")); 345 345 assert!(earlier < later); 346 346 } 347 347 348 348 #[test] 349 349 fn key_same_timestamp_sorts_by_did() { 350 - let a = key(100, did("did:web:a.com")); 351 - let b = key(100, did("did:web:b.com")); 350 + let a = key(100, &did("did:web:a.com")); 351 + let b = key(100, &did("did:web:b.com")); 352 352 assert!(a < b); 353 353 } 354 354 ··· 356 356 fn key_parse_roundtrips() { 357 357 let ts = 0xdeadbeefcafe1234u64; 358 358 let d = did("did:web:example.com"); 359 - let k = key(ts, d.clone()); 359 + let k = key(ts, &d); 360 360 let (parsed_ts, parsed_did) = key_parse(&k).unwrap(); 361 361 assert_eq!(parsed_ts, ts); 362 362 assert_eq!(parsed_did, d); ··· 365 365 #[test] 366 366 fn key_prefix_all_is_prefix_of_any_entry() { 367 367 let prefix_all = key_prefix_all(); 368 - let k = key(100, did("did:web:example.com")); 368 + let k = key(100, &did("did:web:example.com")); 369 369 assert!(k.starts_with(&prefix_all)); 370 370 } 371 371 372 372 #[test] 373 373 fn key_ts_midfix_upper_bound_excludes_entries_at_that_ts() { 374 374 let prefix_all = key_prefix_all(); 375 - let entry = key(100, did("did:web:example.com")); 375 + let entry = key(100, &did("did:web:example.com")); 376 376 let mut upper = prefix_all.clone(); 377 377 upper.extend_from_slice(&key_ts_midfix(100)); 378 378 assert!(entry >= upper); ··· 381 381 #[test] 382 382 fn key_ts_midfix_upper_bound_includes_earlier_ts() { 383 383 let prefix_all = key_prefix_all(); 384 - let entry = key(99, did("did:web:example.com")); 384 + let entry = key(99, &did("did:web:example.com")); 385 385 let mut upper = prefix_all.clone(); 386 386 upper.extend_from_slice(&key_ts_midfix(100)); 387 387 assert!(entry < upper); ··· 483 483 fn pending_repo(db: &DbRef, did_str: &str) { 484 484 repo::put_info( 485 485 db, 486 - did(did_str), 486 + &did(did_str), 487 487 &repo::RepoInfo { 488 488 state: repo::RepoState::Pending, 489 489 status: repo::AccountStatus::Active, ··· 506 506 507 507 assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 508 508 509 - let (info, _) = repo::get(&db, did("did:web:a.com")).unwrap().unwrap(); 509 + let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); 510 510 assert_eq!(info.state, repo::RepoState::Resyncing); 511 511 } 512 512 ··· 515 515 let db = open_temporary().unwrap(); 516 516 repo::put_info( 517 517 &db, 518 - did("did:web:a.com"), 518 + &did("did:web:a.com"), 519 519 &repo::RepoInfo { 520 520 state: repo::RepoState::Pending, 521 521 status: repo::AccountStatus::Suspended, ··· 529 529 .unwrap() 530 530 .unwrap(); 531 531 532 - let (info, _) = repo::get(&db, did("did:web:a.com")).unwrap().unwrap(); 532 + let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); 533 533 assert_eq!(info.status, repo::AccountStatus::Suspended); 534 534 } 535 535 ··· 541 541 enqueue(&db, 100, &item("did:web:a.com", 0, "first", &[])).unwrap(); 542 542 enqueue(&db, 101, &item("did:web:b.com", 0, "second", &[])).unwrap(); 543 543 544 - let mut busy = HashSet::new(); 545 - busy.insert("did:web:a.com".to_string()); 544 + let mut busy: HashSet<Did<'static>> = HashSet::new(); 545 + busy.insert(did("did:web:a.com")); 546 546 547 547 let (claimed, _) = claim_resync(&db, 9999, None, &busy).unwrap().unwrap(); 548 548 assert_eq!(claimed.did.as_str(), "did:web:b.com"); ··· 554 554 pending_repo(&db, "did:web:a.com"); 555 555 enqueue(&db, 100, &item("did:web:a.com", 0, "only", &[])).unwrap(); 556 556 557 - let mut busy = HashSet::new(); 558 - busy.insert("did:web:a.com".to_string()); 557 + let mut busy: HashSet<Did<'static>> = HashSet::new(); 558 + busy.insert(did("did:web:a.com")); 559 559 560 560 assert!(claim_resync(&db, 9999, None, &busy).unwrap().is_none()); 561 561 }
+1 -1
src/sync/backfill.rs
··· 112 112 tokio::task::spawn_blocking(move || { 113 113 let mut count: u64 = 0; 114 114 for did in dids { 115 - let newly_inserted = storage::repo::ensure_repo(&db, did.clone())?; 115 + let newly_inserted = storage::repo::ensure_repo(&db, &did)?; 116 116 if newly_inserted { 117 117 let item = ResyncItem { 118 118 did,
+3 -3
src/sync/firehose/account_event.rs
··· 33 33 // will follow if the repo needs to be refreshed. For inactive: set the 34 34 // matching RepoState so the account's condition is visible. 35 35 let new_state = if account.active { 36 - storage::repo::get(&db, account.did.clone())? 36 + storage::repo::get(&db, &account.did)? 37 37 .map(|(info, _)| info.state) 38 38 .unwrap_or(storage::repo::RepoState::Active) 39 39 } else { ··· 49 49 storage::repo::put_info_into( 50 50 &mut batch, 51 51 &db, 52 - account.did.clone(), 52 + &account.did, 53 53 &storage::repo::RepoInfo { 54 54 state: new_state, 55 55 status: new_status, ··· 57 57 }, 58 58 ); 59 59 let n_removed = if tombstone { 60 - storage::collection_index::remove_all_into(&mut batch, &db, account.did.clone())? 60 + storage::collection_index::remove_all_into(&mut batch, &db, &account.did)? 61 61 } else { 62 62 0 63 63 };
+7 -7
src/sync/firehose/commit_event.rs
··· 411 411 validate_births: bool, 412 412 ) -> crate::error::Result<()> { 413 413 // Load the current repo state and chain tip (may be absent for new repos). 414 - let (info, prev) = match storage::repo::get(db, did.clone())? { 414 + let (info, prev) = match storage::repo::get(db, &did)? { 415 415 Some(r) => r, 416 416 None => return Ok(()), // repo not indexed by us; drop silently 417 417 }; 418 418 419 419 // Steps 2, 6, 7: shared drop checks. 420 - if validate::should_drop(&info, prev.as_ref(), &rev, "commit", did.clone()) { 420 + if validate::should_drop(&info, prev.as_ref(), &rev, "commit", &did) { 421 421 return Ok(()); 422 422 } 423 423 ··· 454 454 // confident in the birth detection logic. 455 455 if validate_births { 456 456 for coll in &born { 457 - match storage::collection_index::has_collection(db, did.clone(), coll.clone()) { 457 + match storage::collection_index::has_collection(db, &did, coll.clone()) { 458 458 Ok(true) => { 459 459 tracing::error!( 460 460 did = %did.as_str(), ··· 502 502 storage::repo::put_prev_into( 503 503 &mut batch, 504 504 db, 505 - did.clone(), 505 + &did, 506 506 &RepoPrev { 507 507 rev, 508 508 prev_data: new_mst_root_bytes, 509 509 }, 510 510 ); 511 511 for coll in born { 512 - storage::collection_index::insert_into(&mut batch, db, did.clone(), coll.clone()); 512 + storage::collection_index::insert_into(&mut batch, db, &did, coll.clone()); 513 513 storage::collection_list::insert_into(&mut batch, db, coll); 514 514 } 515 515 for coll in died { 516 - storage::collection_index::remove_into(&mut batch, db, did.clone(), coll); 516 + storage::collection_index::remove_into(&mut batch, db, &did, coll); 517 517 } 518 518 batch 519 519 .commit() ··· 542 542 storage::repo::put_info_into( 543 543 &mut batch, 544 544 db, 545 - did.clone(), 545 + &did, 546 546 &RepoInfo { 547 547 state: RepoState::Desynchronized, 548 548 status: existing_status,
+3 -3
src/sync/firehose/sync_event.rs
··· 124 124 rev: Tid, 125 125 blocks: Vec<u8>, 126 126 ) -> crate::error::Result<()> { 127 - let (info, prev) = match storage::repo::get(db, did.clone())? { 127 + let (info, prev) = match storage::repo::get(db, &did)? { 128 128 Some(r) => r, 129 129 None => return Ok(()), // repo not indexed by us; drop silently 130 130 }; 131 131 132 132 // Steps 2, 6, 7: shared drop checks. 133 - if validate::should_drop(&info, prev.as_ref(), &rev, "sync", did.clone()) { 133 + if validate::should_drop(&info, prev.as_ref(), &rev, "sync", &did) { 134 134 return Ok(()); 135 135 } 136 136 ··· 139 139 storage::repo::put_info_into( 140 140 &mut batch, 141 141 db, 142 - did.clone(), 142 + &did, 143 143 &RepoInfo { 144 144 state: RepoState::Desynchronized, 145 145 status: info.status,
+1 -1
src/sync/firehose/validate.rs
··· 171 171 prev: Option<&RepoPrev>, 172 172 rev: &Tid, 173 173 label: &'static str, 174 - did: Did<'_>, 174 + did: &Did<'_>, 175 175 ) -> bool { 176 176 if !info.status.is_active() { 177 177 metrics::counter!("lightrail_event_dropped_total",
+109 -59
src/sync/resync/dispatcher.rs
··· 10 10 //! after the cursor in the queue and are never skipped. The busy set prevents 11 11 //! claiming a new entry for a DID whose previous entry is still being processed. 12 12 13 + use jacquard_common::types::string::Did; 13 14 use std::collections::{HashMap, HashSet}; 14 15 use std::time::{Duration, Instant}; 15 16 ··· 49 50 token: tokio_util::sync::CancellationToken, 50 51 ) -> Result<()> { 51 52 let client = crate::http::build_client(); 52 - let mut busy: HashSet<String> = HashSet::new(); 53 - let mut task_dids: HashMap<TaskId, String> = HashMap::new(); 53 + let mut busy: HashSet<Did<'static>> = HashSet::new(); 54 + let mut task_dids: HashMap<TaskId, Did<'static>> = HashMap::new(); 54 55 let mut since: Option<Vec<u8>> = None; 55 56 let mut workers: JoinSet<WorkerOutcome> = JoinSet::new(); 56 57 // Maps PDS hostname → Instant when the 429 cooldown expires. ··· 78 79 } 79 80 Ok(Ok(Some((item, cursor)))) => { 80 81 since = Some(cursor); 81 - let did_str = item.did.as_str().to_string(); 82 - busy.insert(did_str.clone()); 82 + let did = item.did.clone(); 83 + busy.insert(did.clone()); 83 84 84 85 // Check whether this item's PDS host is still cooling down 85 86 // after a 429. The resolver cache makes this a local lookup 86 87 // for any DID we've seen recently. 87 - if let Ok(resolved) = resolver.resolve(&item.did).await { 88 + if let Ok(resolved) = resolver.resolve(&did).await { 88 89 let host = resolved.pds.host_str().unwrap_or("").to_string(); 89 90 match cooling_hosts.get(&host) { 90 91 Some(&until) if Instant::now() < until => { ··· 98 99 .await 99 100 { 100 101 Ok(Ok(())) => {} 101 - Ok(Err(e)) => error!(error = %e, did = %did_str, 102 + Ok(Err(e)) => error!(error = %e, did = %did, 102 103 "failed to defer during host cooldown"), 103 - Err(e) => error!(error = %e, did = %did_str, 104 + Err(e) => error!(error = %e, did = %did, 104 105 "failed to defer during host cooldown (panic)"), 105 106 } 106 - busy.remove(&did_str); 107 - debug!(did = %did_str, host = %host, remaining_secs = remaining, 107 + busy.remove(&did); 108 + debug!(did = %did, host = %host, remaining_secs = remaining, 108 109 "deferring; PDS host cooling down after 429"); 109 110 break; 110 111 } ··· 130 131 ) 131 132 .await 132 133 }); 133 - task_dids.insert(handle.id(), did_str.clone()); 134 + task_dids.insert(handle.id(), did.clone()); 134 135 metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 135 - trace!(did = %did_str, running = workers.len(), "spawned resync worker"); 136 + trace!(did = %did, running = workers.len(), "spawned resync worker"); 136 137 } 137 138 Ok(Ok(None)) => break, // queue empty or all ready items busy 138 139 Ok(Err(e)) => { ··· 163 164 Ok((id, outcome)) => (id, Ok(outcome)), 164 165 Err(e) => (e.id(), Err(e)), 165 166 }; 166 - let did_str = task_dids.remove(&task_id).unwrap_or_default(); 167 - busy.remove(&did_str); 167 + let did = task_dids 168 + .remove(&task_id) 169 + .expect("BUG: resync task completed for DID not in task_dids"); 170 + assert!( 171 + busy.remove(&did), 172 + "BUG: resync task completed for DID not in busy set" 173 + ); 168 174 169 175 metrics::gauge!("lightrail_resync_workers").set(workers.len() as f64); 170 176 ··· 177 183 host.clone(), 178 184 Instant::now() + Duration::from_secs(RATE_LIMIT_COOLDOWN_SECS), 179 185 ); 180 - warn!(did = %did_str, host = %host, cooldown_secs = RATE_LIMIT_COOLDOWN_SECS, 186 + warn!(did = %did, host = %host, cooldown_secs = RATE_LIMIT_COOLDOWN_SECS, 181 187 "PDS rate-limited; cooling down"); 182 - if let Ok(did) = parse_did(&did_str) { 183 - let item = ResyncItem { 184 - did, 185 - retry_count, 186 - retry_reason: format!("rate limited by {host}"), 187 - commit_cbor: vec![], 188 - }; 189 - let db = db.clone(); 190 - let ts = unix_now() + RATE_LIMIT_COOLDOWN_SECS; 191 - match tokio::task::spawn_blocking(move || { 192 - storage::resync_queue::enqueue(&db, ts, &item) 193 - }) 194 - .await 195 - { 196 - Ok(Ok(())) => {} 197 - Ok(Err(e)) => error!(error = %e, did = %did_str, 198 - "failed to re-enqueue after rate limit"), 199 - Err(e) => error!(error = %e, did = %did_str, 200 - "failed to re-enqueue after rate limit (panic)"), 201 - } 188 + let item = ResyncItem { 189 + did: did.clone(), 190 + retry_count, 191 + retry_reason: format!("rate limited by {host}"), 192 + commit_cbor: vec![], 193 + }; 194 + let db = db.clone(); 195 + let ts = unix_now() + RATE_LIMIT_COOLDOWN_SECS; 196 + match tokio::task::spawn_blocking(move || { 197 + storage::resync_queue::enqueue(&db, ts, &item) 198 + }) 199 + .await 200 + { 201 + Ok(Ok(())) => {} 202 + Ok(Err(e)) => error!(error = %e, did = %did, 203 + "failed to re-enqueue after rate limit"), 204 + Err(e) => error!(error = %e, did = %did, 205 + "failed to re-enqueue after rate limit (panic)"), 202 206 } 203 207 } 204 208 Ok(outcome) => { 205 - if let Err(e) = handle_completion(&did_str, outcome, db.clone()).await { 206 - error!(error = %e, did = %did_str, "error handling worker completion"); 209 + if let Err(e) = handle_completion(did.clone(), outcome, db.clone()).await { 210 + error!(error = %e, did = %did, "error handling worker completion"); 207 211 } 208 212 } 209 213 Err(e) => { ··· 212 216 metrics::counter!("lightrail_resync_completed_total", 213 217 "outcome" => "panic") 214 218 .increment(1); 215 - error!(error = %e, did = %did_str, "resync worker panicked; re-enqueueing"); 216 - re_enqueue_panic_recovery(&did_str, db.clone()).await; 219 + error!(error = %e, did = %did, "resync worker panicked; re-enqueueing"); 220 + re_enqueue_panic_recovery(did, db.clone()).await; 217 221 } 218 222 } 219 223 } ··· 236 240 host: String, 237 241 retry_count: u16, 238 242 }, 243 + /// The PDS returned a definitive "repo not found" response. 244 + /// The repo state has already been written; schedule a slow retry. 245 + NotFound { 246 + retry_count: u16, 247 + }, 239 248 } 240 249 241 250 async fn run_worker( ··· 246 255 describe_timeout: std::time::Duration, 247 256 get_repo_timeout: std::time::Duration, 248 257 ) -> WorkerOutcome { 249 - let did_str = item.did.as_str().to_string(); 258 + let did = item.did.clone(); 250 259 match super::index_repo( 251 260 client, 252 261 resolver, ··· 258 267 .await 259 268 { 260 269 Ok(()) => { 261 - trace!(did = %did_str, "resync completed"); 270 + trace!(did = %did, "resync completed"); 262 271 WorkerOutcome::Success 263 272 } 264 273 Err(ResyncError::Fetch(GetCollectionsError::RateLimited(host))) => { 265 - warn!(did = %did_str, host = %host, "PDS rate-limited; reporting to dispatcher"); 274 + warn!(did = %did, host = %host, "PDS rate-limited; reporting to dispatcher"); 266 275 WorkerOutcome::RateLimited { 267 276 host, 268 277 retry_count: item.retry_count, 269 278 } 270 279 } 280 + Err(ResyncError::RepoNotFound) => { 281 + // Invalidate the cached DID resolution: the PDS pointer may be 282 + // stale (e.g. mid-migration), so the next attempt will re-resolve. 283 + resolver.invalidate_did(&did); 284 + info!(did = %did, "repo not found on PDS; invalidated DID cache"); 285 + WorkerOutcome::NotFound { 286 + retry_count: item.retry_count, 287 + } 288 + } 271 289 Err(e) => { 272 - warn!(did = %did_str, error = %e, "resync failed"); 290 + warn!(did = %did, error = %e, "resync failed"); 273 291 WorkerOutcome::Retry { 274 292 error: e.to_string(), 275 293 retry_count: item.retry_count, ··· 282 300 // Completion handling 283 301 // --------------------------------------------------------------------------- 284 302 285 - async fn handle_completion(did_str: &str, outcome: WorkerOutcome, db: DbRef) -> Result<()> { 286 - let did = parse_did(did_str)?; 303 + async fn handle_completion(did: Did<'static>, outcome: WorkerOutcome, db: DbRef) -> Result<()> { 287 304 match outcome { 288 305 WorkerOutcome::Success => { 289 306 metrics::counter!("lightrail_resync_completed_total", "outcome" => "success") ··· 312 329 let ts = unix_now() + delay; 313 330 tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 314 331 .await??; 315 - info!(did = %did_str, retry = new_retry, delay_secs = delay, "re-enqueued for retry"); 332 + info!(did = %did, retry = new_retry, delay_secs = delay, "re-enqueued for retry"); 333 + } 334 + WorkerOutcome::NotFound { retry_count } => { 335 + metrics::counter!("lightrail_resync_completed_total", "outcome" => "not_found") 336 + .increment(1); 337 + let new_retry = retry_count.saturating_add(1); 338 + if let Some(delay) = not_found_backoff_secs(new_retry) { 339 + let item = ResyncItem { 340 + did: did.clone(), 341 + retry_count: new_retry, 342 + retry_reason: "repo not found".to_string(), 343 + commit_cbor: vec![], 344 + }; 345 + let ts = unix_now() + delay; 346 + tokio::task::spawn_blocking(move || storage::resync_queue::enqueue(&db, ts, &item)) 347 + .await??; 348 + info!(did = %did, retry = new_retry, delay_secs = delay, 349 + "re-enqueued after repo not found"); 350 + } else { 351 + info!(did = %did, retries = retry_count, 352 + "giving up on repo not found after max retries"); 353 + } 316 354 } 317 355 // RateLimited is handled inline in the dispatcher loop before calling here. 318 356 WorkerOutcome::RateLimited { .. } => { ··· 322 360 Ok(()) 323 361 } 324 362 325 - async fn re_enqueue_panic_recovery(did_str: &str, db: DbRef) { 326 - let Ok(did) = parse_did(did_str) else { return }; 363 + async fn re_enqueue_panic_recovery(did: Did<'static>, db: DbRef) { 327 364 let item = ResyncItem { 328 365 did: did.clone(), 329 366 retry_count: 1, ··· 338 375 .await 339 376 { 340 377 Ok(Ok(())) => {} 341 - Ok(Err(e)) => error!(error = %e, did = %did_str, "failed to re-enqueue after panic"), 342 - Err(e) => error!(error = %e, did = %did_str, "failed to re-enqueue after panic (panic)"), 378 + Ok(Err(e)) => error!(error = %e, did = %did, "failed to re-enqueue after panic"), 379 + Err(e) => error!(error = %e, did = %did, "failed to re-enqueue after panic (panic)"), 343 380 } 344 381 // Best-effort: transition to Error so the repo isn't stuck in Resyncing. 345 382 let _ = transition_state(db, did, RepoState::Error, Some("worker panic".to_string())).await; ··· 357 394 error: Option<String>, 358 395 ) -> Result<()> { 359 396 tokio::task::spawn_blocking(move || -> crate::error::Result<()> { 360 - let new_info = match storage::repo::get(&db, did.clone())? { 397 + let new_info = match storage::repo::get(&db, &did)? { 361 398 Some((mut info, _)) => { 362 399 info.state = state; 363 400 info.error = error; 364 401 info 365 402 } 366 403 None => { 367 - warn!(did = %did.as_str(), "no repo record during state transition; creating one"); 404 + warn!(did = %did, "no repo record during state transition; creating one"); 368 405 RepoInfo { 369 406 state, 370 407 status: AccountStatus::Active, ··· 372 409 } 373 410 } 374 411 }; 375 - storage::repo::put_info(&db, did, &new_info)?; 412 + storage::repo::put_info(&db, &did, &new_info)?; 376 413 Ok(()) 377 414 }) 378 415 .await??; 379 416 Ok(()) 380 417 } 381 418 382 - fn parse_did( 383 - s: &str, 384 - ) -> std::result::Result<jacquard_common::types::string::Did<'static>, super::ResyncError> { 385 - jacquard_common::types::string::Did::new_owned(s) 386 - .map_err(|_| super::ResyncError::InvalidDid(format!("invalid DID: {s}"))) 387 - } 388 - 389 419 fn backoff_secs(retry_count: u16) -> u64 { 390 420 // 60 s, 120 s, 240 s, 480 s, 960 s, 1920 s, 3600 s (capped) 391 421 let exp = (retry_count as u32).saturating_sub(1).min(5); 392 422 (60u64 * (1u64 << exp)).min(3600) 393 423 } 424 + 425 + /// Slow-retry schedule for repos that returned "not found" from their PDS. 426 + /// 427 + /// Returns `None` when we've exhausted retries — the `RepoState::NotFound` 428 + /// written by `index_repo` is the terminal state; future firehose activity 429 + /// for the DID will re-trigger processing normally. 430 + /// 431 + /// Schedule (retry_count after incrementing): 432 + /// 1 → 6 h (covers transient mid-migration windows) 433 + /// 2 → 24 h (covers stale DID→PDS cache expiry) 434 + /// 3 → 24 h (one more check before giving up) 435 + /// ≥4 → None 436 + fn not_found_backoff_secs(retry_count: u16) -> Option<u64> { 437 + match retry_count { 438 + 1 => Some(6 * 3600), // 6 h 439 + 2 => Some(24 * 3600), // 24 h 440 + 3 => Some(24 * 3600), // 24 h 441 + _ => None, 442 + } 443 + }
+14 -1
src/sync/resync/get_repo.rs
··· 55 55 GetRepoError::RepoDeactivated(_) => { 56 56 Err(GetCollectionsError::RepoGone(RepoGoneReason::Deactivated)) 57 57 } 58 - GetRepoError::Unknown(e) => Err(GetCollectionsError::UnexpectedXrpc(format!("{e:?}"))), 58 + GetRepoError::Unknown(ref data) => { 59 + // Some PDSes send {"error":"NotFound"} instead of the 60 + // standardised "RepoNotFound" XRPC error code. 61 + let is_not_found = data 62 + .as_object() 63 + .and_then(|obj| obj.get("error")) 64 + .and_then(|v| v.as_str()) 65 + == Some("NotFound"); 66 + if is_not_found { 67 + Err(GetCollectionsError::RepoNotFound) 68 + } else { 69 + Err(GetCollectionsError::UnexpectedXrpc(format!("{data:?}"))) 70 + } 71 + } 59 72 }, 60 73 Err(e) => Err(GetCollectionsError::Request(e.to_string())), 61 74 }
+38 -5
src/sync/resync/mod.rs
··· 46 46 InvalidDid(String), 47 47 #[error("blocking storage task panicked: {0}")] 48 48 TaskPanic(String), 49 + /// The PDS returned a definitive "repo not found" for this DID. 50 + /// The repo state has been updated; the caller may schedule a slow retry. 51 + #[error("repo not found on PDS")] 52 + RepoNotFound, 49 53 } 50 54 51 55 /// A snapshot of a repository's state as observed during a resync. ··· 128 132 .await 129 133 { 130 134 Ok(s) => s, 131 - Err(GetCollectionsError::RepoNotFound) => return Ok(()), 135 + Err(GetCollectionsError::RepoNotFound) => { 136 + info!(did = %did.as_str(), "repo not found on PDS; marking state"); 137 + let db = db.clone(); 138 + tokio::task::spawn_blocking(move || mark_not_found(&db, &did)) 139 + .await 140 + .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 141 + return Err(ResyncError::RepoNotFound); 142 + } 132 143 Err(GetCollectionsError::RepoGone(reason)) => { 133 144 let (state, status) = match reason { 134 145 RepoGoneReason::Takendown => (RepoState::Takendown, AccountStatus::Takendown), ··· 137 148 }; 138 149 info!(did = %did.as_str(), status = status.as_str(), "repo gone; updating account status"); 139 150 let db = db.clone(); 140 - tokio::task::spawn_blocking(move || update_account_status(&db, did, state, status)) 151 + tokio::task::spawn_blocking(move || update_account_status(&db, &did, state, status)) 141 152 .await 142 153 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 143 154 return Ok(()); ··· 148 159 let db = db.clone(); 149 160 let collections = snapshot.collections; 150 161 let (n_inserted, n_removed) = tokio::task::spawn_blocking(move || { 151 - crate::storage::collection_index::sync_collections(&db, did, &collections) 162 + crate::storage::collection_index::sync_collections(&db, &did, &collections) 152 163 }) 153 164 .await 154 165 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; ··· 170 181 /// field is cleared since the gone state is fully described by `state`/`status`. 171 182 fn update_account_status( 172 183 db: &DbRef, 173 - did: Did<'_>, 184 + did: &Did<'_>, 174 185 state: RepoState, 175 186 status: AccountStatus, 176 187 ) -> Result<()> { 177 - let new_info = match crate::storage::repo::get(db, did.clone())? { 188 + let new_info = match crate::storage::repo::get(db, did)? { 178 189 Some((mut info, _)) => { 179 190 info.state = state; 180 191 info.status = status; ··· 184 195 None => RepoInfo { 185 196 state, 186 197 status, 198 + error: None, 199 + }, 200 + }; 201 + crate::storage::repo::put_info(db, did, &new_info)?; 202 + Ok(()) 203 + } 204 + 205 + /// Set `RepoState::NotFound` for a DID whose PDS reports no repo. 206 + /// 207 + /// Preserves the existing `AccountStatus` if a record exists (we don't know 208 + /// the true account status from a not-found response). The `error` field is 209 + /// cleared since the state is fully described by `RepoState::NotFound`. 210 + fn mark_not_found(db: &DbRef, did: &Did<'_>) -> Result<()> { 211 + let new_info = match crate::storage::repo::get(db, did)? { 212 + Some((mut info, _)) => { 213 + info.state = RepoState::NotFound; 214 + info.error = None; 215 + info 216 + } 217 + None => RepoInfo { 218 + state: RepoState::NotFound, 219 + status: AccountStatus::Active, 187 220 error: None, 188 221 }, 189 222 };