lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

defend against repeated listRepos cursor

phil fad4608b 6df8317f

+38 -27
+8 -8
hacking.md
··· 87 87 - [x] swap in repo-stream 88 88 - [x] actually wire in the resync buffer (oops) 89 89 - [x] make sure we're doing the right thing on decode errors (seems we are, tungstenite closes connection) 90 - - [~] "deep crawl" mode for relays 90 + - [x] "deep crawl" mode for relays 91 91 - [x] listHosts -> listRepos on host instead of relying on relay listRepos 92 - - [ ] defensive loop-cursor handling 93 - - [ ] lenient pre-sync1.1 94 - - [ ] *don't* allow non-validating commits that look like sync1.1 95 - - [ ] rachet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one. 96 - - [ ] boooo we might need more handling for pre-sync1.1 repos if they don't include adjacent keys 92 + - [x] defensive loop-cursor handling 93 + - [x] lenient pre-sync1.1 94 + - [x] *don't* allow non-validating commits that look like sync1.1 95 + - [x] rachet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one. 96 + - [?] boooo we might need more handling for pre-sync1.1 repos if they don't include adjacent keys 97 97 - [ ] resync free hints from first phony getRecord 98 98 - [ ] short-circuit: tiny repos may incidentally return their entire CAR for getRecord 99 99 - [ ] estimate CAR size and `getRecord` if it's likely very small (bypass `describeRepo`) 100 - - [ ] add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 101 100 - [ ] commit CAR handling: generate a list of keys with gaps noted, to reliably detect missing adjacent keys 102 101 - [ ] account status convergeance: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale? 103 102 - [ ] split the keyspace: put the rbc/cbr indexes on a second keyspace with larger block size, expect hits on main keyspace ··· 106 105 107 106 108 107 very much still todo but i'm getting tired 109 - - [ ] multi-relay listener 108 + - [ ] add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 109 + - [ ] multi-relay subscriber 110 110 - [ ] special did:web behaviour to keep reusing a stale resolution on failure 111 111 - [ ] admin view of backfill state etc 112 112 - [ ] vanity stats for optimizations, like how many in-flight repos were saved from resync due to high-water-mark firehose cursor persistence
+30 -19
src/sync/backfill.rs
··· 43 43 /// as fatal and abort the walk immediately. Transient errors are retried up to 44 44 /// `MAX_PAGE_FAILURES` times before giving up. Returns `Ok(true)` when the 45 45 /// full walk completes, `Ok(false)` if cancelled or the host gives up. 46 + /// 47 + /// passing in `resolver` implies that this is a PDS, not a relay -- treat with 48 + /// lower trust: verify DIDs belong on that host and check for evil cursor. 46 49 pub async fn run( 47 50 host: Host, 48 51 db: DbRef, ··· 84 87 let page_len = dids.len(); 85 88 let now = unix_now(); 86 89 90 + // resolver is passed in for untrusted hosts so we can filter out dids that the host isn't authoritative over 91 + // TODO: if *many* dids are cleared (probably as a percentage?) we might mark this host sketchy 87 92 let dids = match &resolver { 88 93 Some(r) => validate_dids(dids, r, &host, &token).await, 89 94 None => dids, ··· 108 113 "backfill page" 109 114 ); 110 115 111 - match next_cursor { 112 - Some(c) => cursor = Some(c), 113 - None => { 114 - let db = db.clone(); 115 - let host_owned = host.clone(); 116 - tokio::task::spawn_blocking(move || { 117 - set( 118 - &db, 119 - &host_owned, 120 - &BackfillProgress { 121 - cursor: String::new(), 122 - completed_at: Some(now.to_string()), 123 - }, 124 - ) 125 - }) 126 - .await??; 127 - info!(host = %host, total_queued, "backfill complete"); 128 - return Ok(true); 129 - } 116 + if next_cursor != cursor { 117 + cursor = next_cursor; 118 + continue; 130 119 } 120 + 121 + if let Some(c) = next_cursor { 122 + warn!(host = %host, cursor = c, "evil cursor! (unchanged), bailing on this host."); 123 + // TODO: mark host as not trustworthy 124 + }; 125 + 126 + // persist cursor so we can restart 127 + let db = db.clone(); 128 + let host_owned = host.clone(); 129 + tokio::task::spawn_blocking(move || { 130 + set( 131 + &db, 132 + &host_owned, 133 + &BackfillProgress { 134 + cursor: "".to_string(), 135 + completed_at: Some(now.to_string()), 136 + }, 137 + ) 138 + }) 139 + .await??; 140 + info!(host = %host, total_queued, "backfill complete"); 141 + return Ok(true); 131 142 } 132 143 } 133 144