A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1#!/usr/bin/env nu
2use common.nu *
3
# Build an `app.bsky.feed.post` record body carrying the given text.
#
# `createdAt` is the current time formatted as `%Y-%m-%dT%H:%M:%SZ`. The
# trailing `Z` is a literal in the format string and denotes UTC, so the
# clock is converted to UTC first; formatting the local time with a `Z`
# suffix would produce a timestamp shifted by the local UTC offset.
def record-data [text: string] {
    let timestamp = (date now | date to-timezone UTC | format date "%Y-%m-%dT%H:%M:%SZ")
    {
        "$type": "app.bsky.feed.post",
        text: $text,
        createdAt: $timestamp
    }
}
12
# Query the `systems.gaze.hydrant.countRecords` XRPC endpoint under `url`
# for the given `did` and `collection`, returning the response's `count`
# field converted to an int.
def api-count [url: string, did: string, collection: string] {
    let endpoint = $"($url)/xrpc/systems.gaze.hydrant.countRecords?identifier=($did)&collection=($collection)"
    http get $endpoint | get count | into int
}
16
# Query the `/debug/count` endpoint under `debug_url` for the given `did`
# and `collection`, returning the response's `count` field as an int.
def debug-count [debug_url: string, did: string, collection: string] {
    let endpoint = $"($debug_url)/debug/count?did=($did)&collection=($collection)"
    http get $endpoint | get count | into int
}
20
# Fetch `/stats` under `url` and return its `counts.records` value as an int.
def stats-records [url: string] {
    http get $"($url)/stats" | get counts.records | into int
}
24
# Collect the three counters compared by this test into one record:
#   api     - result of `api-count` (XRPC countRecords)
#   debug   - result of `debug-count` (debug endpoint)
#   records - result of `stats-records` (`/stats` counts.records)
def count-state [url: string, debug_url: string, did: string, collection: string] {
    let api = (api-count $url $did $collection)
    let debug = (debug-count $debug_url $did $collection)
    let records = (stats-records $url)
    { api: $api, debug: $debug, records: $records }
}
32
# Print a snapshot produced by `count-state` and verify it.
#
# Checks, in order, and raises on the first one that fails:
#   1. `state.api` equals `state.debug`
#   2. `state.api` equals `expected_collection`
#   3. `state.records` equals `expected_records`
# Every message is prefixed with `label`.
def assert-count-state [label: string, state: record, expected_collection: int, expected_records: int] {
    let api = $state.api
    let debug = $state.debug
    let records = $state.records

    print $"($label): api=($api), debug=($debug), stats.records=($records)"

    # Pick the message of the first failing check, or null when all pass.
    let failure = if $api != $debug {
        $"($label): countRecords mismatch. api=($api) debug=($debug)"
    } else if $api != $expected_collection {
        $"($label): expected collection count ($expected_collection), got ($api)"
    } else if $records != $expected_records {
        $"($label): expected stats.records ($expected_records), got ($records)"
    } else {
        null
    }

    if $failure != null {
        error make {msg: $failure}
    }
}
48
# PATCH `/ingestion` under `url` with `{ firehose: <enabled> }`, then read
# `/ingestion` back and raise if the reported `firehose` value differs from
# the requested one.
def set-firehose [url: string, enabled: bool] {
    let endpoint = $"($url)/ingestion"
    http patch --content-type application/json $endpoint { firehose: $enabled } | ignore

    let observed = (http get $endpoint | get firehose)
    if $observed != $enabled {
        error make {msg: $"expected firehose=($enabled), got ($observed)"}
    }
}
56
# POST `[{ did: <did> }]` to `/repos/resync` under `url`.
#
# The request is made with `--full --allow-errors` so a non-200 response is
# returned as data rather than raised by `http post`; any status other than
# 200 is then reported with the status and body in the message.
def force-resync [url: string, did: string] {
    let payload = [{ did: $did }]
    let resp = (http post --full --allow-errors --content-type application/json $"($url)/repos/resync" $payload)
    if $resp.status == 200 {
        return
    }
    error make {msg: $"repos/resync failed: ($resp.status) body=($resp.body)"}
}
63
# Integration test: the three record counters (XRPC countRecords, debug count,
# and `/stats` counts.records) must agree and must track creates and deletes
# that are only picked up through a forced resync.
#
# Flow:
#   1. Read TEST_REPO / TEST_PASSWORD via `load-env-file`; skip (exit 0) when
#      either is empty.
#   2. Create a seed record on the PDS, start a hydrant instance, add the repo
#      and wait for the initial backfill.
#   3. Disable the firehose, create a record, and check that the counters are
#      unchanged until a forced resync, after which they are baseline + 1.
#   4. Delete that record and check the counters stay at baseline + 1 until a
#      forced resync, after which they are back at baseline.
#   5. Always clean up: re-enable the firehose, kill the instance, delete the
#      records this run created. Exit 1 if any step failed.
#
# Helpers such as `load-env-file`, `resolve-pds`, `authenticate`,
# `create-record`, `delete-record`, `build-hydrant`, `start-hydrant`,
# `wait-for-api` and `wait-for-backfill` are not defined in this file
# (presumably they come from `common.nu`).
def main [] {
    let env_vars = load-env-file
    let did = ($env_vars | get --optional TEST_REPO)
    let password = ($env_vars | get --optional TEST_PASSWORD)

    if ($did | is-empty) or ($password | is-empty) {
        print "SKIP: TEST_REPO and TEST_PASSWORD not set in .env"
        exit 0
    }

    let pds_url = resolve-pds $did
    let collection = "app.bsky.feed.post"
    let port = resolve-test-port 3008
    let debug_port = resolve-test-debug-port ($port + 1)
    let url = $"http://localhost:($port)"
    let debug_url = $"http://localhost:($debug_port)"
    let db_path = (mktemp -d -t hydrant_count_tracking.XXXXXX)

    print $"testing count tracking for ($did)..."
    print $"database path: ($db_path)"

    let session = authenticate $pds_url $did $password
    let jwt = $session.accessJwt

    # Seed one record so the baseline collection count is at least 1.
    let seed = create-record $pds_url $jwt $did $collection (record-data "hydrant count seed")
    let seed_rkey = ($seed.uri | split row "/" | last)

    # State the cleanup section below needs, whichever step fails.
    mut stale_rkey = ""
    mut instance = null
    mut success = false

    try {
        let binary = build-hydrant
        let relay = "wss://relay.fire.hose.cam"
        $instance = (with-env { HYDRANT_RELAY_HOSTS: $relay } {
            start-hydrant $binary $db_path $port
        })

        if not (wait-for-api $url) {
            error make {msg: "api failed to start"}
        }

        print $"adding repo ($did) to tracking..."
        http put -t application/json $"($url)/repos" [{ did: $did }]

        if not (wait-for-backfill $url) {
            error make {msg: "initial backfill failed"}
        }

        # The baseline is checked against itself, which verifies api == debug.
        let baseline = (count-state $url $debug_url $did $collection)
        if $baseline.api < 1 {
            error make {msg: $"expected at least one seeded record, got ($baseline.api)"}
        }
        assert-count-state "baseline" $baseline $baseline.api $baseline.records

        print "pausing firehose..."
        set-firehose $url false

        # Create a record while the firehose is off; counts must not move yet.
        let stale = create-record $pds_url $jwt $did $collection (record-data "hydrant count stale create")
        $stale_rkey = ($stale.uri | split row "/" | last)
        sleep 2sec

        let stale_before_resync = (count-state $url $debug_url $did $collection)
        assert-count-state "stale before resync" $stale_before_resync $baseline.api $baseline.records

        print "forcing resync after stale create..."
        force-resync $url $did
        if not (wait-for-backfill $url) {
            error make {msg: "resync after create failed"}
        }

        let after_create = (count-state $url $debug_url $did $collection)
        assert-count-state "after create resync" $after_create ($baseline.api + 1) ($baseline.records + 1)

        # Delete the record while the firehose is still off; counts must not
        # move until the next forced resync.
        delete-record $pds_url $jwt $did $collection $stale_rkey
        sleep 2sec

        let stale_before_delete_resync = (count-state $url $debug_url $did $collection)
        assert-count-state "stale before delete resync" $stale_before_delete_resync ($baseline.api + 1) ($baseline.records + 1)

        print "forcing resync after stale delete..."
        force-resync $url $did
        if not (wait-for-backfill $url) {
            error make {msg: "resync after delete failed"}
        }

        let after_delete = (count-state $url $debug_url $did $collection)
        assert-count-state "after delete resync" $after_delete $baseline.api $baseline.records

        $success = true
    } catch { |e|
        print $"test failed: ($e.msg)"
    }

    # Best-effort cleanup: each step is wrapped in `try` so that one failure
    # does not prevent the remaining steps from running.
    if ($instance | describe) != "nothing" {
        try { set-firehose $url true }
        try { kill --force $instance.pid }
    }

    if ($stale_rkey | is-not-empty) {
        try { delete-record $pds_url $jwt $did $collection $stale_rkey }
    }
    try { delete-record $pds_url $jwt $did $collection $seed_rkey }

    if not $success {
        # The database directory is left in place on failure so it can be
        # inspected; its path was printed at the start of the run.
        exit 1
    }

    # Successful runs remove the temporary database directory so repeated
    # runs do not accumulate directories under the system temp location.
    try { rm --recursive --force $db_path }
}