A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1source common.nu
2
3source mock_pds.nu
4
# Query `com.atproto.sync.getHostStatus` for `host` on the hydrant API at `base_url`.
#
# Returns a record `{ code, status }`:
#   code   - HTTP status code of the response, or 0 when the request itself failed
#            (e.g. connection refused), so callers can clean up instead of crashing.
#   status - the `status` field of the response body when the code is 200 and the
#            body carries one, otherwise null.
def fetch-host-status [base_url: string, host: string] {
    let endpoint = $"($base_url)/xrpc/com.atproto.sync.getHostStatus?hostname=($host)"
    let res = (try {
        http get -fe $endpoint
    } catch {
        { status: 0, body: null }
    })
    let status = if $res.status == 200 {
        # a 200 body without a `status` field (or a non-record body) yields null
        try { $res.body.status } catch { null }
    } else {
        null
    }
    { code: $res.status, status: $status }
}

# Poll the host status up to 20 times, 2 seconds apart, until `accept` returns true
# for the reported status string.
#
# Returns the accepted status string, or null when the attempts are exhausted.
# When `label` is given, a progress line naming the awaited state is printed for
# every attempt that did not match; without it the polling is silent.
def await-host-status [base_url: string, host: string, accept: closure, label?: string] {
    for attempt in 1..20 {
        let probe = (fetch-host-status $base_url $host)
        if $probe.code == 200 {
            if (do $accept $probe.status) {
                return $probe.status
            }
            if $label != null {
                print $" ... currently ($probe.status), waiting for ($label)"
            }
        } else if $label != null {
            print $" ... could not get status, waiting: ($probe.code)"
        }
        sleep 2sec
    }
    null
}

# Register `ws_url` as a PDS firehose source on the hydrant API at `base_url`.
#
# Returns true when the POST succeeded and false (after printing the error) when it
# raised, so the caller can stop the processes it started before failing the test.
def register-pds-source [base_url: string, ws_url: string] {
    try {
        http post -t application/json $"($base_url)/firehose/sources" {
            url: $ws_url,
            is_pds: true
        } | ignore
        true
    } catch {|err|
        print $"POST /firehose/sources failed: ($err.msg)"
        false
    }
}

# Integration test for host status transitions.
#
# Scenario 1: a mock PDS registered while nothing listens on its port must become
# "offline", then "active" once the mock server is started, "throttled" once the
# restrictive "custom" tier is assigned, and "active" again once the tier is removed.
#
# Scenario 2: a second hydrant instance with a glob tier rule must throttle the host
# without any explicit tier assignment, and keep it throttled after an explicit
# override is set and removed again.
#
# NOTE(review): resolve-test-port, resolve-test-mock-port, build-hydrant, start-hydrant,
# wait-for-api, fail, start-mock-pds and stop-mock-pds come from the sourced files;
# `fail` is assumed to abort the script - confirm against common.nu.
def main [] {
    let port = resolve-test-port 3033
    let url = $"http://localhost:($port)"
    let binary = build-hydrant
    let db = (mktemp -d -t hydrant_test.XXXXXX)

    let instance = (with-env {
        HYDRANT_RELAY_HOSTS: "",
        HYDRANT_CRAWLER_URLS: "",
        HYDRANT_RATE_TIERS: "custom:1/1/1/1/0"
    } {
        start-hydrant $binary $db $port
    })
    if not (wait-for-api $url) {
        fail "hydrant did not start" $instance.pid
    }

    let mock_port = resolve-test-mock-port 9999
    let mock_host = "127.0.0.1"
    let mock_ws = $"ws://($mock_host):($mock_port)/"

    # kill any stale listener on the mock port from a previous failed run
    try { bash -c $"fuser -k ($mock_port)/tcp" } catch {}
    sleep 100ms

    print "adding offline mock pds via firehose sources..."
    if not (register-pds-source $url $mock_ws) {
        fail "failed to add mock pds as firehose source" $instance.pid
    }

    print "checking status transitions to Offline..."

    # the throttle backoff will cap at 1 second in debug builds.
    # it takes 4 consecutive failures to mark as offline.
    # therefore, 4 * 1 = ~4 seconds maximum for transition.
    let went_offline = (await-host-status $url $mock_host {|s| $s == "offline" } "offline")
    if $went_offline == null {
        fail "host did not transition to offline within time limit" $instance.pid
    }
    print "ok: host transitioned to offline successfully."

    print "starting mock pds websocket server..."
    let mock_pds_handle = (start-mock-pds $mock_port)

    sleep 500ms
    if not (register-pds-source $url $mock_ws) {
        stop-mock-pds $mock_pds_handle
        try { kill $instance.pid }
        fail "failed to re-add mock pds as firehose source"
    }

    print "checking status transitions back to Active..."

    # now wait for it to successfully reconnect and the active_sleep of 1s to pass.
    let went_active = (await-host-status $url $mock_host {|s| $s == "active" } "active")
    if $went_active == null {
        stop-mock-pds $mock_pds_handle
        try { kill $instance.pid }
        fail "host did not transition to active within time limit"
    }
    print "ok: host transitioned to active successfully."

    print "checking status transitions to Throttled..."
    let put_res = (http put -fe -t application/json $"($url)/pds/tiers" {
        host: $mock_host,
        tier: "custom"
    })
    if $put_res.status != 200 {
        print $"PUT /pds/tiers failed with status ($put_res.status)"
        print $put_res.body
        stop-mock-pds $mock_pds_handle
        try { kill $instance.pid }
        fail "failed to change tier"
    }

    # since we updated the tier via API, the status should change immediately
    let after_put = (fetch-host-status $url $mock_host)
    if $after_put.status != "throttled" {
        stop-mock-pds $mock_pds_handle
        try { kill $instance.pid }
        fail "host did not transition to throttled after tier update"
    }
    print "ok: host transitioned to throttled successfully."

    print "checking status transitions back to Active when limits loosen..."
    let del_res = (http delete -fe $"($url)/pds/tiers?host=($mock_host)")
    if $del_res.status < 200 or $del_res.status >= 300 {
        # report the real cause instead of a misleading "did not transition" failure
        print $"DELETE /pds/tiers failed with status ($del_res.status)"
        print $del_res.body
        stop-mock-pds $mock_pds_handle
        try { kill $instance.pid }
        fail "failed to remove tier"
    }

    # should change back immediately
    let after_delete = (fetch-host-status $url $mock_host)

    stop-mock-pds $mock_pds_handle
    try { kill $instance.pid }

    if $after_delete.status == "active" {
        print "ok: host transitioned back to active successfully."
    } else {
        fail "host did not transition back to active after tier removed"
    }

    # verify that,
    # 1. the glob rule resolves the tier for unassigned hosts (no explicit PUT /pds/tiers needed)
    # 2. after removing an explicit tier override, the glob rule still applies -> host stays throttled

    print "starting hydrant instance with a glob tier rule..."
    let port2 = ($port + 100)
    let url2 = $"http://localhost:($port2)"
    let db2 = (mktemp -d -t hydrant_test.XXXXXX)

    let instance2 = (with-env {
        HYDRANT_RELAY_HOSTS: "",
        HYDRANT_CRAWLER_URLS: "",
        HYDRANT_RATE_TIERS: "custom:1/1/1/1/0",
        HYDRANT_TIER_RULES: "127.0.0.*:custom"
    } {
        start-hydrant $binary $db2 $port2
    })
    if not (wait-for-api $url2) {
        try { kill $instance2.pid }
        fail "hydrant instance did not start"
    }

    # kill any stale listener from first round
    try { bash -c $"fuser -k ($mock_port)/tcp" } catch {}
    sleep 100ms

    # connect mock pds and wait for offline -> active cycle (same as above)
    if not (register-pds-source $url2 $mock_ws) {
        try { kill $instance2.pid }
        fail "glob test: failed to add mock pds as firehose source"
    }

    # wait for offline
    print "waiting for offline..."
    let went_offline2 = (await-host-status $url2 $mock_host {|s| $s == "offline" })
    if $went_offline2 == null {
        try { kill $instance2.pid }
        fail "glob test: host did not go offline"
    }

    print "starting mock pds for glob test..."
    let mock_pds2 = (start-mock-pds $mock_port)
    sleep 500ms
    if not (register-pds-source $url2 $mock_ws) {
        stop-mock-pds $mock_pds2
        try { kill $instance2.pid }
        fail "glob test: failed to re-add mock pds as firehose source"
    }

    # with account_limit=0 and the glob rule active, the host goes straight to throttled
    # on the first successful connection — no explicit set_tier call needed.
    print "waiting for connected (expect throttled, not active)..."
    let connected_as = (await-host-status $url2 $mock_host {|s| $s != null and $s != "offline" })
    if $connected_as == null {
        stop-mock-pds $mock_pds2
        try { kill $instance2.pid }
        fail "glob test: host did not reconnect"
    }
    print $" connected with status: ($connected_as)"

    # verify the glob rule throttled the host automatically (no set_tier was called)
    print "checking glob test: glob rule throttles host without explicit tier assignment..."
    let by_glob = (fetch-host-status $url2 $mock_host)
    print $" status \(no set_tier\): ($by_glob.status)"
    if $by_glob.status != "throttled" {
        stop-mock-pds $mock_pds2
        try { kill $instance2.pid }
        fail $"glob test: expected throttled without set_tier \(glob rule should apply\), got ($by_glob.status)"
    }
    print "ok: host throttled by glob rule without explicit tier assignment."

    # set explicit tier -> still throttled (sanity check)
    print "checking glob test: explicit tier assignment also throttles..."
    let put_res2 = (http put -fe -t application/json $"($url2)/pds/tiers" { host: $mock_host, tier: "custom" })
    if $put_res2.status != 200 {
        # without this check a rejected PUT would go unnoticed: the glob rule alone
        # keeps the host throttled, so the sanity check below would pass vacuously
        print $"PUT /pds/tiers failed with status ($put_res2.status)"
        print $put_res2.body
        stop-mock-pds $mock_pds2
        try { kill $instance2.pid }
        fail "glob test: failed to set explicit tier"
    }
    let by_override = (fetch-host-status $url2 $mock_host)
    print $" status after set_tier: ($by_override.status)"
    if $by_override.status != "throttled" {
        stop-mock-pds $mock_pds2
        try { kill $instance2.pid }
        fail $"glob test: expected throttled after set_tier, got ($by_override.status)"
    }
    print "ok: host throttled via explicit tier."

    # remove explicit override -> glob rule still applies -> still throttled
    print "checking glob test: remove explicit tier keeps host throttled via glob rule..."
    let del_res2 = (http delete -fe $"($url2)/pds/tiers?host=($mock_host)")
    if $del_res2.status < 200 or $del_res2.status >= 300 {
        # a failed removal leaves the explicit tier in place, which would make the
        # final "still throttled" assertion pass for the wrong reason
        print $"DELETE /pds/tiers failed with status ($del_res2.status)"
        print $del_res2.body
        stop-mock-pds $mock_pds2
        try { kill $instance2.pid }
        fail "glob test: failed to remove explicit tier"
    }

    sleep 500ms
    let after_removal = (fetch-host-status $url2 $mock_host)
    print $" status after remove_tier: ($after_removal.status)"

    stop-mock-pds $mock_pds2
    try { kill $instance2.pid }

    if $after_removal.status == "throttled" {
        print "ok: host remains throttled after tier override removed (glob rule applies)."
        exit 0
    } else {
        fail $"glob test: expected throttled after remove_tier \(glob rule should apply\), got ($after_removal.status)"
    }
}