very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 261 lines 9.4 kB view raw
1source common.nu 2 3source mock_pds.nu 4 5def main [] { 6 let port = resolve-test-port 3033 7 let url = $"http://localhost:($port)" 8 let binary = build-hydrant 9 let db = (mktemp -d -t hydrant_test.XXXXXX) 10 11 let instance = (with-env { 12 HYDRANT_RELAY_HOSTS: "", 13 HYDRANT_CRAWLER_URLS: "", 14 HYDRANT_RATE_TIERS: "custom:1/1/1/1/0" 15 } { 16 start-hydrant $binary $db $port 17 }) 18 if not (wait-for-api $url) { 19 fail "hydrant did not start" $instance.pid 20 } 21 22 let mock_port = resolve-test-mock-port 9999 23 let mock_host = "127.0.0.1" 24 25 # kill any stale listener on the mock port from a previous failed run 26 try { bash -c $"fuser -k ($mock_port)/tcp" } catch {} 27 sleep 100ms 28 29 print "adding offline mock pds via firehose sources..." 30 http post -t application/json $"($url)/firehose/sources" { 31 url: $"ws://($mock_host):($mock_port)/", 32 is_pds: true 33 } 34 35 print "checking status transitions to Offline..." 36 mut offline = false 37 38 # the throttle backoff will cap at 1 second in debug builds. 39 # it takes 4 consecutive failures to mark as offline. 40 # therefore, 4 * 1 = ~4 seconds maximum for transition. 41 for i in 1..20 { 42 let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 43 if $res.status == 200 { 44 if $res.body.status == "offline" { 45 $offline = true 46 break 47 } 48 if $res.body.status == "active" { 49 print $" ... currently ($res.body.status), waiting for offline" 50 } 51 } else { 52 print $" ... could not get status, waiting: ($res.status)" 53 } 54 sleep 2sec 55 } 56 57 if not $offline { 58 fail "host did not transition to offline within time limit" $instance.pid 59 } 60 print "ok: host transitioned to offline successfully." 61 62 print "starting mock pds websocket server..." 63 let mock_pds_handle = (start-mock-pds $mock_port) 64 65 sleep 500ms 66 http post -t application/json $"($url)/firehose/sources" { 67 url: $"ws://($mock_host):($mock_port)/", 68 is_pds: true 69 } 70 71 print "checking status transitions back to Active..." 72 mut active = false 73 74 # now wait for it to successfully reconnect and the active_sleep of 1s to pass. 75 for i in 1..20 { 76 let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 77 if $res.status == 200 { 78 if $res.body.status == "active" { 79 $active = true 80 break 81 } 82 if $res.body.status == "offline" { 83 print $" ... currently ($res.body.status), waiting for active" 84 } 85 } else { 86 print $" ... could not get status, waiting: ($res.status)" 87 } 88 sleep 2sec 89 } 90 91 if $active { 92 print "ok: host transitioned to active successfully." 93 } else { 94 stop-mock-pds $mock_pds_handle 95 try { kill $instance.pid } 96 fail "host did not transition to active within time limit" 97 } 98 99 print "checking status transitions to Throttled..." 100 let put_res = (http put -fe -t application/json $"($url)/pds/tiers" { 101 host: $mock_host, 102 tier: "custom" 103 }) 104 if $put_res.status != 200 { 105 print $"PUT /pds/tiers failed with status ($put_res.status)" 106 print $put_res.body 107 stop-mock-pds $mock_pds_handle 108 try { kill $instance.pid } 109 fail "failed to change tier" 110 } 111 112 # since we updated the tier via API, the status should change immediately 113 mut throttled = false 114 let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 115 if $res.status == 200 and $res.body.status == "throttled" { 116 $throttled = true 117 } 118 119 if not $throttled { 120 stop-mock-pds $mock_pds_handle 121 try { kill $instance.pid } 122 fail "host did not transition to throttled after tier update" 123 } 124 print "ok: host transitioned to throttled successfully." 125 126 print "checking status transitions back to Active when limits loosen..." 127 http delete -fe $"($url)/pds/tiers?host=($mock_host)" 128 129 # should change back immediately 130 mut re_active = false 131 let res = (http get -fe $"($url)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 132 if $res.status == 200 and $res.body.status == "active" { 133 $re_active = true 134 } 135 136 stop-mock-pds $mock_pds_handle 137 try { kill $instance.pid } 138 139 if $re_active { 140 print "ok: host transitioned back to active successfully." 141 } else { 142 fail "host did not transition back to active after tier removed" 143 } 144 145 # verify that, 146 # 1. the glob rule resolves the tier for unassigned hosts (no explicit PUT /pds/tiers needed) 147 # 2. after removing an explicit tier override, the glob rule still applies -> host stays throttled 148 149 print "starting hydrant instance with a glob tier rule..." 150 let port2 = ($port + 100) 151 let url2 = $"http://localhost:($port2)" 152 let db2 = (mktemp -d -t hydrant_test.XXXXXX) 153 154 let instance2 = (with-env { 155 HYDRANT_RELAY_HOSTS: "", 156 HYDRANT_CRAWLER_URLS: "", 157 HYDRANT_RATE_TIERS: "custom:1/1/1/1/0", 158 HYDRANT_TIER_RULES: $"127.0.0.*:custom" 159 } { 160 start-hydrant $binary $db2 $port2 161 }) 162 if not (wait-for-api $url2) { 163 try { kill $instance2.pid } 164 fail "hydrant instance did not start" 165 } 166 167 # kill any stale listener from first round 168 try { bash -c $"fuser -k ($mock_port)/tcp" } catch {} 169 sleep 100ms 170 171 # connect mock pds and wait for offline -> active cycle (same as above) 172 http post -t application/json $"($url2)/firehose/sources" { 173 url: $"ws://($mock_host):($mock_port)/", 174 is_pds: true 175 } 176 177 # wait for offline 178 print "waiting for offline..." 179 mut offline2 = false 180 for i in 1..20 { 181 let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 182 if $res.status == 200 and $res.body.status == "offline" { 183 $offline2 = true 184 break 185 } 186 sleep 2sec 187 } 188 if not $offline2 { 189 try { kill $instance2.pid } 190 fail "glob test: host did not go offline" 191 } 192 193 print "starting mock pds for glob test..." 194 let mock_pds2 = (start-mock-pds $mock_port) 195 sleep 500ms 196 http post -t application/json $"($url2)/firehose/sources" { 197 url: $"ws://($mock_host):($mock_port)/", 198 is_pds: true 199 } 200 201 # with account_limit=0 and the glob rule active, the host goes straight to throttled 202 # on the first successful connection — no explicit set_tier call needed. 203 print "waiting for connected (expect throttled, not active)..." 204 mut connected2 = false 205 for i in 1..20 { 206 let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 207 if $res.status == 200 and $res.body.status != "offline" { 208 $connected2 = true 209 print $" connected with status: ($res.body.status)" 210 break 211 } 212 sleep 2sec 213 } 214 if not $connected2 { 215 stop-mock-pds $mock_pds2 216 try { kill $instance2.pid } 217 fail "glob test: host did not reconnect" 218 } 219 220 # verify the glob rule throttled the host automatically (no set_tier was called) 221 print "checking glob test: glob rule throttles host without explicit tier assignment..." 222 let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 223 print $" status \(no set_tier\): ($res.body.status?)" 224 if $res.status != 200 or $res.body.status != "throttled" { 225 stop-mock-pds $mock_pds2 226 try { kill $instance2.pid } 227 fail $"glob test: expected throttled without set_tier \(glob rule should apply\), got ($res.body.status?)" 228 } 229 print "ok: host throttled by glob rule without explicit tier assignment." 230 231 # set explicit tier -> still throttled (sanity check) 232 print "checking glob test: explicit tier assignment also throttles..." 233 http put -fe -t application/json $"($url2)/pds/tiers" { host: $mock_host, tier: "custom" } 234 let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 235 print $" status after set_tier: ($res.body.status?)" 236 if $res.status != 200 or $res.body.status != "throttled" { 237 stop-mock-pds $mock_pds2 238 try { kill $instance2.pid } 239 fail $"glob test: expected throttled after set_tier, got ($res.body.status?)" 240 } 241 print "ok: host throttled via explicit tier." 242 243 # remove explicit override -> glob rule still applies -> still throttled 244 print "checking glob test: remove explicit tier keeps host throttled via glob rule..." 245 http delete -fe $"($url2)/pds/tiers?host=($mock_host)" 246 247 sleep 500ms 248 let res = (http get -fe $"($url2)/xrpc/com.atproto.sync.getHostStatus?hostname=($mock_host)") 249 print $" status after remove_tier: ($res.body.status?)" 250 let still_throttled = ($res.status == 200 and $res.body.status == "throttled") 251 252 stop-mock-pds $mock_pds2 253 try { kill $instance2.pid } 254 255 if $still_throttled { 256 print "ok: host remains throttled after tier override removed (glob rule applies)." 257 exit 0 258 } else { 259 fail $"glob test: expected throttled after remove_tier \(glob rule should apply\), got ($res.body.status?)" 260 } 261}