very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 170 lines 6.0 kB view raw
1#!/usr/bin/env nu 2use common.nu * 3 4def record-data [text: string] { 5 let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ") 6 { 7 "$type": "app.bsky.feed.post", 8 text: $text, 9 createdAt: $timestamp 10 } 11} 12 13def api-count [url: string, did: string, collection: string] { 14 (http get $"($url)/xrpc/systems.gaze.hydrant.countRecords?identifier=($did)&collection=($collection)").count | into int 15} 16 17def debug-count [debug_url: string, did: string, collection: string] { 18 (http get $"($debug_url)/debug/count?did=($did)&collection=($collection)").count | into int 19} 20 21def stats-records [url: string] { 22 (http get $"($url)/stats").counts.records | into int 23} 24 25def count-state [url: string, debug_url: string, did: string, collection: string] { 26 { 27 api: (api-count $url $did $collection), 28 debug: (debug-count $debug_url $did $collection), 29 records: (stats-records $url) 30 } 31} 32 33def assert-count-state [label: string, state: record, expected_collection: int, expected_records: int] { 34 print $"($label): api=($state.api), debug=($state.debug), stats.records=($state.records)" 35 36 if $state.api != $state.debug { 37 error make {msg: $"($label): countRecords mismatch. api=($state.api) debug=($state.debug)"} 38 } 39 40 if $state.api != $expected_collection { 41 error make {msg: $"($label): expected collection count ($expected_collection), got ($state.api)"} 42 } 43 44 if $state.records != $expected_records { 45 error make {msg: $"($label): expected stats.records ($expected_records), got ($state.records)"} 46 } 47} 48 49def set-firehose [url: string, enabled: bool] { 50 http patch -t application/json $"($url)/ingestion" { firehose: $enabled } | ignore 51 let status = (http get $"($url)/ingestion") 52 if $status.firehose != $enabled { 53 error make {msg: $"expected firehose=($enabled), got ($status.firehose)"} 54 } 55} 56 57def force-resync [url: string, did: string] { 58 let resp = (http post -f -e -t application/json $"($url)/repos/resync" [{ did: $did }]) 59 if $resp.status != 200 { 60 error make {msg: $"repos/resync failed: ($resp.status) body=($resp.body)"} 61 } 62} 63 64def main [] { 65 let env_vars = load-env-file 66 let did = ($env_vars | get --optional TEST_REPO) 67 let password = ($env_vars | get --optional TEST_PASSWORD) 68 69 if ($did | is-empty) or ($password | is-empty) { 70 print "SKIP: TEST_REPO and TEST_PASSWORD not set in .env" 71 exit 0 72 } 73 74 let pds_url = resolve-pds $did 75 let collection = "app.bsky.feed.post" 76 let port = resolve-test-port 3008 77 let debug_port = resolve-test-debug-port ($port + 1) 78 let url = $"http://localhost:($port)" 79 let debug_url = $"http://localhost:($debug_port)" 80 let db_path = (mktemp -d -t hydrant_count_tracking.XXXXXX) 81 82 print $"testing count tracking for ($did)..." 83 print $"database path: ($db_path)" 84 85 let session = authenticate $pds_url $did $password 86 let jwt = $session.accessJwt 87 88 let seed = create-record $pds_url $jwt $did $collection (record-data "hydrant count seed") 89 let seed_rkey = ($seed.uri | split row "/" | last) 90 mut stale_rkey = "" 91 mut instance = null 92 mut success = false 93 94 try { 95 let binary = build-hydrant 96 let relay = "wss://relay.fire.hose.cam" 97 $instance = (with-env { HYDRANT_RELAY_HOSTS: $relay } { 98 start-hydrant $binary $db_path $port 99 }) 100 101 if not (wait-for-api $url) { 102 error make {msg: "api failed to start"} 103 } 104 105 print $"adding repo ($did) to tracking..." 106 http put -t application/json $"($url)/repos" [{ did: $did }] 107 108 if not (wait-for-backfill $url) { 109 error make {msg: "initial backfill failed"} 110 } 111 112 let baseline = (count-state $url $debug_url $did $collection) 113 if $baseline.api < 1 { 114 error make {msg: $"expected at least one seeded record, got ($baseline.api)"} 115 } 116 assert-count-state "baseline" $baseline $baseline.api $baseline.records 117 118 print "pausing firehose..." 119 set-firehose $url false 120 121 let stale = create-record $pds_url $jwt $did $collection (record-data "hydrant count stale create") 122 $stale_rkey = ($stale.uri | split row "/" | last) 123 sleep 2sec 124 125 let stale_before_resync = (count-state $url $debug_url $did $collection) 126 assert-count-state "stale before resync" $stale_before_resync $baseline.api $baseline.records 127 128 print "forcing resync after stale create..." 129 force-resync $url $did 130 if not (wait-for-backfill $url) { 131 error make {msg: "resync after create failed"} 132 } 133 134 let after_create = (count-state $url $debug_url $did $collection) 135 assert-count-state "after create resync" $after_create ($baseline.api + 1) ($baseline.records + 1) 136 137 delete-record $pds_url $jwt $did $collection $stale_rkey 138 sleep 2sec 139 140 let stale_before_delete_resync = (count-state $url $debug_url $did $collection) 141 assert-count-state "stale before delete resync" $stale_before_delete_resync ($baseline.api + 1) ($baseline.records + 1) 142 143 print "forcing resync after stale delete..." 144 force-resync $url $did 145 if not (wait-for-backfill $url) { 146 error make {msg: "resync after delete failed"} 147 } 148 149 let after_delete = (count-state $url $debug_url $did $collection) 150 assert-count-state "after delete resync" $after_delete $baseline.api $baseline.records 151 152 $success = true 153 } catch { |e| 154 print $"test failed: ($e.msg)" 155 } 156 157 if ($instance | describe) != "nothing" { 158 try { set-firehose $url true } 159 try { kill --force $instance.pid } 160 } 161 162 if ($stale_rkey | is-not-empty) { 163 try { delete-record $pds_url $jwt $did $collection $stale_rkey } 164 } 165 try { delete-record $pds_url $jwt $did $collection $seed_rkey } 166 167 if not $success { 168 exit 1 169 } 170}