declarative relay deployment on hetzner relay-eval.waow.tech
atproto relay
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

add collectiondir backfill tooling, mark project as experimental

- add envFrom for collectiondir admin token secret
- add backfill script (stdlib-only uv script, batched crawl with progress)
- add backfill recipe to justfile (port-forward + host extraction)
- add dotenv-load to justfile
- add experimental warning to README, simplify what's here section

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz f30da0ca a5745312

+179 -23
+6 -23
README.md
··· 1 1 # relay.waow.tech 2 2 3 + > **experimental** — this is a personal project for learning ATProto infrastructure. the endpoints below may go down, lose data, or change without notice. do not depend on them for anything that matters. 4 + 3 5 a full-network [ATProto](https://atproto.com) relay running on a single Hetzner Cloud node with k3s. a [jetstream](https://github.com/bluesky-social/jetstream) instance runs alongside it, re-encoding the relay's CBOR firehose into plain JSON over websockets — easier to consume if you don't need the full atproto SDK. a [collectiondir](https://github.com/bluesky-social/indigo/tree/main/cmd/collectiondir) sidecar indexes `(DID, collection)` pairs from the firehose and serves `com.atproto.sync.listReposByCollection` — the endpoint TAP crawlers need to enumerate the network. 4 6 5 7 **relay endpoint:** `wss://relay.waow.tech` — raw CBOR firehose ([`com.atproto.sync.subscribeRepos`](https://docs.bsky.app/docs/advanced-guides/firehose)) ··· 57 59 58 60 ``` 59 61 . 60 - ├── scripts/ 61 - │ ├── firehose # uv script — raw CBOR firehose consumer 62 - │ └── jetstream # uv script — JSON jetstream consumer 63 - ├── justfile # all commands: deploy, status, logs, etc. 64 - ├── infra/ # terraform — hetzner server + k3s 65 - │ ├── main.tf 66 - │ ├── variables.tf 67 - │ ├── versions.tf 68 - │ └── outputs.tf 69 - └── deploy/ # helm values + k8s manifests 70 - ├── relay-values.yaml 71 - ├── jetstream-values.yaml 72 - ├── collectiondir-values.yaml 73 - ├── postgres-values.yaml 74 - ├── monitoring-values.yaml 75 - ├── relay-dashboard.json 76 - ├── relay-servicemonitor.yaml 77 - ├── jetstream-servicemonitor.yaml 78 - ├── collectiondir-servicemonitor.yaml 79 - ├── ingress.yaml 80 - ├── jetstream-ingress.yaml 81 - ├── grafana-ingress.yaml 82 - └── cluster-issuer.yaml 62 + ├── scripts/ # uv scripts — firehose, jetstream, backfill 63 + ├── justfile # all commands: deploy, status, logs, backfill, etc. 64 + ├── infra/ # terraform — hetzner server + k3s 65 + └── deploy/ # helm values + k8s manifests 83 66 ``` 84 67 85 68 ## why
+3
deploy/collectiondir-values.yaml
··· 32 32 timeoutSeconds: 3 33 33 failureThreshold: 5 34 34 readiness: *probes 35 + envFrom: 36 + - secretRef: 37 + name: collectiondir-secret 35 38 resources: 36 39 requests: 37 40 memory: 128Mi
+35
justfile
··· 2 2 # required env vars: HCLOUD_TOKEN, RELAY_DOMAIN, RELAY_ADMIN_PASSWORD, POSTGRES_PASSWORD, LETSENCRYPT_EMAIL 3 3 # optional env vars: GRAFANA_DOMAIN (default: relay-metrics.waow.tech), GRAFANA_ADMIN_PASSWORD, JETSTREAM_DOMAIN (default: jetstream.waow.tech) 4 4 5 + set dotenv-load 6 + 5 7 export KUBECONFIG := justfile_directory() / "kubeconfig.yaml" 6 8 7 9 # show available recipes ··· 132 134 sed "s|GRAFANA_DOMAIN_PLACEHOLDER|$GRAFANA_DOMAIN|g" deploy/grafana-ingress.yaml \ 133 135 | kubectl apply -f - 134 136 137 + echo "==> creating collectiondir secret" 138 + kubectl create secret generic collectiondir-secret \ 139 + --namespace relay \ 140 + --from-literal=COLLECTIONS_ADMIN_TOKEN="${COLLECTIONDIR_ADMIN_TOKEN:-}" \ 141 + --dry-run=client -o yaml | kubectl apply -f - 142 + 135 143 echo "==> installing collectiondir" 136 144 helm upgrade --install collectiondir bjw-s/app-template \ 137 145 --namespace relay \ ··· 221 229 # consume the jetstream (default: 10s of all events) 222 230 jetstream *args: 223 231 ./scripts/jetstream {{ args }} 232 + 233 + # backfill collectiondir with full network PDS hosts 234 + backfill *args: 235 + #!/usr/bin/env bash 236 + set -euo pipefail 237 + : "${COLLECTIONDIR_ADMIN_TOKEN:?set COLLECTIONDIR_ADMIN_TOKEN}" 238 + : "${RELAY_ADMIN_PASSWORD:?set RELAY_ADMIN_PASSWORD}" 239 + 240 + echo "==> port-forwarding to relay and collectiondir" 241 + kubectl port-forward -n relay svc/relay 12470:2470 & 242 + RELAY_PF=$! 243 + kubectl port-forward -n relay svc/collectiondir 2510:2510 & 244 + CDIR_PF=$! 245 + trap "kill $RELAY_PF $CDIR_PF 2>/dev/null" EXIT 246 + sleep 3 247 + 248 + echo "==> extracting connected PDS host list from relay" 249 + curl -sf -u "admin:$RELAY_ADMIN_PASSWORD" http://localhost:12470/admin/pds/list \ 250 + | jq -r '.[] | select(.HasActiveConnection) | .Host' > /tmp/relay-hosts.txt 251 + TOTAL=$(curl -sf -u "admin:$RELAY_ADMIN_PASSWORD" http://localhost:12470/admin/pds/list | jq 'length') 252 + echo " $(wc -l < /tmp/relay-hosts.txt | tr -d ' ') connected hosts (of $TOTAL total)" 253 + 254 + echo "==> starting backfill" 255 + ./scripts/backfill \ 256 + --token "$COLLECTIONDIR_ADMIN_TOKEN" \ 257 + --hosts /tmp/relay-hosts.txt \ 258 + {{ args }}
+135
scripts/backfill
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = [] 5 + # /// 6 + """ 7 + backfill collectiondir by crawling PDS hosts in batches. 8 + 9 + reads a host list and sends batched requestCrawl calls, polling crawlStatus 10 + between batches to let active crawls drain before proceeding. 11 + 12 + usage: 13 + ./scripts/backfill --token "$TOKEN" --hosts hosts.txt 14 + ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --batch-size 20 --pause 30 15 + ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --url http://localhost:2510 16 + """ 17 + 18 + import argparse 19 + import json 20 + import signal 21 + import time 22 + import urllib.request 23 + import urllib.error 24 + 25 + 26 + def request_crawl(url: str, token: str, hosts: list[str]) -> dict: 27 + """POST /admin/pds/requestCrawl with a batch of hosts.""" 28 + data = json.dumps({"hosts": hosts}).encode() 29 + req = urllib.request.Request( 30 + f"{url}/admin/pds/requestCrawl", 31 + data=data, 32 + headers={ 33 + "Authorization": f"Bearer {token}", 34 + "Content-Type": "application/json", 35 + }, 36 + method="POST", 37 + ) 38 + with urllib.request.urlopen(req, timeout=30) as resp: 39 + return json.loads(resp.read()) 40 + 41 + 42 + def crawl_status(url: str, token: str) -> dict: 43 + """GET /admin/crawlStatus — returns active crawl info.""" 44 + req = urllib.request.Request( 45 + f"{url}/admin/crawlStatus", 46 + headers={"Authorization": f"Bearer {token}"}, 47 + ) 48 + with urllib.request.urlopen(req, timeout=30) as resp: 49 + return json.loads(resp.read()) 50 + 51 + 52 + def wait_for_drain(url: str, token: str, poll_interval: float = 3.0) -> int: 53 + """poll crawlStatus until no active crawls remain. returns total repos seen.""" 54 + total_seen = 0 55 + while True: 56 + status = crawl_status(url, token) 57 + active = status.get("host_starts", {}) 58 + if not active: 59 + return total_seen 60 + seen = sum(h.get("seen", 0) for h in active.values()) 61 + total_seen = max(total_seen, seen) 62 + print(f" waiting... {len(active)} active crawls, {seen} repos seen", end="\r") 63 + time.sleep(poll_interval) 64 + 65 + 66 + def main(): 67 + parser = argparse.ArgumentParser(description="backfill collectiondir via batched PDS crawl") 68 + parser.add_argument("--token", required=True, help="admin bearer token") 69 + parser.add_argument("--hosts", required=True, help="file with one PDS hostname per line") 70 + parser.add_argument("--url", default="http://localhost:2510", help="collectiondir base url (default: http://localhost:2510)") 71 + parser.add_argument("--batch-size", type=int, default=10, help="hosts per batch (default: 10)") 72 + parser.add_argument("--pause", type=int, default=5, help="seconds to pause between batches (default: 5)") 73 + args = parser.parse_args() 74 + 75 + with open(args.hosts) as f: 76 + all_hosts = [line.strip() for line in f if line.strip()] 77 + 78 + if not all_hosts: 79 + print("no hosts found in file") 80 + return 81 + 82 + # batches 83 + batches = [all_hosts[i : i + args.batch_size] for i in range(0, len(all_hosts), args.batch_size)] 84 + 85 + stopping = False 86 + 87 + def stop(*_): 88 + nonlocal stopping 89 + print("\n\nctrl-c: finishing current batch, then stopping...") 90 + stopping = True 91 + 92 + signal.signal(signal.SIGINT, stop) 93 + 94 + print(f"backfill: {len(all_hosts)} hosts in {len(batches)} batches of {args.batch_size}") 95 + print(f"target: {args.url}") 96 + print() 97 + 98 + start = time.time() 99 + total_repos = 0 100 + hosts_crawled = 0 101 + 102 + for i, batch in enumerate(batches): 103 + if stopping: 104 + break 105 + 106 + elapsed = time.time() - start 107 + print(f"batch {i + 1}/{len(batches)} ({len(batch)} hosts) [{elapsed:.0f}s elapsed, {hosts_crawled} hosts done, {total_repos} repos]") 108 + 109 + try: 110 + request_crawl(args.url, args.token, batch) 111 + except urllib.error.HTTPError as e: 112 + body = e.read().decode(errors="replace") 113 + print(f" error: {e.code} {e.reason} — {body}") 114 + if e.code == 403: 115 + print(" check that COLLECTIONS_ADMIN_TOKEN is set on the collectiondir pod") 116 + return 117 + continue 118 + except Exception as e: 119 + print(f" error sending batch: {e}") 120 + continue 121 + 122 + repos = wait_for_drain(args.url, args.token) 123 + total_repos += repos 124 + hosts_crawled += len(batch) 125 + print() 126 + 127 + if i < len(batches) - 1 and not stopping: 128 + time.sleep(args.pause) 129 + 130 + elapsed = time.time() - start 131 + print(f"done: {hosts_crawled}/{len(all_hosts)} hosts, {total_repos} repos, {elapsed:.0f}s") 132 + 133 + 134 + if __name__ == "__main__": 135 + main()