lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

sync1.1 strictness ratchet

phil 9f309918 93c0ce83

+80 -24
+6 -1
src/main.rs
··· 86 86 deep_crawl: bool, 87 87 88 88 /// Max concurrent per-PDS listRepos workers during deep crawl. 89 - #[arg(long, env = "LIGHTRAIL_MAX_DEEP_CRAWL_WORKERS", requires("deep_crawl"), default_value_t = 4)] 89 + #[arg( 90 + long, 91 + env = "LIGHTRAIL_MAX_DEEP_CRAWL_WORKERS", 92 + requires("deep_crawl"), 93 + default_value_t = 4 94 + )] 90 95 max_deep_crawl_workers: usize, 91 96 } 92 97
+1 -1
src/server/list_repos.rs
··· 60 60 /// scan for `limit` repos; if the scan yields a next DID, it is returned as 61 61 /// the cursor for the following page. 62 62 /// 63 - /// Repos without a chain tip (not yet synced, so no `head`/`rev` available) 63 + /// Repos without prev_data (not yet synced, so no `head`/`rev` available) 64 64 /// are omitted from the response, but still count against the scan window for 65 65 /// cursor advancement. 66 66 ///
+1 -1
src/storage/pds_host.rs
··· 36 36 } 37 37 38 38 impl Sync11Mode { 39 - fn as_str(&self) -> &'static str { 39 + pub fn as_str(&self) -> &'static str { 40 40 match self { 41 41 Sync11Mode::Lenient => "lenient", 42 42 Sync11Mode::Strict => "strict",
+47 -17
src/sync/firehose/commit_event.rs
··· 30 30 use crate::identity::Resolver; 31 31 use crate::storage::{ 32 32 self, DbRef, 33 + pds_host::{self, Sync11Mode}, 33 34 repo::{AccountStatus, RepoInfo, RepoPrev, RepoState}, 34 35 }; 36 + use jacquard_common::url::Host; 35 37 36 38 // --------------------------------------------------------------------------- 37 39 // Public entry point ··· 52 54 return Ok(()); 53 55 }; 54 56 55 - // ── Step 2: Account status + desync state ──────────────────────────────── 57 + let pds_host: Option<Host> = resolved.pds.host().map(|h| h.to_owned()); 58 + 59 + // ── Step 2: Account status + desync state + PDS mode ──────────────────── 56 60 let rev = commit.rev.clone(); 57 61 let db2 = db.clone(); 58 62 let did2 = did.clone(); 63 + let pds_host2 = pds_host.clone(); 59 64 let step2 = 60 - tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev)).await??; 61 - let (info, prev) = match step2 { 62 - Step2Result::Proceed(info, prev) => (info, prev), 65 + tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev, pds_host2)) 66 + .await??; 67 + let (info, prev, pds_mode) = match step2 { 68 + Step2Result::Proceed(info, prev, mode) => (info, prev, mode), 63 69 Step2Result::Drop => return Ok(()), 64 70 Step2Result::Buffer => { 65 71 // Repo is mid-resync. Serialize the commit and buffer it so it can ··· 121 127 let new_mst = Mst::load(Arc::clone(&storage), mst_root_cid, None); 122 128 123 129 // ── Step 5: Inductive proof ─────────────────────────────────────────────── 124 - if !verify_inductive_proof(new_mst, &commit.ops, commit.prev_data.as_ref()).await? { 130 + let strict = pds_mode == Sync11Mode::Strict; 131 + if !verify_inductive_proof(new_mst, &commit.ops, commit.prev_data.as_ref(), strict).await? { 125 132 metrics::counter!("lightrail_commit_dropped_total", "reason" => "proof_failed") 126 133 .increment(1); 127 134 debug!(did = %did, "commit dropped: inductive proof failed"); ··· 156 163 new_mst_root_bytes, 157 164 born, 158 165 died, 166 + pds_host, 167 + pds_mode, 159 168 ) 160 169 }) 161 170 .await??; ··· 234 243 mut mst: Mst<MemoryBlockStore>, 235 244 ops: &[RepoOp<'_>], 236 245 prev_data: Option<&CidLink<'_>>, 246 + strict: bool, 237 247 ) -> crate::error::Result<bool> { 238 248 // Without prevData we cannot check the final root. 239 249 let Some(prev_data_link) = prev_data else { 240 - // TODO: lenient handling for pre-sync1.1 commits (no prevData) 241 - return Ok(true); 250 + // Strict PDSes must always supply prevData. 251 + return Ok(!strict); 242 252 }; 243 253 244 254 let expected_prev_root = match prev_data_link.to_ipld() { 245 255 Ok(cid) => cid, 246 256 Err(_) => { 247 - // TODO: lenient handling for malformed prevData 248 - return Ok(true); 257 + // Malformed prevData CID — strict PDSes should never send this. 258 + return Ok(!strict); 249 259 } 250 260 }; 251 261 ··· 253 263 // key's path from root is available in the block store. 254 264 for op in ops.iter().rev() { 255 265 let Some(verified_op) = convert_op(op) else { 256 - // An op is missing required CIDs — pre-sync1.1 commits may omit 257 - // `prev` on Update/Delete ops. 258 - // TODO: lenient handling for pre-sync1.1 commits 259 - return Ok(true); 266 + // An op is missing required CIDs — strict PDSes must always include them. 267 + return Ok(!strict); 260 268 }; 261 269 262 270 let ok = mst ··· 309 317 /// Outcome of the step-2 storage check. 310 318 enum Step2Result { 311 319 /// All good — proceed with signature verification and the rest of the pipeline. 312 - Proceed(RepoInfo, Option<RepoPrev>), 320 + Proceed(RepoInfo, Option<RepoPrev>, Sync11Mode), 313 321 /// Drop this event (inactive account, desynchronized state, stale rev, etc.). 314 322 Drop, 315 323 /// Repo is mid-resync. Caller should buffer the commit for replay. ··· 329 337 db: &DbRef, 330 338 did: Did<'static>, 331 339 rev: &Tid, 340 + pds_host: Option<Host>, 332 341 ) -> crate::error::Result<Step2Result> { 333 342 let Some((info, prev)) = storage::repo::get(db, &did)? else { 334 343 // Unknown repo — create an entry and enqueue for initial fetch so that ··· 369 378 if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did) { 370 379 return Ok(Step2Result::Drop); 371 380 } 372 - Ok(Step2Result::Proceed(info, prev)) 381 + let mode = match pds_host { 382 + Some(ref host) => pds_host::get(db, host)?.sync11_mode, 383 + None => Sync11Mode::Lenient, 384 + }; 385 + Ok(Step2Result::Proceed(info, prev, mode)) 373 386 } 374 387 375 388 /// Perform the storage-backed validation steps (6–9) and, if all pass, ··· 387 400 new_mst_root_bytes: Vec<u8>, 388 401 born: Vec<Nsid<'static>>, 389 402 died: Vec<Nsid<'static>>, 403 + pds_host: Option<Host>, 404 + current_mode: Sync11Mode, 390 405 ) -> crate::error::Result<()> { 391 406 if let Some(prev) = &prev { 392 407 // Step 8: `since` must match repo_prev's rev (the previous rev in the chain). ··· 432 447 info!(did = %did, collections = names, "collection death"); 433 448 } 434 449 435 - // All checks passed — atomically update the chain tip and the collection 450 + // All checks passed — atomically update the prev_data and the collection 436 451 // index (born → insert, died → remove). Also record each born collection 437 452 // in the global collection list (blind overwrite, never deleted). 438 453 let n_born = born.len() as u64; ··· 469 484 for coll in died { 470 485 storage::collection_index::remove_into(&mut batch, db, &did, coll); 471 486 } 487 + 488 + // Upgrade PDS to strict on first prevData-bearing commit — atomically with 489 + // the prev_data and collection index writes above. 490 + if incoming_prev_data.is_some() 491 + && current_mode == Sync11Mode::Lenient 492 + && let Some(ref host) = pds_host 493 + { 494 + let mut pds_info = pds_host::get(db, host)?; 495 + pds_info.sync11_mode = Sync11Mode::Strict; 496 + pds_host::put_into(&mut batch, db, host, &pds_info); 497 + metrics::counter!("lightrail_pds_mode_upgraded_total", "trigger" => "commit").increment(1); 498 + info!(pds = %host, "PDS upgraded to strict sync1.1 mode"); 499 + } 500 + 472 501 batch 473 502 .commit() 474 503 .map_err(Into::<crate::storage::StorageError>::into)?; ··· 481 510 metrics::counter!("lightrail_collection_deaths_total", "source" => "firehose") 482 511 .increment(n_died); 483 512 } 484 - metrics::counter!("lightrail_commits_indexed_total").increment(1); 513 + metrics::counter!("lightrail_commits_indexed_total", "mode" => current_mode.as_str()) 514 + .increment(1); 485 515 486 516 Ok(()) 487 517 }
+25 -4
src/sync/firehose/sync_event.rs
··· 19 19 20 20 use jacquard_api::com_atproto::sync::subscribe_repos::Sync; 21 21 use jacquard_common::types::{string::Did, tid::Tid}; 22 - use tracing::debug; 22 + use tracing::{debug, info}; 23 23 24 24 use super::validate::{self, CarDrop}; 25 25 use crate::identity::Resolver; 26 26 use crate::storage::{ 27 27 self, DbRef, 28 + pds_host::{self, Sync11Mode}, 28 29 repo::{RepoInfo, RepoState}, 29 30 resync_queue::ResyncItem, 30 31 }; 32 + use jacquard_common::url::Host; 31 33 32 34 // --------------------------------------------------------------------------- 33 35 // Public entry point ··· 69 71 } 70 72 }; 71 73 74 + let pds_host: Option<Host> = resolved.pds.host().map(|h| h.to_owned()); 75 + 72 76 // ── Steps 2, 6, 7: Blocking storage checks + mark desync + enqueue ─────── 73 77 let db = db.clone(); 74 78 let blocks = sync.blocks.to_vec(); 75 - tokio::task::spawn_blocking(move || check_and_enqueue(&db, did, rev, mst_root, blocks)) 76 - .await??; 79 + tokio::task::spawn_blocking(move || { 80 + check_and_enqueue(&db, did, rev, mst_root, blocks, pds_host) 81 + }) 82 + .await??; 77 83 78 84 Ok(()) 79 85 } ··· 127 133 rev: Tid, 128 134 mst_root: Vec<u8>, 129 135 blocks: Vec<u8>, 136 + pds_host: Option<Host>, 130 137 ) -> crate::error::Result<()> { 138 + // A #sync event is unambiguously sync1.1 — upgrade the PDS mode eagerly, 139 + // before any drop checks, since the presence of a valid signed event is 140 + // enough to confirm the PDS supports sync1.1. 141 + if let Some(ref host) = pds_host { 142 + let mut pds_info = pds_host::get(db, host)?; 143 + if pds_info.sync11_mode == Sync11Mode::Lenient { 144 + pds_info.sync11_mode = Sync11Mode::Strict; 145 + pds_host::put(db, host, &pds_info)?; 146 + metrics::counter!("lightrail_pds_mode_upgraded_total", "trigger" => "sync") 147 + .increment(1); 148 + info!(pds = %host, "PDS upgraded to strict sync1.1 mode via #sync event"); 149 + } 150 + } 151 + 131 152 let (info, prev) = match storage::repo::get(db, &did)? { 132 153 Some(r) => r, 133 154 None => return Ok(()), // repo not indexed by us; drop silently 134 155 }; 135 156 136 - // If the sync event's commit exactly matches our stored chain tip, the repo 157 + // If the sync event's commit exactly matches our stored prev_data, the repo 137 158 // state hasn't changed — this is a no-op re-announcement (e.g. a PDS 138 159 // migration where the relay didn't miss any commits) and there is nothing 139 160 // to resync.