very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust
fjall
at-protocol
atproto
indexer
1#!/usr/bin/env nu
2use common.nu *
3
# Walks the /crawler/sources endpoints of an already-running instance through
# list -> add -> override -> delete -> delete-again.
#   url: base URL of the API under test
#   pid: forwarded to `fail` / `assert-status` (both come from common.nu)
def test-crawler-sources [url: string, pid: int] {
    print "=== test: crawler sources ==="

    let endpoint = $"($url)/crawler/sources"

    # nothing should be registered yet
    print " GET /crawler/sources (expect empty)..."
    let initial = (http get $endpoint)
    let initial_count = ($initial | length)
    if $initial_count != 0 {
        fail $"expected empty list, got ($initial_count) entries" $pid
    }
    print " ok: starts empty"

    # register a list_repos source
    print " POST /crawler/sources (list_repos)..."
    let create_body = { url: "https://bsky.network", mode: "list_repos" }
    http post -f -e -t application/json $endpoint $create_body | assert-status 201 "POST /crawler/sources" $pid
    print " ok: 201 Created"

    # the new source must be listed, with the mode we sent
    print " GET /crawler/sources (expect 1 entry)..."
    let listed = (http get $endpoint)
    let listed_count = ($listed | length)
    if $listed_count != 1 {
        fail $"expected 1 source, got ($listed_count)" $pid
    }
    let entry = ($listed | first)
    if $entry.mode != "list_repos" {
        fail $"expected mode=list_repos, got ($entry.mode)" $pid
    }
    print $" ok: 1 source, url=($entry.url), mode=($entry.mode)"

    # same URL, different mode: the existing entry is replaced, not duplicated
    print " POST /crawler/sources (should override)..."
    let override_body = { url: "https://bsky.network", mode: "by_collection" }
    http post -f -e -t application/json $endpoint $override_body | assert-status 201 "POST /crawler/sources override" $pid
    let replaced = (http get $endpoint)
    let replaced_count = ($replaced | length)
    if $replaced_count != 1 {
        fail $"expected 1 source after override, got ($replaced_count)" $pid
    }
    let replaced_mode = ($replaced | first | get mode)
    if $replaced_mode != "by_collection" {
        fail "expected mode to be updated to by_collection after override" $pid
    }
    print " ok: duplicate add replaced existing entry (mode updated)"

    # delete it again
    print " DELETE /crawler/sources..."
    let delete_body = { url: "https://bsky.network" }
    http delete -f -e -t application/json $endpoint --data $delete_body | assert-status 200 "DELETE /crawler/sources" $pid
    let remaining = (http get $endpoint)
    if ($remaining | length) != 0 {
        fail "expected empty list after delete" $pid
    }
    print " ok: source removed"

    # a second delete of the same URL must report 404
    print " DELETE /crawler/sources (should be 404)..."
    http delete -f -e -t application/json $endpoint --data $delete_body | assert-status 404 "DELETE /crawler/sources missing" $pid
    print " ok: 404 for non-existent source"

    print "crawler source tests passed!"
}
70
# verify that dynamically added sources are written to the database and survive a restart.
#
# Starts hydrant on `db_path` with HYDRANT_CRAWLER_URLS set to "", adds one
# source through the API, restarts against the same database, and expects the
# source (with its mode) to still be listed.
#   binary:  passed through to `start-hydrant`
#   db_path: database directory reused by both runs
#   port:    port used to build the API base URL
def test-source-persistence [binary: string, db_path: string, port: int] {
    print "=== test: dynamically added sources persist across restart ==="

    let url = $"http://localhost:($port)"

    let instance = (with-env { HYDRANT_CRAWLER_URLS: "" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        # pass the pid here too, like `main` and every other failure path in this file
        fail "hydrant did not start" $instance.pid
    }

    print " adding source..."
    # -f -e + assert-status: check the response explicitly instead of letting
    # `http post` raise on a non-2xx status, so a rejected request goes through
    # the same pid-aware failure path as the other requests in this file.
    http post -f -e -t application/json $"($url)/crawler/sources" {
        url: "https://lightrail.microcosm.blue",
        mode: "by_collection"
    } | assert-status 201 "POST /crawler/sources" $instance.pid

    let before = (http get $"($url)/crawler/sources")
    if ($before | length) != 1 {
        fail "source was not added" $instance.pid
    }

    # restart hydrant against the same database
    print " restarting hydrant..."
    kill $instance.pid
    sleep 2sec

    let instance2 = (with-env { HYDRANT_CRAWLER_URLS: "" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        fail "hydrant did not restart" $instance2.pid
    }

    print " checking source survived restart..."
    let after = (http get $"($url)/crawler/sources")
    if ($after | length) != 1 {
        fail $"expected 1 source after restart, got ($after | length)" $instance2.pid
    }
    let s = ($after | first)
    if $s.mode != "by_collection" {
        fail $"expected mode=by_collection after restart, got ($s.mode)" $instance2.pid
    }
    print " ok: persisted source survived restart"

    kill $instance2.pid
    print "source persistence test passed!"
}
121
# verify that CRAWLER_URLS sources are not written to the database (persisted=false),
# can be stopped at runtime, but reappear on the next restart because the env var
# is re-applied at startup.
#   binary:  passed through to `start-hydrant`
#   db_path: database directory reused by both runs
#   port:    port used to build the API base URL
def test-config-source-not-persisted [binary: string, db_path: string, port: int] {
    print "=== test: CRAWLER_URLS sources are not persisted ==="

    let url = $"http://localhost:($port)"
    let crawler_url = "https://lightrail.microcosm.blue"

    let instance = (with-env { HYDRANT_CRAWLER_URLS: $"by_collection::($crawler_url)" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        # pass the pid here too, like `main` and every other failure path in this file
        fail "hydrant did not start" $instance.pid
    }

    # config source should appear
    print " checking config source appears..."
    let sources = (http get $"($url)/crawler/sources")
    if ($sources | length) != 1 {
        fail $"expected 1 source, got ($sources | length)" $instance.pid
    }
    print " ok: config source present"

    # the task can be stopped at runtime
    print " deleting config source at runtime..."
    http delete -f -e -t application/json $"($url)/crawler/sources" --data {
        url: $crawler_url
    } | assert-status 200 "DELETE /crawler/sources runtime" $instance.pid
    let after_del = (http get $"($url)/crawler/sources")
    if ($after_del | length) != 0 {
        fail "expected source to be gone after runtime delete" $instance.pid
    }
    print " ok: config source removed at runtime"

    # after a restart with the same CRAWLER_URLS, the config source reappears
    print " restarting with same CRAWLER_URLS..."
    kill $instance.pid
    sleep 2sec

    let instance2 = (with-env { HYDRANT_CRAWLER_URLS: $"by_collection::($crawler_url)" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        fail "hydrant did not restart" $instance2.pid
    }

    let after_restart = (http get $"($url)/crawler/sources")
    if ($after_restart | length) != 1 {
        fail $"expected config source to reappear after restart, got ($after_restart | length)" $instance2.pid
    }
    print " ok: config source reappears on restart (not persisted to DB)"

    kill $instance2.pid
    print "config source persistence test passed!"
}
178
# Walks the /firehose/sources endpoints of an already-running instance through
# list -> add -> re-add -> delete -> delete-again.
#   url: base URL of the API under test
#   pid: forwarded to `fail` / `assert-status` (both come from common.nu)
def test-firehose-sources [url: string, pid: int] {
    print "=== test: firehose sources ==="

    let endpoint = $"($url)/firehose/sources"
    let relay_body = { url: "wss://test.bsky.network" }

    # the instance is started with HYDRANT_RELAY_HOSTS="", so the list begins empty
    print " GET /firehose/sources (expect empty)..."
    let initial = (http get $endpoint)
    let initial_count = ($initial | length)
    if $initial_count != 0 {
        fail $"expected empty list, got ($initial_count) entries" $pid
    }
    print " ok: starts empty"

    # register one relay
    print " POST /firehose/sources..."
    http post -f -e -t application/json $endpoint $relay_body | assert-status 201 "POST /firehose/sources" $pid
    print " ok: 201 Created"

    # it must now be listed
    print " GET /firehose/sources (expect 1 entry)..."
    let listed = (http get $endpoint)
    let listed_count = ($listed | length)
    if $listed_count != 1 {
        fail $"expected 1 source, got ($listed_count)" $pid
    }
    let entry = ($listed | first)
    print $" ok: 1 source, url=($entry.url)"

    # re-posting the same URL replaces the entry rather than adding a second one
    print " POST /firehose/sources (should override)..."
    http post -f -e -t application/json $endpoint $relay_body | assert-status 201 "POST /firehose/sources override" $pid
    let replaced = (http get $endpoint)
    let replaced_count = ($replaced | length)
    if $replaced_count != 1 {
        fail $"expected 1 source after override, got ($replaced_count)" $pid
    }
    print " ok: duplicate add replaced existing entry"

    # delete it again
    print " DELETE /firehose/sources..."
    http delete -f -e -t application/json $endpoint --data $relay_body | assert-status 200 "DELETE /firehose/sources" $pid
    let remaining = (http get $endpoint)
    if ($remaining | length) != 0 {
        fail "expected empty list after delete" $pid
    }
    print " ok: source removed"

    # a second delete of the same URL must report 404
    print " DELETE /firehose/sources (should be 404)..."
    http delete -f -e -t application/json $endpoint --data $relay_body | assert-status 404 "DELETE /firehose/sources missing" $pid
    print " ok: 404 for non-existent source"

    print "firehose source tests passed!"
}
237
# Exercises /pds/tiers and /pds/rate-tiers on an already-running instance:
# built-in tier definitions, assign / re-assign / reject unknown tier,
# multi-host listing, and delete (including delete of a missing host).
#   url: base URL of the API under test
#   pid: forwarded to `fail` / `assert-status` (both come from common.nu)
def test-pds-tiers [url: string, pid: int] {
    print "=== test: pds tier management ==="

    let tiers_url = $"($url)/pds/tiers"

    # start: no assignments, but the built-in rate tiers are reported
    print " GET /pds/tiers (expect empty assignments, built-in rate_tiers)..."
    let initial = (http get $tiers_url)
    let initial_count = ($initial.assignments | columns | length)
    if $initial_count != 0 {
        fail $"expected empty assignments, got ($initial_count)" $pid
    }
    if "default" not-in $initial.rate_tiers {
        fail "expected 'default' tier in rate_tiers" $pid
    }
    if "trusted" not-in $initial.rate_tiers {
        fail "expected 'trusted' tier in rate_tiers" $pid
    }
    print " ok: empty assignments and built-in rate tiers present"

    # /pds/rate-tiers must expose every expected field for both built-in tiers
    print " GET /pds/rate-tiers (check structure)..."
    let rate_tiers = (http get $"($url)/pds/rate-tiers")
    let required_fields = ["per_second_base", "per_second_account_mul", "per_hour", "per_day", "account_limit"]
    for tier_name in ["default", "trusted"] {
        let tier = ($rate_tiers | get $tier_name)
        for field in $required_fields {
            if $field not-in $tier {
                fail $"($tier_name) tier missing field ($field)" $pid
            }
        }
    }
    # trusted must allow strictly more per second than default
    let trusted_base = $rate_tiers.trusted.per_second_base
    let default_base = $rate_tiers.default.per_second_base
    if $trusted_base <= $default_base {
        fail $"expected trusted.per_second_base > default, got ($trusted_base) vs ($default_base)" $pid
    }
    print " ok: rate tier definitions have correct fields and expected ordering"

    # put one host on the trusted tier
    print " PUT /pds/tiers (assign to trusted)..."
    let assign_body = { host: "pds.example.com", tier: "trusted" }
    http put -f -e -t application/json $tiers_url $assign_body | assert-status 200 "PUT /pds/tiers" $pid
    let after_assign = (http get $tiers_url)
    let assign_count = ($after_assign.assignments | columns | length)
    if $assign_count != 1 {
        fail $"expected 1 assignment, got ($assign_count)" $pid
    }
    if "pds.example.com" not-in $after_assign.assignments {
        fail $"expected host=pds.example.com to be assigned" $pid
    }
    if ($after_assign.assignments | get "pds.example.com") != "trusted" {
        fail $"expected tier=trusted" $pid
    }
    print $" ok: assignment created host=pds.example.com, tier=trusted"

    # moving the same host to another tier updates it in place
    print " PUT /pds/tiers (re-assign to default)..."
    let reassign_body = { host: "pds.example.com", tier: "default" }
    http put -f -e -t application/json $tiers_url $reassign_body | assert-status 200 "PUT /pds/tiers re-assign" $pid
    let after_reassign = (http get $tiers_url)
    let reassign_count = ($after_reassign.assignments | columns | length)
    if $reassign_count != 1 {
        fail $"expected 1 assignment after re-assign, got ($reassign_count)" $pid
    }
    if ($after_reassign.assignments | get "pds.example.com") != "default" {
        fail $"expected tier=default after re-assign" $pid
    }
    print " ok: re-assign updates tier without creating a duplicate"

    # an unknown tier name must be refused with 400 and change nothing
    print " PUT /pds/tiers (unknown tier, expect 400)..."
    let bad_body = { host: "pds.example.com", tier: "nonexistent" }
    http put -f -e -t application/json $tiers_url $bad_body | assert-status 400 "PUT /pds/tiers unknown tier" $pid
    let after_bad = (http get $tiers_url)
    if ($after_bad.assignments | columns | length) != 1 {
        fail "expected assignment count unchanged after rejected request" $pid
    }
    if ($after_bad.assignments | get "pds.example.com") != "default" {
        fail "expected tier unchanged after rejected request" $pid
    }
    print " ok: unknown tier name rejected with 400, existing assignment unchanged"

    # a second host checks that several assignments are listed side by side
    print " PUT /pds/tiers (second host)..."
    let second_body = { host: "other.example.com", tier: "trusted" }
    http put -f -e -t application/json $tiers_url $second_body | assert-status 200 "PUT /pds/tiers second host" $pid
    let after_second = (http get $tiers_url)
    let second_count = ($after_second.assignments | columns | length)
    if $second_count != 2 {
        fail $"expected 2 assignments, got ($second_count)" $pid
    }
    print " ok: two distinct hosts listed independently"

    # drop the first host; the second must stay
    print " DELETE /pds/tiers (first host)..."
    http delete -f -e $"($tiers_url)?host=pds.example.com" | assert-status 200 "DELETE /pds/tiers" $pid
    let after_del = (http get $tiers_url)
    let del_count = ($after_del.assignments | columns | length)
    if $del_count != 1 {
        fail $"expected 1 assignment after delete, got ($del_count)" $pid
    }
    if "other.example.com" not-in $after_del.assignments {
        fail "expected only other.example.com to remain after delete" $pid
    }
    print " ok: correct host removed, other assignment intact"

    # drop the second host
    http delete -f -e $"($tiers_url)?host=other.example.com" | assert-status 200 "DELETE /pds/tiers second" $pid

    # deleting a host that has no assignment still answers 200
    print " DELETE /pds/tiers (non-existent, expect 200)..."
    http delete -f -e $"($tiers_url)?host=pds.example.com" | assert-status 200 "DELETE /pds/tiers non-existent" $pid
    let after_idempotent = (http get $tiers_url)
    if ($after_idempotent.assignments | columns | length) != 0 {
        fail "expected empty assignments after cleanup" $pid
    }
    print " ok: delete of non-existent host is idempotent"

    print "pds tier management tests passed!"
}
358
# verify that tier assignments are written to the database and survive a restart.
#
# Starts hydrant on `db_path` with HYDRANT_CRAWLER_URLS and HYDRANT_RELAY_HOSTS
# set to "", assigns one host to the trusted tier, restarts against the same
# database, and expects the assignment to still be listed.
#   binary:  passed through to `start-hydrant`
#   db_path: database directory reused by both runs
#   port:    port used to build the API base URL
def test-pds-tier-persistence [binary: string, db_path: string, port: int] {
    print "=== test: pds tier assignments persist across restart ==="

    let url = $"http://localhost:($port)"

    let instance = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        # pass the pid here too, like `main` and every other failure path in this file
        fail "hydrant did not start" $instance.pid
    }

    print " assigning host to trusted tier..."
    # -f -e + assert-status: check the response explicitly instead of letting
    # `http put` raise on a non-2xx status, so a rejected request goes through
    # the same pid-aware failure path as the other requests in this file.
    http put -f -e -t application/json $"($url)/pds/tiers" {
        host: "persist.example.com",
        tier: "trusted"
    } | assert-status 200 "PUT /pds/tiers" $instance.pid

    let before = (http get $"($url)/pds/tiers")
    if ($before.assignments | columns | length) != 1 {
        fail "assignment was not created" $instance.pid
    }

    # restart hydrant against the same database
    print " restarting hydrant..."
    kill $instance.pid
    sleep 2sec

    let instance2 = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        fail "hydrant did not restart" $instance2.pid
    }

    print " checking assignment survived restart..."
    let after = (http get $"($url)/pds/tiers")
    if ($after.assignments | columns | length) != 1 {
        fail $"expected 1 assignment after restart, got ($after.assignments | columns | length)" $instance2.pid
    }
    if not ("persist.example.com" in $after.assignments) {
        fail $"expected host=persist.example.com after restart" $instance2.pid
    }
    if ($after.assignments | get "persist.example.com") != "trusted" {
        fail $"expected tier=trusted after restart" $instance2.pid
    }
    print " ok: tier assignment persisted across restart"

    kill $instance2.pid
    print "pds tier persistence test passed!"
}
410
# verify that a custom tier defined via HYDRANT_RATE_TIERS is visible and assignable.
#
# Starts hydrant with one extra tier named `custom`, checks that it is listed
# next to the built-in `default` tier with the configured per_second_base and
# per_hour values, then assigns a host to it.
#   binary:  passed through to `start-hydrant`
#   db_path: database directory for this run
#   port:    port used to build the API base URL
def test-pds-custom-rate-tier [binary: string, db_path: string, port: int] {
    print "=== test: custom rate tier via HYDRANT_RATE_TIERS ==="

    let url = $"http://localhost:($port)"

    # custom:100/1.0/360000/8640000 — base=100, mul=1.0, hourly=360000, daily=8640000
    let instance = (with-env {
        HYDRANT_CRAWLER_URLS: "",
        HYDRANT_RELAY_HOSTS: "",
        HYDRANT_RATE_TIERS: "custom:100/1.0/360000/8640000"
    } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        # pass the pid here too, like `main` and every other failure path in this file
        fail "hydrant did not start" $instance.pid
    }

    # custom tier should appear alongside the built-in tiers
    print " checking custom tier is listed in /pds/rate-tiers..."
    let rate_tiers = (http get $"($url)/pds/rate-tiers")
    if not ("custom" in $rate_tiers) {
        fail "expected 'custom' tier in rate_tiers" $instance.pid
    }
    if not ("default" in $rate_tiers) {
        fail "built-in 'default' tier should still be present alongside custom tier" $instance.pid
    }
    let custom = ($rate_tiers | get custom)
    if $custom.per_second_base != 100 {
        fail $"expected custom.per_second_base=100, got ($custom.per_second_base)" $instance.pid
    }
    if $custom.per_hour != 360000 {
        fail $"expected custom.per_hour=360000, got ($custom.per_hour)" $instance.pid
    }
    print " ok: custom tier listed with correct parameters"

    # a host can be assigned to the custom tier
    print " assigning host to custom tier..."
    http put -f -e -t application/json $"($url)/pds/tiers" {
        host: "custom.example.com",
        tier: "custom"
    } | assert-status 200 "PUT /pds/tiers custom tier" $instance.pid
    let after = (http get $"($url)/pds/tiers")
    if not ("custom.example.com" in $after.assignments) {
        fail "expected assignment for custom.example.com" $instance.pid
    }
    if ($after.assignments | get "custom.example.com") != "custom" {
        fail "expected tier=custom" $instance.pid
    }
    print " ok: host assigned to custom tier successfully"

    kill $instance.pid
    print "custom rate tier test passed!"
}
465
# Exercises /pds/banned on an already-running instance: list, ban, unban.
#   url: base URL of the API under test
#   pid: forwarded to `fail` / `assert-status` (both come from common.nu)
def test-pds-banned [url: string, pid: int] {
    print "=== test: pds ban management ==="

    let endpoint = $"($url)/pds/banned"
    let ban_body = { host: "bad.example.com" }

    # no host is banned at the start
    print " GET /pds/banned (expect empty)..."
    let initial = (http get $endpoint)
    let initial_count = ($initial | length)
    if $initial_count != 0 {
        fail $"expected empty banned list, got ($initial_count)" $pid
    }
    print " ok: starts empty"

    # ban one host and confirm it is the only entry
    print " PUT /pds/banned (ban host)..."
    http put -f -e -t application/json $endpoint $ban_body | assert-status 200 "PUT /pds/banned" $pid

    let after_ban = (http get $endpoint)
    let banned_count = ($after_ban | length)
    if $banned_count != 1 {
        fail $"expected 1 banned host, got ($banned_count)" $pid
    }
    let banned_host = ($after_ban | first)
    if $banned_host != "bad.example.com" {
        fail $"expected bad.example.com, got ($banned_host)" $pid
    }
    print " ok: host banned"

    # lift the ban and confirm the list is empty again
    print " DELETE /pds/banned (unban host)..."
    http delete -f -e -t application/json $endpoint --data $ban_body | assert-status 200 "DELETE /pds/banned" $pid

    let after_unban = (http get $endpoint)
    if ($after_unban | length) != 0 {
        fail "expected empty banned list after unban" $pid
    }
    print " ok: host unbanned"

    print "pds ban management tests passed!"
}
503
# verify that banned hosts are written to the database and survive a restart.
#
# Starts hydrant on `db_path` with HYDRANT_CRAWLER_URLS and HYDRANT_RELAY_HOSTS
# set to "", bans one host, restarts against the same database, and expects
# the ban to still be listed.
#   binary:  passed through to `start-hydrant`
#   db_path: database directory reused by both runs
#   port:    port used to build the API base URL
def test-pds-banned-persistence [binary: string, db_path: string, port: int] {
    print "=== test: pds banned assignments persist across restart ==="

    let url = $"http://localhost:($port)"

    let instance = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        # pass the pid here too, like `main` and every other failure path in this file
        fail "hydrant did not start" $instance.pid
    }

    print " banning host..."
    # -f -e + assert-status: check the response explicitly instead of letting
    # `http put` raise on a non-2xx status, so a rejected request goes through
    # the same pid-aware failure path as the other requests in this file.
    http put -f -e -t application/json $"($url)/pds/banned" {
        host: "persist-ban.example.com"
    } | assert-status 200 "PUT /pds/banned" $instance.pid

    let before = (http get $"($url)/pds/banned")
    if ($before | length) != 1 {
        fail "host was not banned" $instance.pid
    }

    # restart hydrant against the same database
    print " restarting hydrant..."
    kill $instance.pid
    sleep 2sec

    let instance2 = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } {
        start-hydrant $binary $db_path $port
    })
    if not (wait-for-api $url) {
        fail "hydrant did not restart" $instance2.pid
    }

    print " checking ban survived restart..."
    let after = (http get $"($url)/pds/banned")
    if ($after | length) != 1 {
        fail $"expected 1 banned host after restart, got ($after | length)" $instance2.pid
    }
    if ($after | first) != "persist-ban.example.com" {
        fail $"expected persist-ban.example.com after restart, got ($after | first)" $instance2.pid
    }
    print " ok: banned host persisted across restart"

    kill $instance2.pid
    print "pds ban persistence test passed!"
}
551
# Entry point: builds hydrant, runs the endpoint tests against one shared
# instance, then runs each restart/config test on its own fresh database
# directory, all on the same port.
def main [] {
    let port = (resolve-test-port 3007)
    let url = $"http://localhost:($port)"

    let binary = (build-hydrant)

    # create a temp database directory, announce it, and hand back its path
    let fresh_db = {||
        let dir = (mktemp -d -t hydrant_api.XXXXXX)
        print $"db: ($dir)"
        $dir
    }

    # --- tests that share one running instance ---
    let db = (do $fresh_db)
    let instance = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } {
        start-hydrant $binary $db $port
    })
    if not (wait-for-api $url) {
        fail "hydrant did not start" $instance.pid
    }

    test-crawler-sources $url $instance.pid
    test-firehose-sources $url $instance.pid
    test-pds-tiers $url $instance.pid
    test-pds-banned $url $instance.pid

    kill $instance.pid
    sleep 2sec

    # --- tests that start and stop their own instances ---
    let db_persist = (do $fresh_db)
    test-source-persistence $binary $db_persist $port

    sleep 1sec

    let db_config = (do $fresh_db)
    test-config-source-not-persisted $binary $db_config $port

    sleep 1sec

    let db_pds_persist = (do $fresh_db)
    test-pds-tier-persistence $binary $db_pds_persist $port

    sleep 1sec

    let db_pds_custom = (do $fresh_db)
    test-pds-custom-rate-tier $binary $db_pds_custom $port

    sleep 1sec

    let db_pds_banned = (do $fresh_db)
    test-pds-banned-persistence $binary $db_pds_banned $port

    print ""
    print "all api tests passed!"
}