lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

check upstream for unexpected events from inactive accounts

eventually consistent babyy

phil 72027029 181ac457

+238 -34
+1 -1
hacking.md
··· 100 100 - [x] firehose websocket 101 101 - [-] ~~ping/pong (unless jacquard is already doing it):~~ seems like no but we can skip it 102 102 - [x] no-events-received timeout reconnect 103 + - [x] account status convergence: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale? 103 104 - [ ] resync short-circuit: tiny repos may actually return their entire CAR for getRecord 104 105 - [ ] commit CAR handling: generate a list of keys with gaps noted, to reliably detect missing adjacent keys 105 - - [ ] account status convergence: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale? 106 106 107 107 108 108 very much still todo but i'm getting tired
+3 -2
readme.md
··· 66 66 67 67 [`relay.fire.hose.cam`](https://relay.fire.hose.cam/) is one of [microcosm](https://www.microcosm.blue/)'s full-network relays. Lightrail works with a relay or PDS host upstream, or any other service that implements at least: 68 68 69 - - `com.atproto.sync.subscribeRepos` and 70 - - `com.atproto.sync.listRepos` 69 + - `com.atproto.sync.subscribeRepos`, 70 + - `com.atproto.sync.listRepos`, and 71 + - `com.atproto.sync.getRepoStatus` 71 72 72 73 73 74 ### Key configs
+2
src/main.rs
··· 139 139 let db = db.clone(); 140 140 let host = subscribe_host.clone(); 141 141 let resolver = resolver.clone(); 142 + let client = client.clone(); 142 143 async move { 143 144 let mut sub = firehose::Subscriber::new( 144 145 host, ··· 146 147 resolver, 147 148 args.max_firehose_workers, 148 149 Duration::from_secs(args.cursor_save_interval_secs), 150 + client, 149 151 ); 150 152 sub.run(token) 151 153 .await
+60 -1
src/sync/firehose/commit_event.rs
··· 44 44 seq: i64, 45 45 resolver: &Resolver, 46 46 db: &DbRef, 47 + client: &crate::http::ThrottledClient, 47 48 ) -> crate::error::Result<()> { 48 49 let did = commit.repo.clone(); 49 50 ··· 67 68 let (info, prev, pds_mode) = match step2 { 68 69 Step2Result::Proceed(info, prev, mode) => (info, prev, mode), 69 70 Step2Result::Drop => return Ok(()), 71 + Step2Result::InactiveAccount(info, _prev) => { 72 + // Our local record says inactive, but we may have missed a 73 + // reactivation #account event. Check upstream before dropping. 74 + if !validate::upstream_says_active(&pds_host, &did, client).await { 75 + metrics::counter!("lightrail_event_dropped_total", 76 + "event_type" => "commit", "reason" => "account_inactive") 77 + .increment(1); 78 + debug!(did = %did, status = info.status.as_str(), 79 + "commit dropped: account not active (confirmed upstream)"); 80 + return Ok(()); 81 + } 82 + // Upstream says active — persist the reactivation and re-run 83 + // the step-2 checks with the updated status. 84 + metrics::counter!("lightrail_account_reactivated_total", 85 + "trigger" => "commit") 86 + .increment(1); 87 + info!(did = %did, "account reactivated via upstream check (triggered by #commit)"); 88 + let db_ra = db.clone(); 89 + let did_ra = did.clone(); 90 + let rev_ra = commit.rev.clone(); 91 + let pds_host_ra = pds_host.clone(); 92 + let step2_retry = tokio::task::spawn_blocking(move || { 93 + reactivate_and_recheck(&db_ra, &did_ra, &rev_ra, pds_host_ra) 94 + }) 95 + .await??; 96 + match step2_retry { 97 + Step2Result::Proceed(i, p, m) => (i, p, m), 98 + _ => return Ok(()), 99 + } 100 + } 70 101 Step2Result::Buffer => { 71 102 // Repo is mid-resync. Serialize the commit and buffer it so it can 72 103 // be replayed after the resync fetch completes. ··· 318 349 enum Step2Result { 319 350 /// All good — proceed with signature verification and the rest of the pipeline. 
320 351 Proceed(RepoInfo, Option<RepoPrev>, Sync11Mode), 321 - /// Drop this event (inactive account, desynchronized state, stale rev, etc.). 352 + /// Drop this event (desynchronized state, stale rev, future rev, etc.). 322 353 Drop, 323 354 /// Repo is mid-resync. Caller should buffer the commit for replay. 324 355 Buffer, 356 + /// Account is locally inactive. Caller may check upstream before dropping. 357 + InactiveAccount(RepoInfo, Option<RepoPrev>), 325 358 } 326 359 327 360 /// Step 2: load the repo state and decide how to handle this commit. ··· 375 408 if info.state == RepoState::Resyncing { 376 409 return Ok(Step2Result::Buffer); 377 410 } 411 + // Separate inactive check so the caller can probe upstream before dropping. 412 + if !info.status.is_active() { 413 + return Ok(Step2Result::InactiveAccount(info, prev)); 414 + } 378 415 if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did) { 379 416 return Ok(Step2Result::Drop); 380 417 } ··· 383 420 None => Sync11Mode::Lenient, 384 421 }; 385 422 Ok(Step2Result::Proceed(info, prev, mode)) 423 + } 424 + 425 + /// Write `AccountStatus::Active` to storage for `did`, then re-run the 426 + /// step-2 checks so the commit pipeline can continue as normal. 427 + fn reactivate_and_recheck( 428 + db: &DbRef, 429 + did: &Did<'static>, 430 + rev: &Tid, 431 + pds_host: Option<Host>, 432 + ) -> crate::error::Result<Step2Result> { 433 + let Some((mut info, _)) = storage::repo::get(db, did)? else { 434 + return Ok(Step2Result::Drop); 435 + }; 436 + info.status = AccountStatus::Active; 437 + let mut batch = db.database.batch(); 438 + storage::repo::put_info_into(&mut batch, db, did, &info); 439 + batch 440 + .commit() 441 + .map_err(Into::<crate::storage::StorageError>::into)?; 442 + // Re-run the full step-2 check; the updated Active status means 443 + // InactiveAccount will not be returned again. 
444 + check_step2_blocking(db, did.clone(), rev, pds_host) 386 445 } 387 446 388 447 /// Perform the storage-backed validation steps (6–9) and, if all pass,
+28 -9
src/sync/firehose/event_dispatcher.rs
··· 50 50 max_concurrent: usize, 51 51 resolver: Arc<crate::identity::Resolver>, 52 52 db: DbRef, 53 + client: crate::http::ThrottledClient, 53 54 } 54 55 55 56 struct PendingCommit { ··· 101 102 // --------------------------------------------------------------------------- 102 103 103 104 impl CommitDispatcher { 104 - pub fn new(resolver: Arc<crate::identity::Resolver>, db: DbRef, max_concurrent: usize) -> Self { 105 + pub fn new( 106 + resolver: Arc<crate::identity::Resolver>, 107 + db: DbRef, 108 + max_concurrent: usize, 109 + client: crate::http::ThrottledClient, 110 + ) -> Self { 105 111 Self { 106 112 queues: HashMap::new(), 107 113 busy: HashSet::new(), ··· 111 117 max_concurrent, 112 118 resolver, 113 119 db, 120 + client, 114 121 } 115 122 } 116 123 ··· 202 209 203 210 let resolver = self.resolver.clone(); 204 211 let db = self.db.clone(); 212 + let client = self.client.clone(); 205 213 let did_for_result = did.clone(); 206 214 let seq = pending.seq(); 207 215 let handle = self.workers.spawn(async move { 208 216 match pending { 209 217 PendingWork::Commit(p) => { 210 - run_commit_event_worker(p.commit, p.seq, did_for_result, resolver, db).await 218 + run_commit_event_worker( 219 + p.commit, 220 + p.seq, 221 + did_for_result, 222 + resolver, 223 + db, 224 + client, 225 + ) 226 + .await 211 227 } 212 228 PendingWork::Sync(p) => { 213 - run_sync_event_worker(p.sync, p.seq, did_for_result, resolver, db).await 229 + run_sync_event_worker(p.sync, p.seq, did_for_result, resolver, db, client) 230 + .await 214 231 } 215 232 PendingWork::Account(p) => { 216 233 run_account_event_worker(p.account, p.seq, did_for_result, db).await ··· 328 345 did: Did<'static>, 329 346 resolver: Arc<crate::identity::Resolver>, 330 347 db: DbRef, 348 + client: crate::http::ThrottledClient, 331 349 ) -> CommitWorkerResult { 332 - let outcome = super::commit_event::process_commit_event(commit, seq, &resolver, &db) 350 + let outcome = super::commit_event::process_commit_event(commit, seq, 
&resolver, &db, &client) 333 351 .await 334 352 .map_err(|e| e.to_string()); 335 353 CommitWorkerResult { did, seq, outcome } ··· 341 359 did: Did<'static>, 342 360 resolver: Arc<crate::identity::Resolver>, 343 361 db: DbRef, 362 + client: crate::http::ThrottledClient, 344 363 ) -> CommitWorkerResult { 345 - let outcome = super::sync_event::process_sync_event(sync, &resolver, &db) 364 + let outcome = super::sync_event::process_sync_event(sync, &resolver, &db, &client) 346 365 .await 347 366 .map_err(|e| e.to_string()); 348 367 CommitWorkerResult { did, seq, outcome } ··· 434 453 async fn commits_for_same_did_are_sequential() { 435 454 let db = crate::storage::open_temporary().unwrap(); 436 455 let resolver = make_resolver(); 437 - let mut d = CommitDispatcher::new(resolver, db, 4); 456 + let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 438 457 439 458 let did: Did<'static> = Did::new_owned("did:plc:testsequential").unwrap(); 440 459 let c1 = { ··· 474 493 async fn commits_for_different_dids_run_in_parallel() { 475 494 let db = crate::storage::open_temporary().unwrap(); 476 495 let resolver = make_resolver(); 477 - let mut d = CommitDispatcher::new(resolver, db, 4); 496 + let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 478 497 479 498 let did_a: Did<'static> = Did::new_owned("did:plc:testa").unwrap(); 480 499 let did_b: Did<'static> = Did::new_owned("did:plc:testb").unwrap(); ··· 491 510 async fn watermark_advances_after_completion() { 492 511 let db = crate::storage::open_temporary().unwrap(); 493 512 let resolver = make_resolver(); 494 - let mut d = CommitDispatcher::new(resolver, db, 4); 513 + let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 495 514 496 515 let did_a: Did<'static> = Did::new_owned("did:plc:testwma").unwrap(); 497 516 let did_b: Did<'static> = Did::new_owned("did:plc:testwmb").unwrap(); ··· 512 531 async fn stalled_seq_evicted_from_watermark() { 513 
532 let db = crate::storage::open_temporary().unwrap(); 514 533 let resolver = make_resolver(); 515 - let mut d = CommitDispatcher::new(resolver, db, 4); 534 + let mut d = CommitDispatcher::new(resolver, db, 4, crate::http::build_client()); 516 535 517 536 // Manually inject an old entry into outstanding without spawning a worker. 518 537 let stale_instant = Instant::now() - std::time::Duration::from_secs(STALL_EVICT_SECS + 1);
+9 -2
src/sync/firehose/mod.rs
··· 55 55 resolver: Arc<crate::identity::Resolver>, 56 56 max_workers: usize, 57 57 cursor_save_interval: Duration, 58 + client: crate::http::ThrottledClient, 58 59 } 59 60 60 61 impl Subscriber { ··· 64 65 resolver: Arc<crate::identity::Resolver>, 65 66 max_workers: usize, 66 67 cursor_save_interval: Duration, 68 + client: crate::http::ThrottledClient, 67 69 ) -> Self { 68 70 Self { 69 71 host, ··· 71 73 resolver, 72 74 max_workers, 73 75 cursor_save_interval, 76 + client, 74 77 } 75 78 } 76 79 ··· 88 91 89 92 // The dispatcher survives reconnects so in-flight workers keep running 90 93 // and the watermark doesn't regress. 91 - let mut dispatcher = 92 - CommitDispatcher::new(self.resolver.clone(), self.db.clone(), self.max_workers); 94 + let mut dispatcher = CommitDispatcher::new( 95 + self.resolver.clone(), 96 + self.db.clone(), 97 + self.max_workers, 98 + self.client.clone(), 99 + ); 93 100 let mut last_seq: i64 = 0; 94 101 let mut cursor_tick = Instant::now(); 95 102
+68 -14
src/sync/firehose/sync_event.rs
··· 26 26 use crate::storage::{ 27 27 self, DbRef, 28 28 pds_host::{self, Sync11Mode}, 29 - repo::{RepoInfo, RepoState}, 29 + repo::{AccountStatus, RepoInfo, RepoState}, 30 30 resync_queue::ResyncItem, 31 31 }; 32 32 use jacquard_common::url::Host; ··· 35 35 // Public entry point 36 36 // --------------------------------------------------------------------------- 37 37 38 + /// Outcome of [`check_and_enqueue`]. 39 + enum SyncCheckOutcome { 40 + /// Event handled (either processed or dropped for non-status reasons). 41 + Done, 42 + /// Account is locally inactive. Caller may check upstream before dropping. 43 + InactiveAccount, 44 + } 45 + 38 46 pub(super) async fn process_sync_event( 39 47 sync: Box<Sync<'static>>, 40 48 resolver: &Resolver, 41 49 db: &DbRef, 50 + client: &crate::http::ThrottledClient, 42 51 ) -> crate::error::Result<()> { 43 52 let did = sync.did.clone(); 44 53 ··· 72 81 }; 73 82 74 83 let pds_host: Option<Host> = resolved.pds.host().map(|h| h.to_owned()); 84 + let blocks = sync.blocks.to_vec(); 75 85 76 86 // ── Steps 2, 6, 7: Blocking storage checks + mark desync + enqueue ─────── 77 - let db = db.clone(); 78 - let blocks = sync.blocks.to_vec(); 79 - tokio::task::spawn_blocking(move || { 80 - check_and_enqueue(&db, did, rev, mst_root, blocks, pds_host) 81 - }) 82 - .await??; 87 + let outcome = { 88 + let db = db.clone(); 89 + let did = did.clone(); 90 + let rev = rev.clone(); 91 + let mst_root = mst_root.clone(); 92 + let blocks = blocks.clone(); 93 + let pds_host = pds_host.clone(); 94 + tokio::task::spawn_blocking(move || { 95 + check_and_enqueue(&db, did, rev, mst_root, blocks, pds_host) 96 + }) 97 + .await?? 98 + }; 99 + 100 + if let SyncCheckOutcome::InactiveAccount = outcome { 101 + // Our local record says inactive, but we may have missed a 102 + // reactivation #account event. Check upstream before dropping. 
103 + if !validate::upstream_says_active(&pds_host, &did, client).await { 104 + return Ok(()); 105 + } 106 + // Upstream says active — persist the reactivation and re-run 107 + // check_and_enqueue with the updated status. 108 + metrics::counter!("lightrail_account_reactivated_total", 109 + "trigger" => "sync") 110 + .increment(1); 111 + info!(did = %did, "account reactivated via upstream check (triggered by #sync)"); 112 + let db_owned = db.clone(); 113 + tokio::task::spawn_blocking(move || { 114 + let Some((mut repo_info, _)) = storage::repo::get(&db_owned, &did)? else { 115 + return Ok(()); 116 + }; 117 + repo_info.status = AccountStatus::Active; 118 + let mut batch = db_owned.database.batch(); 119 + storage::repo::put_info_into(&mut batch, &db_owned, &did, &repo_info); 120 + batch 121 + .commit() 122 + .map_err(Into::<crate::storage::StorageError>::into)?; 123 + check_and_enqueue(&db_owned, did, rev, mst_root, blocks, pds_host).map(|_| ()) 124 + }) 125 + .await??; 126 + } 83 127 84 128 Ok(()) 85 129 } ··· 126 170 // Storage checks (blocking) 127 171 // --------------------------------------------------------------------------- 128 172 129 - /// Perform the storage-backed validation steps (2, 6, 7) 173 + /// Perform the storage-backed validation steps (2, 6, 7). 174 + /// 175 + /// Returns [`SyncCheckOutcome::InactiveAccount`] when the local record says 176 + /// the account is inactive, so the async caller can probe upstream and 177 + /// optionally retry. All other outcomes (processed or dropped) map to 178 + /// [`SyncCheckOutcome::Done`]. 
130 179 fn check_and_enqueue( 131 180 db: &DbRef, 132 181 did: Did<'static>, ··· 134 183 mst_root: Vec<u8>, 135 184 blocks: Vec<u8>, 136 185 pds_host: Option<Host>, 137 - ) -> crate::error::Result<()> { 186 + ) -> crate::error::Result<SyncCheckOutcome> { 138 187 // A #sync event is unambiguously sync1.1 — upgrade the PDS mode eagerly, 139 188 // before any drop checks, since the presence of a valid signed event is 140 189 // enough to confirm the PDS supports sync1.1. ··· 151 200 152 201 let (info, prev) = match storage::repo::get(db, &did)? { 153 202 Some(r) => r, 154 - None => return Ok(()), // repo not indexed by us; drop silently 203 + None => return Ok(SyncCheckOutcome::Done), // repo not indexed by us; drop silently 155 204 }; 156 205 157 206 // If the sync event's commit exactly matches our stored prev_data, the repo ··· 164 213 { 165 214 metrics::counter!("lightrail_resync_avoided_total", "reason" => "no_op").increment(1); 166 215 debug!(did = %did, "sync dropped: commit state unchanged (no-op, likely post-migration)"); 167 - return Ok(()); 216 + return Ok(SyncCheckOutcome::Done); 168 217 } 169 218 170 - // Steps 2, 6, 7: shared drop checks. 219 + // Separate inactive check so the caller can probe upstream before dropping. 220 + if !info.status.is_active() { 221 + return Ok(SyncCheckOutcome::InactiveAccount); 222 + } 223 + 224 + // Steps 6, 7: shared drop checks (active status already verified above). 171 225 if validate::should_drop(&info, prev.as_ref(), &rev, "sync", &did) { 172 - return Ok(()); 226 + return Ok(SyncCheckOutcome::Done); 173 227 } 174 228 175 229 // All checks passed — mark desync and enqueue resync. ··· 198 252 batch 199 253 .commit() 200 254 .map_err(Into::<crate::storage::StorageError>::into)?; 201 - Ok(()) 255 + Ok(SyncCheckOutcome::Done) 202 256 }
+55 -1
src/sync/firehose/validate.rs
··· 15 15 use std::{fmt, sync::Arc}; 16 16 17 17 use jacquard_common::types::{crypto::PublicKey, string::Did, tid::Tid}; 18 - use tracing::{debug, warn}; 18 + use jacquard_common::url::Host; 19 + use tracing::{debug, info, warn}; 19 20 20 21 use crate::identity::{CachedIdentity, Resolver}; 21 22 use crate::storage::repo::{RepoInfo, RepoPrev, RepoState}; ··· 153 154 } 154 155 155 156 Ok(repo_commit) 157 + } 158 + 159 + // --------------------------------------------------------------------------- 160 + // Account reactivation 161 + // --------------------------------------------------------------------------- 162 + 163 + /// Query `com.atproto.sync.getRepoStatus` on the account's PDS. 164 + /// 165 + /// Returns `true` if the PDS reports the account as active. 166 + /// Returns `false` if the account is inactive, the host is unknown, the 167 + /// request fails, or the response cannot be parsed — treating all ambiguity 168 + /// conservatively as "still inactive". 169 + pub(super) async fn upstream_says_active( 170 + pds_host: &Option<Host>, 171 + did: &Did<'static>, 172 + client: &crate::http::ThrottledClient, 173 + ) -> bool { 174 + use jacquard_api::com_atproto::sync::get_repo_status::GetRepoStatus; 175 + use jacquard_common::xrpc::XrpcExt; 176 + 177 + let Some(host) = pds_host else { 178 + return false; 179 + }; 180 + let Ok(base) = format!("https://{host}").parse::<jacquard_common::url::Url>() else { 181 + return false; 182 + }; 183 + 184 + let resp = match client 185 + .xrpc(base) 186 + .send(&GetRepoStatus { did: did.clone() }) 187 + .await 188 + { 189 + Ok(r) => r, 190 + Err(e) => { 191 + debug!(did = %did, error = %e, 192 + "upstream getRepoStatus request failed; treating as inactive"); 193 + return false; 194 + } 195 + }; 196 + 197 + match resp.parse() { 198 + Ok(output) => { 199 + if output.active { 200 + info!(did = %did, "upstream confirms account active"); 201 + } 202 + output.active 203 + } 204 + Err(e) => { 205 + debug!(did = %did, error = %e, 206 + 
"upstream getRepoStatus parse failed; treating as inactive"); 207 + false 208 + } 209 + } 156 210 } 157 211 158 212 // ---------------------------------------------------------------------------
+12 -4
src/sync/resync/dispatcher.rs
··· 205 205 } 206 206 } 207 207 Ok(outcome) => { 208 - if let Err(e) = 209 - handle_completion(did.clone(), outcome, db.clone(), resolver.clone()).await 208 + if let Err(e) = handle_completion( 209 + did.clone(), 210 + outcome, 211 + db.clone(), 212 + resolver.clone(), 213 + client.clone(), 214 + ) 215 + .await 210 216 { 211 217 error!(error = %e, did = %did, "error handling worker completion"); 212 218 } ··· 306 312 outcome: WorkerOutcome, 307 313 db: DbRef, 308 314 resolver: std::sync::Arc<crate::identity::Resolver>, 315 + client: crate::http::ThrottledClient, 309 316 ) -> Result<()> { 310 317 match outcome { 311 318 WorkerOutcome::Success => { ··· 314 321 // Transition to Active first so buffered commits see the correct 315 322 // state when they're replayed through the normal pipeline. 316 323 transition_state(db.clone(), did.clone(), RepoState::Active, None).await?; 317 - replay_buffered_commits(did, db, &resolver).await?; 324 + replay_buffered_commits(did, db, &resolver, &client).await?; 318 325 } 319 326 WorkerOutcome::Retry { error, retry_count } => { 320 327 metrics::counter!("lightrail_resync_completed_total", "outcome" => "retry") ··· 381 388 did: Did<'static>, 382 389 db: DbRef, 383 390 resolver: &crate::identity::Resolver, 391 + client: &crate::http::ThrottledClient, 384 392 ) -> Result<()> { 385 393 let did_scan = did.clone(); 386 394 let db_scan = db.clone(); ··· 421 429 }; 422 430 423 431 if let Err(e) = crate::sync::firehose::commit_event::process_commit_event( 424 - commit, seq as i64, resolver, &db, 432 + commit, seq as i64, resolver, &db, client, 425 433 ) 426 434 .await 427 435 {