very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

[backfill] fix count tracking on resync

dawn 53d93fb5 310cfe5e

+245 -11
+9 -5
src/backfill/mod.rs
··· 628 628 let path = (collection.to_smolstr(), rkey.clone()); 629 629 let cid_obj = Cid::ipld(cid); 630 630 631 + *collection_counts.entry(path.0.clone()).or_default() += 1; 632 + 631 633 // check if this record already exists with same CID 632 634 let existing_cid = existing_cids.remove(&path); 633 635 let action = if let Some(existing_cid) = &existing_cid { ··· 667 669 added_blocks += 1; 668 670 if action == DbAction::Create { 669 671 delta += 1; 670 - *collection_counts.entry(path.0.clone()).or_default() += 1; 671 672 } 672 673 673 674 #[cfg(feature = "indexer_stream")] ··· 767 768 768 769 // add the counts 769 770 if !ephemeral { 770 - for (col, cnt) in collection_counts { 771 - db::set_record_count(&mut batch, &app_state.db, &did, &col, cnt); 772 - } 771 + db::replace_record_counts( 772 + &mut batch, 773 + &app_state.db, 774 + &did, 775 + collection_counts.iter().map(|(col, cnt)| (col.as_str(), *cnt)), 776 + )?; 773 777 } 774 778 775 779 batch.commit().into_diagnostic()?; ··· 808 812 trace!(ops = count, elapsed = %start.elapsed().as_secs_f32(), "did ops"); 809 813 810 814 // do the counts 811 - if records_cnt_delta > 0 { 815 + if records_cnt_delta != 0 { 812 816 app_state 813 817 .db 814 818 .update_count_async("records", records_cnt_delta)
+59
src/db/indexer.rs
··· 99 99 batch.insert(&db.counts, key, count.to_be_bytes()); 100 100 } 101 101 102 + pub fn replace_record_counts<'a>( 103 + batch: &mut OwnedWriteBatch, 104 + db: &Db, 105 + did: &Did<'_>, 106 + counts: impl IntoIterator<Item = (&'a str, u64)>, 107 + ) -> Result<()> { 108 + let prefix = keys::did_collection_prefix(did); 109 + for guard in db.counts.prefix(&prefix) { 110 + let key = guard.key().into_diagnostic()?; 111 + batch.remove(&db.counts, key); 112 + } 113 + 114 + for (collection, count) in counts { 115 + set_record_count(batch, db, did, collection, count); 116 + } 117 + 118 + Ok(()) 119 + } 120 + 102 121 pub fn update_record_count( 103 122 batch: &mut OwnedWriteBatch, 104 123 db: &Db, ··· 164 183 } 165 184 Ok(sources) 166 185 } 186 + 187 + #[cfg(test)] 188 + mod tests { 189 + use super::*; 190 + 191 + #[test] 192 + fn replace_record_counts_clears_stale_collections() -> Result<()> { 193 + let tmp = tempfile::tempdir().into_diagnostic()?; 194 + let cfg = crate::config::Config { 195 + database_path: tmp.path().to_path_buf(), 196 + ..Default::default() 197 + }; 198 + let db = Db::open(&cfg)?; 199 + let did = Did::new("did:plc:yk4q3id7id6p5z3bypvshc64").into_diagnostic()?; 200 + 201 + let mut batch = db.inner.batch(); 202 + set_record_count(&mut batch, &db, &did, "app.bsky.feed.post", 3); 203 + set_record_count(&mut batch, &db, &did, "app.bsky.feed.like", 2); 204 + batch.commit().into_diagnostic()?; 205 + 206 + let mut batch = db.inner.batch(); 207 + replace_record_counts( 208 + &mut batch, 209 + &db, 210 + &did, 211 + [("app.bsky.feed.like", 7), ("app.bsky.actor.profile", 1)], 212 + )?; 213 + batch.commit().into_diagnostic()?; 214 + 215 + assert_eq!(get_record_count(&db, &did, "app.bsky.feed.post")?, 0); 216 + assert_eq!(get_record_count(&db, &did, "app.bsky.feed.like")?, 7); 217 + assert_eq!(get_record_count(&db, &did, "app.bsky.actor.profile")?, 1); 218 + assert_eq!( 219 + db.counts.prefix(keys::did_collection_prefix(&did)).count(), 220 + 2 221 + ); 222 + 223 + Ok(()) 224 + } 225 + }
+1 -1
tests/by_collection.nu
··· 69 69 print "waiting for collection-index discovery pass..." 70 70 mut discovered = false 71 71 for i in 1..60 { 72 - let stats = (try { (http get $"($url)/stats?accurate=true").counts } catch { {} }) 72 + let stats = (try { (http get $"($url)/stats").counts } catch { {} }) 73 73 let repos = ($stats | get --optional repos | default 0 | into int) 74 74 print $"[($i)/60] repos: ($repos)" 75 75 if $repos >= ($known_dids | length) {
+1 -1
tests/common.nu
··· 174 174 export def wait-for-backfill [url: string] { 175 175 print "waiting for backfill to complete..." 176 176 for i in 1..120 { 177 - let stats = (http get $"($url)/stats?accurate=true").counts 177 + let stats = (http get $"($url)/stats").counts 178 178 let pending = ($stats.pending | into int) 179 179 let records = ($stats.records | into int) 180 180 let repos = ($stats.repos | into int)
+170
tests/count_tracking.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def record-data [text: string] { 5 + let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ") 6 + { 7 + "$type": "app.bsky.feed.post", 8 + text: $text, 9 + createdAt: $timestamp 10 + } 11 + } 12 + 13 + def api-count [url: string, did: string, collection: string] { 14 + (http get $"($url)/xrpc/systems.gaze.hydrant.countRecords?identifier=($did)&collection=($collection)").count | into int 15 + } 16 + 17 + def debug-count [debug_url: string, did: string, collection: string] { 18 + (http get $"($debug_url)/debug/count?did=($did)&collection=($collection)").count | into int 19 + } 20 + 21 + def stats-records [url: string] { 22 + (http get $"($url)/stats").counts.records | into int 23 + } 24 + 25 + def count-state [url: string, debug_url: string, did: string, collection: string] { 26 + { 27 + api: (api-count $url $did $collection), 28 + debug: (debug-count $debug_url $did $collection), 29 + records: (stats-records $url) 30 + } 31 + } 32 + 33 + def assert-count-state [label: string, state: record, expected_collection: int, expected_records: int] { 34 + print $"($label): api=($state.api), debug=($state.debug), stats.records=($state.records)" 35 + 36 + if $state.api != $state.debug { 37 + error make {msg: $"($label): countRecords mismatch. api=($state.api) debug=($state.debug)"} 38 + } 39 + 40 + if $state.api != $expected_collection { 41 + error make {msg: $"($label): expected collection count ($expected_collection), got ($state.api)"} 42 + } 43 + 44 + if $state.records != $expected_records { 45 + error make {msg: $"($label): expected stats.records ($expected_records), got ($state.records)"} 46 + } 47 + } 48 + 49 + def set-firehose [url: string, enabled: bool] { 50 + http patch -t application/json $"($url)/ingestion" { firehose: $enabled } | ignore 51 + let status = (http get $"($url)/ingestion") 52 + if $status.firehose != $enabled { 53 + error make {msg: $"expected firehose=($enabled), got ($status.firehose)"} 54 + } 55 + } 56 + 57 + def force-resync [url: string, did: string] { 58 + let resp = (http post -f -e -t application/json $"($url)/repos/resync" [{ did: $did }]) 59 + if $resp.status != 200 { 60 + error make {msg: $"repos/resync failed: ($resp.status) body=($resp.body)"} 61 + } 62 + } 63 + 64 + def main [] { 65 + let env_vars = load-env-file 66 + let did = ($env_vars | get --optional TEST_REPO) 67 + let password = ($env_vars | get --optional TEST_PASSWORD) 68 + 69 + if ($did | is-empty) or ($password | is-empty) { 70 + print "SKIP: TEST_REPO and TEST_PASSWORD not set in .env" 71 + exit 0 72 + } 73 + 74 + let pds_url = resolve-pds $did 75 + let collection = "app.bsky.feed.post" 76 + let port = resolve-test-port 3008 77 + let debug_port = resolve-test-debug-port ($port + 1) 78 + let url = $"http://localhost:($port)" 79 + let debug_url = $"http://localhost:($debug_port)" 80 + let db_path = (mktemp -d -t hydrant_count_tracking.XXXXXX) 81 + 82 + print $"testing count tracking for ($did)..." 83 + print $"database path: ($db_path)" 84 + 85 + let session = authenticate $pds_url $did $password 86 + let jwt = $session.accessJwt 87 + 88 + let seed = create-record $pds_url $jwt $did $collection (record-data "hydrant count seed") 89 + let seed_rkey = ($seed.uri | split row "/" | last) 90 + mut stale_rkey = "" 91 + mut instance = null 92 + mut success = false 93 + 94 + try { 95 + let binary = build-hydrant 96 + let relay = "wss://relay.fire.hose.cam" 97 + $instance = (with-env { HYDRANT_RELAY_HOSTS: $relay } { 98 + start-hydrant $binary $db_path $port 99 + }) 100 + 101 + if not (wait-for-api $url) { 102 + error make {msg: "api failed to start"} 103 + } 104 + 105 + print $"adding repo ($did) to tracking..." 106 + http put -t application/json $"($url)/repos" [{ did: $did }] 107 + 108 + if not (wait-for-backfill $url) { 109 + error make {msg: "initial backfill failed"} 110 + } 111 + 112 + let baseline = (count-state $url $debug_url $did $collection) 113 + if $baseline.api < 1 { 114 + error make {msg: $"expected at least one seeded record, got ($baseline.api)"} 115 + } 116 + assert-count-state "baseline" $baseline $baseline.api $baseline.records 117 + 118 + print "pausing firehose..." 119 + set-firehose $url false 120 + 121 + let stale = create-record $pds_url $jwt $did $collection (record-data "hydrant count stale create") 122 + $stale_rkey = ($stale.uri | split row "/" | last) 123 + sleep 2sec 124 + 125 + let stale_before_resync = (count-state $url $debug_url $did $collection) 126 + assert-count-state "stale before resync" $stale_before_resync $baseline.api $baseline.records 127 + 128 + print "forcing resync after stale create..." 129 + force-resync $url $did 130 + if not (wait-for-backfill $url) { 131 + error make {msg: "resync after create failed"} 132 + } 133 + 134 + let after_create = (count-state $url $debug_url $did $collection) 135 + assert-count-state "after create resync" $after_create ($baseline.api + 1) ($baseline.records + 1) 136 + 137 + delete-record $pds_url $jwt $did $collection $stale_rkey 138 + sleep 2sec 139 + 140 + let stale_before_delete_resync = (count-state $url $debug_url $did $collection) 141 + assert-count-state "stale before delete resync" $stale_before_delete_resync ($baseline.api + 1) ($baseline.records + 1) 142 + 143 + print "forcing resync after stale delete..." 144 + force-resync $url $did 145 + if not (wait-for-backfill $url) { 146 + error make {msg: "resync after delete failed"} 147 + } 148 + 149 + let after_delete = (count-state $url $debug_url $did $collection) 150 + assert-count-state "after delete resync" $after_delete $baseline.api $baseline.records 151 + 152 + $success = true 153 + } catch { |e| 154 + print $"test failed: ($e.msg)" 155 + } 156 + 157 + if ($instance | describe) != "nothing" { 158 + try { set-firehose $url true } 159 + try { kill --force $instance.pid } 160 + } 161 + 162 + if ($stale_rkey | is-not-empty) { 163 + try { delete-record $pds_url $jwt $did $collection $stale_rkey } 164 + } 165 + try { delete-record $pds_url $jwt $did $collection $seed_rkey } 166 + 167 + if not $success { 168 + exit 1 169 + } 170 + }
+2 -1
tests/run_all.nu
··· 48 48 # discover all test scripts, excluding infrastructure files 49 49 mut excluded = ["common", "mock_relay", "mock_pds", "run_all"] 50 50 if $skip_creds { 51 - $excluded = ($excluded | append ["authenticated_stream", "repo_sync_integrity"]) 51 + $excluded = ($excluded | append ["authenticated_stream", "count_tracking", "repo_sync_integrity"]) 52 52 } 53 53 let discovered = ( 54 54 ls tests/*.nu ··· 78 78 79 79 let groups = { 80 80 "authenticated_stream": "event_dependent", 81 + "count_tracking": "event_dependent", 81 82 "signal_filter": "event_dependent", 82 83 } 83 84 let grouped = $assigned | group-by {|t| $groups | get -o $t.name | default $t.name}
+1 -1
tests/stream.nu
··· 67 67 68 68 sleep 2sec 69 69 70 - let stats = (http get $"($url)/stats?accurate=true").counts 70 + let stats = (http get $"($url)/stats").counts 71 71 let events_count = ($stats.events | into int) 72 72 print $"total events in db: ($events_count)" 73 73
+1 -1
tests/throttling.nu
··· 62 62 63 63 # retry check for 30s 64 64 for i in 1..30 { 65 - let stats = (http get $"($url)/stats?accurate=true").counts 65 + let stats = (http get $"($url)/stats").counts 66 66 let pending = ($stats.pending | into int) 67 67 68 68 # we expect 5 repos from the mock, but max pending is 2.
+1 -1
tests/verify_crawler.nu
··· 67 67 68 68 # retry check for 30s 69 69 for i in 1..30 { 70 - let stats = (http get $"($url)/stats?accurate=true").counts 70 + let stats = (http get $"($url)/stats").counts 71 71 let pending = ($stats.pending | into int) 72 72 let repos = ($stats.repos | default 0 | into int) 73 73