···55//! on, so the walk can be safely resumed after a restart. Already-known repos
66//! (any state) are skipped — the dispatcher's retry mechanism handles repos
77//! that need re-syncing.
88+//!
99+//! Accounts listed as non-active (takendown, suspended, deactivated, deleted)
1010+//! have their status recorded without queuing a resync — there's nothing to
1111+//! fetch.
812913use std::sync::Arc;
10141111-use jacquard_api::com_atproto::sync::list_repos::ListRepos;
1515+use jacquard_api::com_atproto::sync::list_repos::{ListRepos, RepoStatus};
1216use jacquard_common::{
1317 error::ClientErrorKind,
1418 types::string::Did,
···1721};
1822use tokio::time::Duration;
1923use tokio_util::sync::CancellationToken;
2020-use tracing::{error, info, trace, warn};
2424+use tracing::{debug, error, info, trace, warn};
21252226use crate::{
2327 error::Result,
···2630 storage::{
2731 self, DbRef,
2832 backfill_progress::{BackfillProgress, get, set},
3333+ repo::{AccountStatus, RepoInfo, RepoState},
2934 },
3035 sync::discovery_queue::{DiscoveryItem, DiscoveryQueue},
3136 util::TokenExt,
···3944/// Maximum consecutive transient failures before giving up on this host.
4045const MAX_PAGE_FAILURES: u32 = 3;
41464747+// ---------------------------------------------------------------------------
4848+// Listed account state
4949+// ---------------------------------------------------------------------------
5050+5151+/// Map the `active`/`status` fields from a listRepos `Repo` entry to a
5252+/// [`ListedAccountState`]. Follows the same mapping as firehose `#account`
5353+/// events in `account_event.rs`.
5454+fn classify_repo(active: Option<bool>, status: &Option<RepoStatus<'_>>) -> RepoState {
5555+ // active-as-fallback is safe:
5656+ if active.unwrap_or(true) {
5757+ return RepoState::Active;
5858+ }
5959+ let Some(jac_status) = status else {
6060+ return RepoState::Active;
6161+ };
6262+ match jac_status {
6363+ RepoStatus::Takendown => RepoState::Takendown,
6464+ RepoStatus::Suspended => RepoState::Suspended,
6565+ RepoStatus::Deleted => RepoState::Deleted,
6666+ RepoStatus::Deactivated => RepoState::Deactivated,
6767+ RepoStatus::Desynchronized => RepoState::Desynchronized,
6868+ RepoStatus::Throttled => RepoState::Throttled,
6969+ RepoStatus::Other(_) => RepoState::Error,
7070+ }
7171+}
7272+7373+// ---------------------------------------------------------------------------
7474+// Main loop
7575+// ---------------------------------------------------------------------------
7676+4277/// Walk `listRepos` on `host` and enqueue new repos for resync.
4378///
4479/// When `validate` is true (deep crawl / untrusted PDS), DIDs are verified to
4545-/// actually live on `host` and non-matching ones are rejected. The resolver is
4646-/// always used for PDS-host discovery so that enqueue timestamps can be
4747-/// staggered per actual PDS, spreading work across hosts in the queue.
8080+/// actually live on `host` and non-matching ones are rejected. Validated DIDs
8181+/// are considered authoritative for account status — if the PDS says an account
8282+/// is deactivated, we trust it even if our local state is Active. Without
8383+/// validation (relay backfill), we only write non-active status for accounts
8484+/// we haven't seen before, to avoid overwriting Active state with stale relay
8585+/// data (e.g. accounts that migrated off and appear deactivated on the old PDS).
4886pub async fn run(
4987 host: Host,
5088 db: DbRef,
···95133 dids
96134 };
971359898- // Resolve each DID's actual PDS host for per-host stagger.
136136+ // Resolve each DID's actual PDS host for discovery queue routing.
99137 // Cache hits are free; misses fall back to the listed host.
100138 //
101139 // TODO: ...this is basically redundant with validate_dids now?
102102- let dids_with_hosts: Vec<(Did<'static>, Arc<Url>)> = {
140140+ let dids_with_hosts: Vec<(Did<'static>, Arc<Url>, RepoState)> = {
103141 let mut out = Vec::with_capacity(dids.len());
104104- for did in dids {
142142+ for (did, account_state) in dids {
105143 let Some(res) = token.run(resolver.resolve(&did)).await else {
106144 return Ok(false); // cancelled
107145 };
108146 let pds_host = match res {
109147 Ok(resolved) => resolved.pds.clone(),
110148 Err(e) => {
111111- error!(did = %did, error = %e, "failed to resolve host for validated did; not enqueuing resync");
149149+ error!(did = %did, error = %e, "failed to resolve host for validated did; skipping");
112150 continue;
113151 }
114152 };
115115- out.push((did, pds_host));
153153+ out.push((did, pds_host, account_state));
116154 }
117155 out
118156 };
119157120158 let progress_cursor = next_cursor.clone().unwrap_or_default();
121121- let (page_queued, new_items) = {
159159+ let (page_queued, page_inactive, new_items) = {
122160 let db = db.clone();
123161 let host = host.clone();
124162 tokio::task::spawn_blocking(move || {
125125- persist_page(&db, &host, dids_with_hosts, progress_cursor)
163163+ persist_page(&db, &host, dids_with_hosts, progress_cursor, validate)
126164 })
127165 .await??
128166 };
···144182 host = %host,
145183 page_repos = page_len,
146184 page_queued,
185185+ page_inactive,
147186 total_queued,
148187 next_cursor = next_cursor.as_deref().unwrap_or("(done)"),
149188 "backfill page"
···192231 cursor: Option<&str>,
193232 client: &ThrottledClient,
194233 token: &CancellationToken,
195195-) -> Option<(Vec<Did<'static>>, Option<String>)> {
234234+) -> Option<(Vec<(Did<'static>, RepoState)>, Option<String>)> {
196235 let req = ListRepos {
197236 cursor: cursor.map(Into::into),
198237 limit: Some(PAGE_LIMIT),
···216255 Ok(resp) => match resp.parse() {
217256 Ok(out) => {
218257 let next = out.cursor.as_deref().map(str::to_owned);
219219- let dids = out
258258+ let listed = out
220259 .repos
221260 .into_iter()
222222- .map(|r| r.did.into_static())
261261+ .map(|r| {
262262+ let state = classify_repo(r.active, &r.status);
263263+ (r.did.into_static(), state)
264264+ })
223265 .collect::<Vec<_>>();
224224- Some((dids, next))
266266+ Some((listed, next))
225267 }
226268 Err(e) => {
227269 warn!(error = %e, host = %host, "listRepos response parse failed");
···253295254296/// Filter `dids` to those whose resolved PDS endpoint matches `host`.
255297///
256256-/// Bypasses the identity cache to avoid populating it with entries that may
257298/// Returns early with whatever has been validated so far if the token is cancelled.
258299async fn validate_dids(
259259- dids: Vec<Did<'static>>,
300300+ dids: Vec<(Did<'static>, RepoState)>,
260301 resolver: &Resolver,
261302 host: &Host,
262303 token: &CancellationToken,
263263-) -> Vec<Did<'static>> {
304304+) -> Vec<(Did<'static>, RepoState)> {
264305 let host_str = host.to_string();
265306 let mut valid = Vec::with_capacity(dids.len());
266266- for did in dids {
307307+ for (did, account_state) in dids {
267308 let Some(r) = token.run(resolver.resolve(&did)).await else {
268309 break;
269310 };
270311 match r {
271312 Ok(resolved) if resolved.pds.host_str() == Some(host_str.as_str()) => {
272272- valid.push(did);
313313+ valid.push((did, account_state));
273314 }
274315 Ok(resolved) => {
275316 metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "pds_mismatch")
···296337297338/// Insert newly-seen DIDs into the repo table and persist the backfill cursor.
298339///
299299-/// Returns `(count_inserted, new_items)` — the caller pushes `new_items` into
300300-/// the in-memory discovery queue (async, with backpressure).
340340+/// Active accounts are returned for the caller to push into the discovery
341341+/// queue. Non-active accounts have their status written directly (no resync
342342+/// needed). When `authoritative` is false (relay backfill), non-active status
343343+/// is only written for accounts that have no existing Active record — we don't
344344+/// overwrite a locally-Active account with stale relay data.
345345+///
346346+/// Returns `(active_count, inactive_count, new_active_items)`.
301347fn persist_page(
302348 db: &DbRef,
303349 host: &Host,
304304- items: Vec<DidWithPds>,
350350+ items: Vec<(Did<'static>, Arc<Url>, RepoState)>,
305351 progress_cursor: String,
306306-) -> Result<(u64, Vec<DidWithPds>)> {
307307- let mut new_items: Vec<DidWithPds> = Vec::new();
308308- for (did, pds) in items {
309309- let newly_inserted = storage::repo::ensure_repo(db, &did)?;
310310- if newly_inserted {
311311- db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed);
312312- new_items.push((did, pds));
352352+ authoritative: bool,
353353+) -> Result<(u64, u64, Vec<DidWithPds>)> {
354354+ let mut new_active: Vec<DidWithPds> = Vec::new();
355355+ let mut inactive_count: u64 = 0;
356356+ for (did, pds, repo_state) in items {
357357+ if let Some(deactiveated_account_status) = repo_state.to_account_inactive() {
358358+ if write_inactive_status(
359359+ db,
360360+ &did,
361361+ deactiveated_account_status,
362362+ repo_state,
363363+ authoritative,
364364+ )? {
365365+ inactive_count += 1;
366366+ }
367367+ } else {
368368+ let newly_inserted = storage::repo::ensure_repo(db, &did)?;
369369+ if newly_inserted {
370370+ db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed);
371371+ new_active.push((did, pds));
372372+ }
313373 }
314374 }
315375 // Persist progress before advancing so a crash during the next
···322382 completed_at: None,
323383 },
324384 )?;
325325- Ok((new_items.len() as u64, new_items))
385385+ Ok((new_active.len() as u64, inactive_count, new_active))
386386+}
387387+388388+/// Write a non-active account status from a listRepos entry.
389389+///
390390+/// Returns `true` if the status was written, `false` if skipped.
391391+///
392392+/// When `authoritative` is true (deep crawl — PDS confirmed via identity
393393+/// resolution), the status is always written. When false (relay backfill),
394394+/// we only write if the existing local status is not Active, to avoid
395395+/// overwriting with stale data (e.g. an account that migrated off this PDS
396396+/// and shows as deactivated on the old host).
397397+fn write_inactive_status(
398398+ db: &DbRef,
399399+ did: &Did<'_>,
400400+ status: AccountStatus,
401401+ state: RepoState,
402402+ authoritative: bool,
403403+) -> Result<bool> {
404404+ let existing = storage::repo::get_status(db, did)?;
405405+ match existing {
406406+ Some(ref existing_status) if existing_status.is_active() && !authoritative => {
407407+ debug!(
408408+ did = %did,
409409+ listed_status = status.as_str(),
410410+ "skipping non-active status from non-authoritative source; local account is active"
411411+ );
412412+ return Ok(false);
413413+ }
414414+ Some(ref existing_status) if *existing_status == status => {
415415+ return Ok(false); // already matches
416416+ }
417417+ _ => {}
418418+ }
419419+ let info = RepoInfo {
420420+ state,
421421+ status: status.clone(),
422422+ error: None,
423423+ };
424424+ storage::repo::put_info(db, did, &info)?;
425425+ metrics::counter!(
426426+ "lightrail_backfill_inactive_total",
427427+ "status" => status.as_str()
428428+ )
429429+ .increment(1);
430430+ Ok(true)
326431}
+5-9
src/sync/firehose/account_event.rs
···1717 } else {
1818 use jacquard_api::com_atproto::sync::subscribe_repos::AccountStatus;
1919 match account.status.as_ref() {
2020+ Some(AccountStatus::Throttled) => storage::repo::AccountStatus::Throttled,
2021 Some(AccountStatus::Takendown) => storage::repo::AccountStatus::Takendown,
2122 Some(AccountStatus::Suspended) => storage::repo::AccountStatus::Suspended,
2223 Some(AccountStatus::Deactivated) => storage::repo::AccountStatus::Deactivated,
···2526 }
2627 };
27282828- let tombstone = matches!(
2929- new_status,
3030- storage::repo::AccountStatus::Takendown | storage::repo::AccountStatus::Deleted
3131- );
2929+ let tombstone = new_status == storage::repo::AccountStatus::Deleted;
32303331 // For active accounts: preserve the current repo state — a #sync event
3432 // will follow if the repo needs to be refreshed. For inactive: set the
···3836 .map(|(info, _)| info.state)
3937 .unwrap_or(storage::repo::RepoState::Active)
4038 } else {
4141- match &new_status {
4242- storage::repo::AccountStatus::Suspended => storage::repo::RepoState::Suspended,
4343- storage::repo::AccountStatus::Deactivated => storage::repo::RepoState::Deactivated,
4444- _ => storage::repo::RepoState::Takendown,
4545- }
3939+ new_status
4040+ .to_repo_inactive()
4141+ .unwrap_or(storage::repo::RepoState::Error)
4642 };
47434844 // Atomically: update repo state + remove index entries if tombstoned.