# zlay — system design

an AT Protocol relay that crawls PDS instances directly, validates commit
signatures, and rebroadcasts to downstream consumers over WebSocket.

## data flow

```
PDS instances (N hosts)
        ▼
Subscriber (one reader thread per host)
  │ header decode, cursor tracking, rate limiting
  │ submits raw frame to processing pool
        ▼
Frame Pool (configurable workers, default 16)
  │ full CBOR decode, DID → UID resolution via postgres
  │ ▼
  │ Validator
  │ │ cache lookup: DID → signing key (secp256k1 / p256)
  │ │ cache hit → verify commit signature (zat SDK)
  │ │ cache miss → skip, queue background resolution
  │ ▼
  │ DiskPersist
  │ │ append to event log (28-byte LE header + CBOR payload)
  │ │ assign relay sequence number (monotonic, relay-scoped)
  │ │ write postgres metadata (account state, host cursor)
  │ ▼
  │ Broadcaster
  │   resequence frame with relay seq
  │   fan out to all connected consumers via SharedFrame (ref-counted)
  │   per-consumer ring buffer (8,192 frames) + write thread
        ▼
Downstream consumers (WebSocket)
```

additionally:
- **collection index** (RocksDB): subscriber calls `trackCommitOps` on each
  validated commit; stores `(collection, did)` pairs for `listReposByCollection`
- **event log**: append-only files rotated every 10K events, configurable
  retention (default: 72h, env: `RELAY_RETENTION_HOURS`). supports cursor
  replay — disk first, then in-memory ring buffer (50K frames)
- **slurper**: orchestrates subscribers. bootstraps host list from seed relay's
  `listHosts` API, spawns/stops workers, processes `requestCrawl` requests

## threading model

| thread type | count at ~2,750 PDS | stack size | responsibility |
|---|---|---|---|
| subscriber readers | ~2,750 | 8 MB | one per host — lightweight WebSocket read loop, header decode, cursor tracking, rate limiting, submit to pool |
| frame pool workers | 16 (env: `FRAME_WORKERS`) | 8 MB | CBOR decode, validation, DB persist, broadcast |
| resolver threads | 4–8 (env: `RESOLVER_THREADS`) | 8 MB | DID document resolution, signing key extraction, cache population |
| consumer write threads | 1 per downstream consumer | 8 MB | drain ring buffer → WebSocket write, ping/pong keepalive |
| flush thread | 1 | 8 MB | batched fsync of event log (100ms or 400 events) |
| GC thread | 1 | 8 MB | event log file cleanup every 10 minutes |
| crawl queue thread | 1 | 8 MB | process `requestCrawl` — validate hostname, describeServer, spawn worker |
| metrics server | 1 | 8 MB | HTTP on internal port, prometheus scrape |
| main thread | 1 | default | signal handling, shutdown coordination |

total: ~2,770 + consumers. subscriber reader threads run blocking WebSocket
read loops — lightweight, no async runtime, no event loop. each reader does
minimal work per frame (header decode, cursor update, rate limit check) and
spends most time blocked in `recv()`. heavy processing (full CBOR decode, CAR
parse, ECDSA verify, DB persist, broadcast) runs on pool workers. the header
is intentionally decoded twice — once by the reader for routing, once by the
pool worker for full processing. this double decode costs ~1–2μs per frame,
far cheaper than serializing parsed state across threads.

the 8 MB stack size (vs zig's 16 MB default) supports ReleaseSafe's TLS
handshake path (~134 KiB peak stack from `tls.Client.init` + `KeyShare.init`).
only touched pages count as RSS — reader threads use ~0.45 MiB RSS each since
the deepest paths (crypto, CBOR, DB) now run on pool workers.

## memory model

**allocator**: `std.heap.c_allocator` (libc malloc). glibc has per-thread
arenas and `madvise`-based page return. the general-purpose allocator (GPA)
is a debug allocator that never returns freed pages — unsuitable for
long-running servers.

**shared TLS CA bundle**: loaded once by the slurper, passed to all ~2,750
subscriber connections via `config.ca_bundle`. without this, each websocket.zig
TLS client calls `Bundle.rescan()` and loads its own copy (~800 KB each),
totaling ~2.2 GiB of duplicate CA certificates in memory.

**arena per frame**: each subscriber creates a `std.heap.ArenaAllocator` per
WebSocket message. all CBOR decode temporaries, CAR parse buffers, and MST
nodes live in this arena. freed in bulk after the frame is processed. this
prevents fragmentation from per-field allocations.
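a minimal sketch of that per-message arena pattern, with the real decode work
reduced to a placeholder (`handleMessage` is illustrative, not zlay's actual
entry point):

```zig
const std = @import("std");

// one arena per WebSocket message: every decode temporary comes out of
// it, and deinit() releases the whole frame's allocations in one shot
fn handleMessage(raw: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.c_allocator);
    defer arena.deinit(); // bulk free once the frame is processed

    const alloc = arena.allocator();

    // stand-in for CBOR decode / CAR parse / MST walk: real code
    // allocates many short-lived values from `alloc` and frees none
    const scratch = try alloc.dupe(u8, raw);
    _ = scratch;
}

test "arena per frame" {
    try handleMessage("frame bytes");
}
```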
**shared frames**: the broadcaster creates one `SharedFrame` per broadcast.
consumers acquire references; the frame is freed when the last consumer
releases. this avoids copying frame bytes per consumer.

**validator cache**: `StringHashMap(CachedKey)` — DID string → 75-byte
fixed-size struct (key type + 33-byte compressed pubkey + resolve timestamp).
capped at 250K entries (env: `VALIDATOR_CACHE_SIZE`), LRU-ish eviction of
oldest 10% when full. ~37 MB at capacity. the resolve queue uses a
`StringHashMapUnmanaged(void)` as a dedupe set to prevent the same DID from
being queued multiple times. migration checks are interleaved with DID
resolutions (1 per 10) to prevent starvation.

**ring buffer**: 50K-entry in-memory frame history for cursor replay when
disk replay isn't available. entries are `(seq, data)` pairs with data duped
from broadcast.

## persistence

### event log (append-only files)

format matches indigo's `diskpersist`:
```
[4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload]
```

files named `evts-{startSeq}`, rotated every 10K events. buffered writes
flushed every 100ms or 400 events (whichever comes first). GC deletes files
older than `RELAY_RETENTION_HOURS` (default: 72h).
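a sketch of encoding and decoding that record header; the struct and field
names here are inferred from the layout above, not taken from zlay's source:

```zig
const std = @import("std");

// 28-byte little-endian record header, per the layout above
const RecordHeader = struct {
    flags: u32,
    kind: u32,
    payload_len: u32,
    uid: u64,
    seq: u64,
};

const header_size = 28;

fn encodeHeader(hdr: RecordHeader) [header_size]u8 {
    var buf: [header_size]u8 = undefined;
    std.mem.writeInt(u32, buf[0..4], hdr.flags, .little);
    std.mem.writeInt(u32, buf[4..8], hdr.kind, .little);
    std.mem.writeInt(u32, buf[8..12], hdr.payload_len, .little);
    std.mem.writeInt(u64, buf[12..20], hdr.uid, .little);
    std.mem.writeInt(u64, buf[20..28], hdr.seq, .little);
    return buf;
}

fn decodeHeader(buf: *const [header_size]u8) RecordHeader {
    return .{
        .flags = std.mem.readInt(u32, buf[0..4], .little),
        .kind = std.mem.readInt(u32, buf[4..8], .little),
        .payload_len = std.mem.readInt(u32, buf[8..12], .little),
        .uid = std.mem.readInt(u64, buf[12..20], .little),
        .seq = std.mem.readInt(u64, buf[20..28], .little),
    };
}

test "round trip" {
    const hdr = RecordHeader{ .flags = 0, .kind = 1, .payload_len = 512, .uid = 42, .seq = 7 };
    const bytes = encodeHeader(hdr);
    try std.testing.expectEqual(hdr, decodeHeader(&bytes));
}
```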
cursor replay: `playback(cursor)` binary-searches log files for the starting
seq, then streams entries forward. the broadcaster tries disk first, falls
back to the in-memory ring buffer.

### postgres

tables:
- `account` — uid, did, status, upstream_status, host_id
- `account_repo` — uid, rev, commit_data_cid (latest repo state)
- `host` — id, hostname, status, last_seq, failed_attempts
- `log_file_refs` — seq→file mapping for cursor binary search
- `domain_ban` — banned domain suffixes
- `backfill_progress` — collection backfill cursor tracking

connection pool: 5 connections (hardcoded in pg.zig pool init).

### RocksDB (collection index)

two column families:
- `rbc`: key `<collection>\0<did>`, value empty — prefix scan by collection
- `cbr`: key `<did>\0<collection>`, value empty — per-repo deletion

populated live from firehose commits. backfill from source relay's
`listReposByCollection` for historical data.

## scaling limits

current deployment: ~2,780 PDS hosts, running on a 32 GB / 16 CPU node.
steady-state memory: ~1.1 GiB (ReleaseSafe, with frame pool). postgres alongside
at ~240 MiB. resource limits: 8 GiB memory limit, 1 GiB memory request, 1000m CPU.

| component | current (~2,750 PDS) | at 10x (~27,500 PDS) | status |
|---|---|---|---|
| thread stacks | ~22 GB virtual (2,750 × 8 MB), ~1.2 GiB RSS | ~220 GB virtual | **breaks** — virtual exceeds 32 GB node, but RSS scales sublinearly (~0.45 MiB/thread) |
| pg pool | 5 connections (hardcoded) | 5 connections | **breaks** — saturates under concurrent UID lookups |
| resolver queue | `ArrayList` + dedupe set | same | **ok** — dedupe prevents unbounded growth from duplicate DIDs |
| validator cache | 250K entries, ~19 MB | same (capped) | **degrades** — miss rate climbs with more unique DIDs |
| broadcaster | O(n consumers) under mutex, contention from 16 pool workers (not ~2,750 threads) | same | **improved** — contention reduced from ~2,750 threads to N workers |
| RocksDB | manageable write rate | ~1.4M writes/sec projected | **needs** compaction tuning |
| event log | buffered, 100ms flush | fine — sequential I/O | ok |
| kernel threads | ~2,800 (below 30K default) | ~28,000 (near default max) | **breaks** without `sysctl` tuning |
| RSS | ~1.1 GiB (ReleaseSafe) | ~4–6 GiB projected | ok — fits 32 GB node |

### what breaks first

1. **thread count**: linux's default `kernel.pid_max` is 32,768, and every
   thread consumes a TID, so ~30K threads is the practical ceiling. at 27,500
   subscriber threads + resolver + consumer + system threads, we hit the wall.
   virtual address space for stacks alone is ~220 GB.

2. **postgres pool**: 5 connections shared across ~2,750 subscriber threads
   works because UID lookups are fast (~0.5ms) and only happen on new DIDs.
   at 10x, queue contention becomes the bottleneck — every frame touches
   `uidForDidFromHost`.

3. **validator cache miss rate**: 250K cache with 60M+ DIDs means ~99% miss
   rate for cold starts. resolver threads (4–8) can't keep up with the
   resolution queue at 10x ingest rate.

## migration path

### near-term (no architecture change)
- expose `PG_POOL_SIZE` env var, increase from 5 to 20–50
- increase `VALIDATOR_CACHE_SIZE` from 250K to 2M+ (costs ~150 MB)
- tune `RESOLVER_THREADS` to 16–32 for higher resolution throughput
- raise kernel thread limits on the deploy node: `sysctl kernel.threads-max=65536 kernel.pid_max=65536`

### mid-term (thread pool — **done**, commit f0c7baf)
- ~~replace one-thread-per-host with a thread pool of N workers~~ — **done**: reader threads (one per PDS) submit raw frames to a shared processing pool (default 16 workers). heavy work (CBOR decode, validation, persist, broadcast) runs on pool workers.
- thread count is still O(hosts) for readers, but reader threads are lightweight — no crypto, no DB, no broadcast contention.
- remaining: IO multiplexing to reduce reader thread count from O(hosts) to O(cores)

### long-term (async I/O)
- zig 0.16 introduces `Io` (io_uring on linux, kqueue on darwin)
- single-threaded event loop with coroutines for all I/O
- eliminates thread overhead entirely, scales to 100K+ hosts per process
- requires rewriting subscriber, resolver, and consumer write paths
- pg.zig and websocket.zig would need async-compatible forks

## deliberate divergences from indigo

documented policy choices where zlay intentionally differs from the Go relay
(bluesky-social/indigo). these are not bugs — each reflects a tradeoff
appropriate for zlay's architecture.

### per-PDS concurrency model

indigo uses goroutines (M:N scheduling on a small thread pool). zlay uses one
OS thread per host — simple, no async runtime, no event loop. each thread
spends most time blocked in `recv()` with minimal per-frame CPU work.
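a concrete sketch of that model: one blocking reader spawned with the reduced
8 MB stack, handing frames to a worker pool. `std.Thread.Pool` stands in for
zlay's frame pool here, and `RawFrame`, `processFrame`, and `readLoop` are
hypothetical placeholders, not zlay's actual types:

```zig
const std = @import("std");

// hypothetical stand-in for a raw firehose frame
const RawFrame = struct { data: []const u8 };

// heavy path (full CBOR decode, validation, persist, broadcast);
// runs on a pool worker, never on the reader thread
fn processFrame(frame: RawFrame) void {
    _ = frame;
}

// lightweight per-host read loop: header decode, cursor tracking, and
// rate limiting happen here, then the raw frame goes to the pool
fn readLoop(pool: *std.Thread.Pool) void {
    var remaining: usize = 3; // stand-in for "while the socket is open"
    while (remaining > 0) : (remaining -= 1) {
        const frame = RawFrame{ .data = "..." }; // stand-in for a recv()
        pool.spawn(processFrame, .{frame}) catch return;
    }
}

pub fn main() !void {
    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = std.heap.c_allocator, .n_jobs = 16 });
    defer pool.deinit();

    // one OS thread per PDS host, with the reduced stack size
    const reader = try std.Thread.spawn(
        .{ .stack_size = 8 * 1024 * 1024 }, // 8 MB, vs zig's 16 MB default
        readLoop,
        .{&pool},
    );
    reader.join();
}
```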
observability: prometheus metrics expose thread count, RSS, per-host memory.
the 0.16 `Io` migration (io_uring/kqueue) is the planned optimization path,
replacing OS threads with coroutines.

### skip-on-miss validation

when the validator has no cached signing key for a DID (cache miss or pending
new-account verification), zlay broadcasts the frame while the key resolves
in the background. indigo blocks on DID resolution before forwarding.

zlay trades a brief trust window for throughput. the window is bounded:
- new accounts trigger async DID doc verification; on mismatch → rejected
- signature failures trigger key eviction + re-resolution (sync spec guidance)
- next commit from the same DID hits the refreshed cache

### consumer buffer sizing

zlay uses an 8K-entry per-consumer ring buffer (vs indigo's 16K-entry channel).
can be tuned independently based on observed `ConsumerTooSlow` disconnect rate.
the ring buffer is lock-free (atomic read/write indices), so the bottleneck is
consumer write throughput, not buffer contention.
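a minimal sketch of that lock-free single-producer/single-consumer shape;
`Ring` is a hypothetical generic, with zlay's per-consumer instance
corresponding to an 8,192-entry ring of frame references:

```zig
const std = @import("std");

// SPSC ring: the broadcaster pushes, the consumer's write thread pops.
// indices grow monotonically; the slot is index % capacity. safe only
// for exactly one producer and one consumer.
pub fn Ring(comptime T: type, comptime capacity: usize) type {
    return struct {
        const Self = @This();

        buf: [capacity]T = undefined,
        head: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), // next write
        tail: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), // next read

        // returns false when full; that is the ConsumerTooSlow signal
        pub fn push(self: *Self, item: T) bool {
            const head = self.head.load(.monotonic);
            const tail = self.tail.load(.acquire);
            if (head - tail == capacity) return false;
            self.buf[head % capacity] = item;
            self.head.store(head + 1, .release); // publish the slot
            return true;
        }

        pub fn pop(self: *Self) ?T {
            const tail = self.tail.load(.monotonic);
            const head = self.head.load(.acquire);
            if (head == tail) return null; // empty
            const item = self.buf[tail % capacity];
            self.tail.store(tail + 1, .release); // free the slot
            return item;
        }
    };
}

test "push then pop" {
    var ring = Ring(u64, 8){};
    try std.testing.expect(ring.push(123));
    try std.testing.expectEqual(@as(?u64, 123), ring.pop());
}
```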