declarative relay deployment on hetzner relay-eval.waow.tech
atproto relay
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

move scripts to scripts/, add jetstream consumer

- git mv firehose -> scripts/firehose
- new scripts/jetstream: websockets-based JSON consumer for jetstream
- update justfile paths, add `just jetstream` recipe
- README: document jetstream alongside relay, update paths and examples

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 3ed4fc10 cd5ea664

+356 -158
+43 -10
README.md
··· 1 1 # relay.waow.tech 2 2 3 - a full-network [ATProto](https://atproto.com) relay running on a single Hetzner Cloud node with k3s. 3 + a full-network [ATProto](https://atproto.com) relay running on a single Hetzner Cloud node with k3s. a [jetstream](https://github.com/bluesky-social/jetstream) instance runs alongside it, re-encoding the relay's CBOR firehose into plain JSON over websockets — easier to consume if you don't need the full atproto SDK. 4 4 5 - **relay endpoint:** `wss://relay.waow.tech` 5 + **relay endpoint:** `wss://relay.waow.tech` — raw CBOR firehose ([`com.atproto.sync.subscribeRepos`](https://docs.bsky.app/docs/advanced-guides/firehose)) 6 + 7 + **jetstream endpoint:** `wss://jetstream.waow.tech/subscribe` — same data, JSON over websockets 6 8 7 9 **health check:** [`https://relay.waow.tech/xrpc/_health`](https://relay.waow.tech/xrpc/_health) 8 10 ··· 10 12 11 13 ## try it 12 14 13 - the `firehose` script consumes events from the relay using the [atproto](https://github.com/MarshalX/atproto) python SDK. it's a self-contained [uv script](https://docs.astral.sh/uv/guides/scripts/) — no virtualenv or install needed. 15 + both scripts are self-contained [uv scripts](https://docs.astral.sh/uv/guides/scripts/) — no virtualenv or install needed. 16 + 17 + ### firehose 18 + 19 + consumes the raw CBOR firehose using the [atproto](https://github.com/MarshalX/atproto) python SDK. 14 20 15 21 ```bash 16 22 # watch posts scroll by for 10 seconds 17 - ./firehose 23 + ./scripts/firehose 18 24 19 25 # run longer, filter by collection 20 - ./firehose --duration 30 21 - ./firehose --collection app.bsky.feed.like 22 - ./firehose --duration 0 # forever (ctrl-c to stop) 26 + ./scripts/firehose --duration 30 27 + ./scripts/firehose --collection app.bsky.feed.like 28 + ./scripts/firehose --duration 0 # forever (ctrl-c to stop) 23 29 24 30 # point at a different relay 25 - ./firehose --relay-url wss://bsky.network 31 + ./scripts/firehose --relay-url wss://bsky.network 32 + ``` 33 + 34 + ### jetstream 35 + 36 + consumes the simplified JSON firehose via [jetstream](https://github.com/bluesky-social/jetstream) — no atproto SDK needed, just plain websockets. 37 + 38 + ```bash 39 + # watch all events for 10 seconds 40 + ./scripts/jetstream 41 + 42 + # filter to specific collections 43 + ./scripts/jetstream --collection app.bsky.feed.post 44 + ./scripts/jetstream --collection app.bsky.feed.like --collection app.bsky.graph.follow 45 + 46 + # run longer, or forever 47 + ./scripts/jetstream --duration 30 48 + ./scripts/jetstream --duration 0 # forever (ctrl-c to stop) 49 + 50 + # point at a different jetstream instance 51 + ./scripts/jetstream --url wss://jetstream1.us-east.bsky.network 26 52 ``` 27 53 28 54 ## what's here 29 55 30 56 ``` 31 57 . 32 - ├── firehose # uv script — firehose consumer 58 + ├── scripts/ 59 + │ ├── firehose # uv script — raw CBOR firehose consumer 60 + │ └── jetstream # uv script — JSON jetstream consumer 33 61 ├── justfile # all commands: deploy, status, logs, etc. 34 62 ├── infra/ # terraform — hetzner server + k3s 35 63 │ ├── main.tf ··· 38 66 │ └── outputs.tf 39 67 └── deploy/ # helm values + k8s manifests 40 68 ├── relay-values.yaml 69 + ├── jetstream-values.yaml 41 70 ├── postgres-values.yaml 42 71 ├── monitoring-values.yaml 43 72 ├── relay-dashboard.json 44 73 ├── relay-servicemonitor.yaml 74 + ├── jetstream-servicemonitor.yaml 45 75 ├── ingress.yaml 76 + ├── jetstream-ingress.yaml 46 77 ├── grafana-ingress.yaml 47 78 └── cluster-issuer.yaml 48 79 ``` ··· 109 140 just status # nodes, pods, health check 110 141 just logs # tail relay logs 111 142 just health # curl the public health endpoint 112 - just firehose # consume the firehose (passes args through to ./firehose) 143 + just firehose # consume the firehose (passes args through) 144 + just jetstream # consume the jetstream (passes args through) 113 145 just ssh # ssh into the server 114 146 just destroy # tear down everything 115 147 ``` ··· 129 161 ### workloads 130 162 131 163 - **relay** — [`ghcr.io/bluesky-social/indigo`](https://github.com/bluesky-social/indigo/pkgs/container/indigo) (tagged per-commit, e.g. `relay-bf41e2ee...`), deployed via [bjw-s/app-template](https://github.com/bjw-s-labs/helm-charts) helm chart with `hostNetwork: true` for lower-overhead networking 164 + - **jetstream** — [`ghcr.io/bluesky-social/jetstream`](https://github.com/bluesky-social/jetstream) subscribes to the relay's firehose over localhost (`ws://relay:2470`) and re-serves it as JSON WebSocket events at [`jetstream.waow.tech/subscribe`](https://jetstream.waow.tech/subscribe). lightweight alternative for consumers that don't need CBOR/CAR decoding 132 165 - **postgresql** — relay's backing database, deployed via [bitnami/postgresql](https://github.com/bitnami/charts/tree/main/bitnami/postgresql) helm chart 133 166 - **prometheus + grafana** — metrics collection and dashboards via [kube-prometheus-stack](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack), public read-only access at [`relay-metrics.waow.tech`](https://relay-metrics.waow.tech) 134 167
-145
firehose
··· 1 - #!/usr/bin/env -S uv run --script --quiet 2 - # /// script 3 - # requires-python = ">=3.12" 4 - # dependencies = ["atproto"] 5 - # /// 6 - """ 7 - consume the firehose from an atproto relay and print events. 8 - 9 - usage: 10 - ./firehose 11 - ./firehose --duration 30 12 - ./firehose --relay-url wss://bsky.network 13 - ./firehose --collection app.bsky.feed.like 14 - """ 15 - 16 - import argparse 17 - import signal 18 - import time 19 - from collections import defaultdict 20 - 21 - from atproto import ( 22 - CAR, 23 - AtUri, 24 - FirehoseSubscribeReposClient, 25 - firehose_models, 26 - models, 27 - parse_subscribe_repos_message, 28 - ) 29 - 30 - 31 - def get_ops_by_type( 32 - commit: models.ComAtprotoSyncSubscribeRepos.Commit, 33 - collections: set[str], 34 - ) -> defaultdict: 35 - ops = defaultdict(lambda: {"created": [], "deleted": []}) 36 - 37 - car = CAR.from_bytes(commit.blocks) 38 - for op in commit.ops: 39 - uri = AtUri.from_str(f"at://{commit.repo}/{op.path}") 40 - 41 - if op.action == "create" and op.cid: 42 - raw = car.blocks.get(op.cid) 43 - if not raw: 44 - continue 45 - 46 - if collections and uri.collection not in collections: 47 - continue 48 - 49 - record = models.get_or_create(raw, strict=False) 50 - ops[uri.collection]["created"].append( 51 - {"record": record, "uri": str(uri), "author": commit.repo} 52 - ) 53 - 54 - elif op.action == "delete": 55 - ops[uri.collection]["deleted"].append({"uri": str(uri)}) 56 - 57 - return ops 58 - 59 - 60 - def main(): 61 - parser = argparse.ArgumentParser(description="consume an atproto relay firehose") 62 - parser.add_argument( 63 - "--relay-url", 64 - default="wss://relay.waow.tech", 65 - help="relay websocket url (default: wss://relay.waow.tech)", 66 - ) 67 - parser.add_argument( 68 - "--duration", 69 - type=int, 70 - default=10, 71 - help="seconds to consume (default: 10, 0 = forever)", 72 - ) 73 - parser.add_argument( 74 - "--collection", 75 - action="append", 76 - default=None, 77 - help="filter by collection (default: app.bsky.feed.post). repeatable.", 78 - ) 79 - args = parser.parse_args() 80 - 81 - collections = set(args.collection) if args.collection else {"app.bsky.feed.post"} 82 - deadline = time.time() + args.duration if args.duration > 0 else float("inf") 83 - 84 - counts: dict[str, int] = defaultdict(int) 85 - total = 0 86 - 87 - base_uri = args.relay_url.rstrip("/") + "/xrpc" 88 - client = FirehoseSubscribeReposClient(base_uri=base_uri) 89 - 90 - def stop(*_): 91 - client.stop() 92 - 93 - signal.signal(signal.SIGINT, stop) 94 - 95 - def on_message(message: firehose_models.MessageFrame) -> None: 96 - nonlocal total 97 - 98 - if time.time() >= deadline: 99 - client.stop() 100 - return 101 - 102 - commit = parse_subscribe_repos_message(message) 103 - if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit): 104 - return 105 - 106 - if not commit.blocks: 107 - return 108 - 109 - ops = get_ops_by_type(commit, collections) 110 - 111 - for collection, actions in ops.items(): 112 - for item in actions["created"]: 113 - record = item["record"] 114 - author = item["author"] 115 - counts[collection] += 1 116 - total += 1 117 - 118 - if collection == "app.bsky.feed.post": 119 - text = getattr(record, "text", "") 120 - inline = text.replace("\n", " ")[:120] 121 - print(f"[{author}] {inline}") 122 - elif collection == "app.bsky.feed.like": 123 - subject = getattr(record, "subject", None) 124 - uri = getattr(subject, "uri", "?") if subject else "?" 125 - print(f"[{author}] liked {uri}") 126 - elif collection == "app.bsky.graph.follow": 127 - subject = getattr(record, "subject", "?") 128 - print(f"[{author}] followed {subject}") 129 - else: 130 - print(f"[{author}] {collection}") 131 - 132 - print(f"consuming from {args.relay_url} for {args.duration}s...") 133 - print(f"filtering: {', '.join(collections)}") 134 - print() 135 - 136 - client.start(on_message) 137 - 138 - print() 139 - print(f"--- {total} events in {args.duration}s ---") 140 - for col, count in sorted(counts.items()): 141 - print(f" {col}: {count}") 142 - 143 - 144 - if __name__ == "__main__": 145 - main()
+21 -3
justfile
··· 1 1 # ATProto relay deployment 2 2 # required env vars: HCLOUD_TOKEN, RELAY_DOMAIN, RELAY_ADMIN_PASSWORD, POSTGRES_PASSWORD, LETSENCRYPT_EMAIL 3 - # optional env vars: GRAFANA_DOMAIN (default: relay-metrics.waow.tech), GRAFANA_ADMIN_PASSWORD 3 + # optional env vars: GRAFANA_DOMAIN (default: relay-metrics.waow.tech), GRAFANA_ADMIN_PASSWORD, JETSTREAM_DOMAIN (default: jetstream.waow.tech) 4 4 5 5 export KUBECONFIG := justfile_directory() / "kubeconfig.yaml" 6 6 ··· 132 132 sed "s|GRAFANA_DOMAIN_PLACEHOLDER|$GRAFANA_DOMAIN|g" deploy/grafana-ingress.yaml \ 133 133 | kubectl apply -f - 134 134 135 + echo "==> installing jetstream" 136 + JETSTREAM_DOMAIN="${JETSTREAM_DOMAIN:-jetstream.waow.tech}" 137 + helm upgrade --install jetstream bjw-s/app-template \ 138 + --namespace relay \ 139 + --values deploy/jetstream-values.yaml \ 140 + --wait --timeout 5m 141 + 142 + echo "==> applying jetstream ingress" 143 + sed "s|JETSTREAM_DOMAIN_PLACEHOLDER|$JETSTREAM_DOMAIN|g" deploy/jetstream-ingress.yaml \ 144 + | kubectl apply -f - 145 + kubectl apply -f deploy/jetstream-servicemonitor.yaml 146 + 135 147 echo "" 136 148 echo "done. point DNS:" 137 149 echo " $RELAY_DOMAIN -> $(just server-ip)" 138 150 echo " $GRAFANA_DOMAIN -> $(just server-ip)" 151 + echo " $JETSTREAM_DOMAIN -> $(just server-ip)" 139 152 echo "then check:" 140 153 echo " curl https://$RELAY_DOMAIN/xrpc/_health" 141 154 echo " curl https://$GRAFANA_DOMAIN" 155 + echo " curl https://$JETSTREAM_DOMAIN" 142 156 143 157 # seed the relay with hosts from the network (includes restart so slurper picks them up) 144 158 bootstrap: ··· 177 191 grafana-password: 178 192 @kubectl get secret -n monitoring kube-prometheus-stack-grafana -o jsonpath="{.data.admin-password}" | base64 -d && echo 179 193 180 - # --- firehose --- 194 + # --- scripts --- 181 195 182 196 # consume the firehose (default: 10s of bsky posts) 183 197 firehose *args: 184 - ./firehose {{ args }} 198 + ./scripts/firehose {{ args }} 199 + 200 + # consume the jetstream (default: 10s of all events) 201 + jetstream *args: 202 + ./scripts/jetstream {{ args }}
+145
scripts/firehose
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["atproto"] 5 + # /// 6 + """ 7 + consume the firehose from an atproto relay and print events. 8 + 9 + usage: 10 + ./scripts/firehose 11 + ./scripts/firehose --duration 30 12 + ./scripts/firehose --relay-url wss://bsky.network 13 + ./scripts/firehose --collection app.bsky.feed.like 14 + """ 15 + 16 + import argparse 17 + import signal 18 + import time 19 + from collections import defaultdict 20 + 21 + from atproto import ( 22 + CAR, 23 + AtUri, 24 + FirehoseSubscribeReposClient, 25 + firehose_models, 26 + models, 27 + parse_subscribe_repos_message, 28 + ) 29 + 30 + 31 + def get_ops_by_type( 32 + commit: models.ComAtprotoSyncSubscribeRepos.Commit, 33 + collections: set[str], 34 + ) -> defaultdict: 35 + ops = defaultdict(lambda: {"created": [], "deleted": []}) 36 + 37 + car = CAR.from_bytes(commit.blocks) 38 + for op in commit.ops: 39 + uri = AtUri.from_str(f"at://{commit.repo}/{op.path}") 40 + 41 + if op.action == "create" and op.cid: 42 + raw = car.blocks.get(op.cid) 43 + if not raw: 44 + continue 45 + 46 + if collections and uri.collection not in collections: 47 + continue 48 + 49 + record = models.get_or_create(raw, strict=False) 50 + ops[uri.collection]["created"].append( 51 + {"record": record, "uri": str(uri), "author": commit.repo} 52 + ) 53 + 54 + elif op.action == "delete": 55 + ops[uri.collection]["deleted"].append({"uri": str(uri)}) 56 + 57 + return ops 58 + 59 + 60 + def main(): 61 + parser = argparse.ArgumentParser(description="consume an atproto relay firehose") 62 + parser.add_argument( 63 + "--relay-url", 64 + default="wss://relay.waow.tech", 65 + help="relay websocket url (default: wss://relay.waow.tech)", 66 + ) 67 + parser.add_argument( 68 + "--duration", 69 + type=int, 70 + default=10, 71 + help="seconds to consume (default: 10, 0 = forever)", 72 + ) 73 + parser.add_argument( 74 + "--collection", 75 + action="append", 76 + default=None, 77 + help="filter by collection (default: app.bsky.feed.post). repeatable.", 78 + ) 79 + args = parser.parse_args() 80 + 81 + collections = set(args.collection) if args.collection else {"app.bsky.feed.post"} 82 + deadline = time.time() + args.duration if args.duration > 0 else float("inf") 83 + 84 + counts: dict[str, int] = defaultdict(int) 85 + total = 0 86 + 87 + base_uri = args.relay_url.rstrip("/") + "/xrpc" 88 + client = FirehoseSubscribeReposClient(base_uri=base_uri) 89 + 90 + def stop(*_): 91 + client.stop() 92 + 93 + signal.signal(signal.SIGINT, stop) 94 + 95 + def on_message(message: firehose_models.MessageFrame) -> None: 96 + nonlocal total 97 + 98 + if time.time() >= deadline: 99 + client.stop() 100 + return 101 + 102 + commit = parse_subscribe_repos_message(message) 103 + if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit): 104 + return 105 + 106 + if not commit.blocks: 107 + return 108 + 109 + ops = get_ops_by_type(commit, collections) 110 + 111 + for collection, actions in ops.items(): 112 + for item in actions["created"]: 113 + record = item["record"] 114 + author = item["author"] 115 + counts[collection] += 1 116 + total += 1 117 + 118 + if collection == "app.bsky.feed.post": 119 + text = getattr(record, "text", "") 120 + inline = text.replace("\n", " ")[:120] 121 + print(f"[{author}] {inline}") 122 + elif collection == "app.bsky.feed.like": 123 + subject = getattr(record, "subject", None) 124 + uri = getattr(subject, "uri", "?") if subject else "?" 125 + print(f"[{author}] liked {uri}") 126 + elif collection == "app.bsky.graph.follow": 127 + subject = getattr(record, "subject", "?") 128 + print(f"[{author}] followed {subject}") 129 + else: 130 + print(f"[{author}] {collection}") 131 + 132 + print(f"consuming from {args.relay_url} for {args.duration}s...") 133 + print(f"filtering: {', '.join(collections)}") 134 + print() 135 + 136 + client.start(on_message) 137 + 138 + print() 139 + print(f"--- {total} events in {args.duration}s ---") 140 + for col, count in sorted(counts.items()): 141 + print(f" {col}: {count}") 142 + 143 + 144 + if __name__ == "__main__": 145 + main()
+147
scripts/jetstream
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["websockets"] 5 + # /// 6 + """ 7 + consume events from an atproto jetstream endpoint and print them. 8 + 9 + jetstream re-encodes the relay's CBOR firehose as plain JSON over websockets, 10 + so no atproto SDK is needed. 11 + 12 + usage: 13 + ./scripts/jetstream 14 + ./scripts/jetstream --duration 30 15 + ./scripts/jetstream --url wss://jetstream1.us-east.bsky.network 16 + ./scripts/jetstream --collection app.bsky.feed.like 17 + ./scripts/jetstream --collection app.bsky.feed.post --collection app.bsky.feed.like 18 + """ 19 + 20 + import argparse 21 + import json 22 + import signal 23 + import time 24 + from collections import defaultdict 25 + from urllib.parse import urlencode 26 + 27 + import websockets.sync.client as ws 28 + 29 + 30 + def format_event(event: dict) -> str | None: 31 + """format a jetstream event for display. returns None to skip.""" 32 + kind = event.get("kind") 33 + did = event.get("did", "?") 34 + 35 + if kind == "commit": 36 + commit = event.get("commit", {}) 37 + collection = commit.get("collection", "") 38 + operation = commit.get("operation", "") 39 + record = commit.get("record", {}) 40 + 41 + if operation == "delete": 42 + return None 43 + 44 + if collection == "app.bsky.feed.post": 45 + text = record.get("text", "") 46 + inline = text.replace("\n", " ")[:120] 47 + return f"[{did}] {inline}" 48 + elif collection == "app.bsky.feed.like": 49 + uri = record.get("subject", {}).get("uri", "?") 50 + return f"[{did}] liked {uri}" 51 + elif collection == "app.bsky.graph.follow": 52 + subject = record.get("subject", "?") 53 + return f"[{did}] followed {subject}" 54 + else: 55 + return f"[{did}] {collection}#{operation}" 56 + 57 + elif kind == "identity": 58 + handle = event.get("identity", {}).get("handle", "?") 59 + return f"[{did}] identity -> {handle}" 60 + 61 + return None 62 + 63 + 64 + def main(): 65 + parser = argparse.ArgumentParser(description="consume an atproto jetstream") 66 + parser.add_argument( 67 + "--url", 68 + default="wss://jetstream.waow.tech", 69 + help="jetstream base url (default: wss://jetstream.waow.tech)", 70 + ) 71 + parser.add_argument( 72 + "--duration", 73 + type=int, 74 + default=10, 75 + help="seconds to consume (default: 10, 0 = forever)", 76 + ) 77 + parser.add_argument( 78 + "--collection", 79 + action="append", 80 + default=None, 81 + help="filter by collection (repeatable). omit for all events.", 82 + ) 83 + args = parser.parse_args() 84 + 85 + # build subscribe url with wantedCollections query params 86 + params = {} 87 + if args.collection: 88 + params["wantedCollections"] = args.collection 89 + query = urlencode(params, doseq=True) 90 + url = f"{args.url.rstrip('/')}/subscribe" 91 + if query: 92 + url = f"{url}?{query}" 93 + 94 + deadline = time.time() + args.duration if args.duration > 0 else float("inf") 95 + counts: dict[str, int] = defaultdict(int) 96 + total = 0 97 + stopping = False 98 + 99 + def stop(*_): 100 + nonlocal stopping 101 + stopping = True 102 + 103 + signal.signal(signal.SIGINT, stop) 104 + 105 + collections_desc = ", ".join(args.collection) if args.collection else "all" 106 + duration_desc = f"{args.duration}s" if args.duration > 0 else "forever" 107 + print(f"consuming from {args.url} for {duration_desc}...") 108 + print(f"filtering: {collections_desc}") 109 + print() 110 + 111 + try: 112 + with ws.connect(url) as conn: 113 + while not stopping: 114 + if time.time() >= deadline: 115 + break 116 + 117 + try: 118 + raw = conn.recv(timeout=1.0) 119 + except TimeoutError: 120 + continue 121 + 122 + event = json.loads(raw) 123 + kind = event.get("kind", "unknown") 124 + 125 + collection = "" 126 + if kind == "commit": 127 + collection = event.get("commit", {}).get("collection", "unknown") 128 + counts[collection] += 1 129 + else: 130 + counts[kind] += 1 131 + 132 + total += 1 133 + line = format_event(event) 134 + if line: 135 + print(line) 136 + 137 + except Exception as e: 138 + print(f"\nerror: {e}") 139 + 140 + print() 141 + print(f"--- {total} events ---") 142 + for col, count in sorted(counts.items()): 143 + print(f" {col}: {count}") 144 + 145 + 146 + if __name__ == "__main__": 147 + main()