atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: harden event stream spec compliance, update docs

- ignore unknown frame types instead of persisting as #identity (spec forward-compat)
- align blocks size limits with lexicon maxLength (2000000, 10000)
- update docs with current memory numbers (~1.2 GiB after CA bundle fix)
- fix websocket.zig link to point at fork
- document shared CA bundle, DID dedupe, migration interleave in design.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz bb9593d1 becfa547

+35 -15
+1 -1
README.md
··· 23 23 | dependency | purpose | 24 24 |---|---| 25 25 | [zat](https://tangled.org/zzstoatzz.io/zat) | AT Protocol primitives (CBOR, CAR, signatures, DID resolution) | 26 - | [websocket.zig](https://github.com/nicholasgasior/websocket.zig) | WebSocket client/server | 26 + | [websocket.zig](https://github.com/zzstoatzz/websocket.zig) | WebSocket client/server (fork with HTTP fallback + TCP split fixes) | 27 27 | [pg.zig](https://github.com/karlseguin/pg.zig) | PostgreSQL driver | 28 28 | [rocksdb-zig](https://github.com/Syndica/rocksdb-zig) | RocksDB bindings | 29 29
+6 -3
docs/deployment.md
··· 73 73 74 74 ## memory tuning 75 75 76 - two changes brought steady-state memory from ~6.6 GiB down to ~2.9 GiB at 2,738 connected hosts: 76 + three changes brought steady-state memory from ~6.6 GiB down to ~1.2 GiB at ~2,750 connected hosts: 77 + 78 + **shared TLS CA bundle.** the biggest single win. websocket.zig's TLS client calls `Bundle.rescan()` per connection, loading the system CA certificates into a per-connection arena. with ~2,750 PDS connections, that's ~2,750 copies of the CA bundle in memory (~800 KB each = ~2.2 GiB). fix: load the bundle once in the slurper, pass it to all subscribers via `config.ca_bundle`. memory dropped from ~3.3 GiB to ~1.2 GiB (~65% reduction). 77 79 78 80 **thread stack sizes.** zig's default thread stack is 16 MB. with ~2,750 subscriber threads that maps 44 GB of virtual memory. most threads just read websockets and decode CBOR — 2 MB is generous. all `Thread.spawn` calls now pass `.{ .stack_size = 2 * 1024 * 1024 }`. the constant is defined in `main.zig` as `default_stack_size` for the threads spawned there; other modules use the literal directly. 79 81 ··· 83 85 84 86 | metric | value | 85 87 |--------|-------| 86 - | memory | ~2.9 GiB steady state (~2,750 hosts) | 88 + | memory | ~1.2 GiB steady state (~2,750 hosts) | 87 89 | CPU | ~1.5 cores peak | 88 - | limits | 8 GiB memory, 250m CPU request | 90 + | requests | 1 GiB memory, 1000m CPU | 91 + | limits | 3 GiB memory | 89 92 | PVC | 20 GiB (events + RocksDB collection index) | 90 93 | postgres | ~238 MiB | 91 94
+16 -6
docs/design.md
··· 34 34 additionally: 35 35 - **collection index** (RocksDB): subscriber calls `trackCommitOps` on each 36 36 validated commit; stores `(collection, did)` pairs for `listReposByCollection` 37 - - **event log**: append-only files rotated every 10K events, 72h retention. 38 - supports cursor replay — disk first, then in-memory ring buffer (50K frames) 37 + - **event log**: append-only files rotated every 10K events, configurable 38 + retention (default: 72h, env: `RELAY_RETENTION_HOURS`). supports cursor 39 + replay — disk first, then in-memory ring buffer (50K frames) 39 40 - **slurper**: orchestrates subscribers. bootstraps host list from seed relay's 40 41 `listHosts` API, spawns/stops workers, processes `requestCrawl` requests 41 42 ··· 68 69 is a debug allocator that never returns freed pages — unsuitable for 69 70 long-running servers. 70 71 72 + **shared TLS CA bundle**: loaded once by the slurper, passed to all ~2,750 73 + subscriber connections via `config.ca_bundle`. without this, each websocket.zig 74 + TLS client calls `Bundle.rescan()` and loads its own copy (~800 KB each), 75 + totaling ~2.2 GiB of duplicate CA certificates in memory. 76 + 71 77 **arena per frame**: each subscriber creates a `std.heap.ArenaAllocator` per 72 78 WebSocket message. all CBOR decode temporaries, CAR parse buffers, and MST 73 79 nodes live in this arena. freed in bulk after the frame is processed. this ··· 80 86 **validator cache**: `StringHashMap(CachedKey)` — DID string → 75-byte 81 87 fixed-size struct (key type + 33-byte compressed pubkey + resolve timestamp). 82 88 capped at 500K entries (env: `VALIDATOR_CACHE_SIZE`), LRU-ish eviction of 83 - oldest 10% when full. ~37 MB at capacity. 89 + oldest 10% when full. ~37 MB at capacity. the resolve queue uses a 90 + `StringHashMapUnmanaged(void)` as a dedupe set to prevent the same DID from 91 + being queued multiple times. migration checks are interleaved with DID 92 + resolutions (1 per 10) to prevent starvation. 84 93 85 94 **ring buffer**: 50K-entry in-memory frame history for cursor replay when 86 95 disk replay isn't available. entries are `(seq, data)` pairs with data duped ··· 127 136 ## scaling limits 128 137 129 138 current deployment: ~2,780 PDS hosts, running on a 32 GB / 16 CPU node. 130 - steady-state memory: ~3.5 GiB. postgres alongside at ~240 MiB. 139 + steady-state memory: ~1.2 GiB (after shared CA bundle fix). postgres alongside 140 + at ~240 MiB. resource limits: 3 GiB memory, 1 GiB request, 1000m CPU. 131 141 132 142 | component | current (~2,750 PDS) | at 10x (~27,500 PDS) | status | 133 143 |---|---|---|---| 134 144 | thread stacks | ~5.5 GB virtual (2,750 × 2 MB) | ~55 GB virtual | **breaks** — exceeds 32 GB node | 135 145 | pg pool | 5 connections (hardcoded) | 5 connections | **breaks** — saturates under concurrent UID lookups | 136 - | resolver queue | unbounded `ArrayList` | unbounded | **risk** — backlog grows if resolvers can't keep up | 146 + | resolver queue | `ArrayList` + dedupe set | same | **ok** — dedupe prevents unbounded growth from duplicate DIDs | 137 147 | validator cache | 500K entries, ~37 MB | same (capped) | **degrades** — miss rate climbs with more unique DIDs | 138 148 | broadcaster | O(n consumers) under mutex | same | **risk** — lock contention at high consumer count | 139 149 | RocksDB | manageable write rate | ~1.4M writes/sec projected | **needs** compaction tuning | 140 150 | event log | buffered, 100ms flush | fine — sequential I/O | ok | 141 151 | kernel threads | ~2,800 (below 30K default) | ~28,000 (near default max) | **breaks** without `sysctl` tuning | 142 - | RSS | ~3.5 GiB | ~15–20 GiB projected (malloc overhead scales sublinearly) | **tight** — needs larger node | 152 + | RSS | ~1.2 GiB | ~5–8 GiB projected (shared CA bundle, malloc overhead scales sublinearly) | ok — fits 32 GB node | 143 153 144 154 ### what breaks first 145 155
+8 -2
src/subscriber.zig
··· 323 323 return; 324 324 } 325 325 326 - // route by frame type 326 + // route by frame type — unknown types are ignored per spec (forward-compat) 327 327 const is_commit = std.mem.eql(u8, frame_type, "#commit"); 328 328 const is_sync = std.mem.eql(u8, frame_type, "#sync"); 329 329 const is_account = std.mem.eql(u8, frame_type, "#account"); 330 + const is_identity = std.mem.eql(u8, frame_type, "#identity"); 331 + 332 + if (!is_commit and !is_sync and !is_account and !is_identity) { 333 + log.debug("host {s}: unknown frame type '{s}', ignoring", .{ sub.options.hostname, frame_type }); 334 + return; 335 + } 330 336 331 337 // extract DID: "repo" for commits, "did" for identity/account 332 338 const did: ?[]const u8 = if (is_commit) ··· 424 430 .sync 425 431 else if (is_account) 426 432 .account 427 - else 433 + else // is_identity (unknown types already filtered above) 428 434 .identity; 429 435 430 436 // persist and get relay-assigned seq, broadcast raw bytes.
+4 -3
src/validator.zig
··· 153 153 }; 154 154 155 155 // #sync CAR should be small (just the signed commit block) 156 - if (blocks.len > 10 * 1024) { 156 + // lexicon maxLength: 10000 157 + if (blocks.len > 10_000) { 157 158 _ = self.stats.failed.fetchAdd(1, .monotonic); 158 159 return .{ .valid = false, .skipped = false }; 159 160 } ··· 246 247 // extract blocks (raw CAR bytes) from the pre-decoded payload 247 248 const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame; 248 249 249 - // blocks size check 250 - if (blocks.len > 2 * 1024 * 1024) return error.InvalidFrame; 250 + // blocks size check — lexicon maxLength: 2000000 251 + if (blocks.len > 2_000_000) return error.InvalidFrame; 251 252 252 253 // build public key for verification 253 254 const public_key = zat.multicodec.PublicKey{