atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: extract HTTP API from main.zig, add design doc

main.zig was 1,174 lines doing env parsing, component init, signal
handling, and the entire HTTP API. now main.zig is 265 lines (entry
point + wiring) and api.zig has all handlers, response helpers, and
query string parsing.

adds docs/design.md covering data flow, threading model, memory model,
persistence, and an honest scaling limits table for 10x growth.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz f6434057 c1f99c2c

+1109 -912
+1
build.zig
··· 51 51 // tests 52 52 const test_step = b.step("test", "run unit tests"); 53 53 const test_files = .{ 54 + "src/api.zig", 54 55 "src/broadcaster.zig", 55 56 "src/validator.zig", 56 57 "src/subscriber.zig",
+179
docs/design.md
··· 1 + # zlay — system design 2 + 3 + an AT Protocol relay that crawls PDS instances directly, validates commit 4 + signatures, and rebroadcasts to downstream consumers over WebSocket. 5 + 6 + ## data flow 7 + 8 + ``` 9 + PDS instances (N hosts) 10 + 11 + 12 + Subscriber (one OS thread per host) 13 + │ decodes CBOR frames, tracks cursor, rate-limits per host 14 + │ resolves DID → numeric UID via postgres 15 + 16 + Validator 17 + │ cache lookup: DID → signing key (secp256k1 / p256) 18 + │ cache hit → verify commit signature (zat SDK) 19 + │ cache miss → skip, queue background resolution 20 + 21 + DiskPersist 22 + │ append to event log (28-byte LE header + CBOR payload) 23 + │ assign relay sequence number (monotonic, relay-scoped) 24 + │ write postgres metadata (account state, host cursor) 25 + 26 + Broadcaster 27 + │ resequence frame with relay seq 28 + │ fan out to all connected consumers via SharedFrame (ref-counted) 29 + │ per-consumer ring buffer (8,192 frames) + write thread 30 + 31 + Downstream consumers (WebSocket) 32 + ``` 33 + 34 + additionally: 35 + - **collection index** (RocksDB): subscriber calls `trackCommitOps` on each 36 + validated commit; stores `(collection, did)` pairs for `listReposByCollection` 37 + - **event log**: append-only files rotated every 10K events, 72h retention. 38 + supports cursor replay — disk first, then in-memory ring buffer (50K frames) 39 + - **slurper**: orchestrates subscribers. bootstraps host list from seed relay's 40 + `listHosts` API, spawns/stops workers, processes `requestCrawl` requests 41 + 42 + ## threading model 43 + 44 + | thread type | count at ~2,750 PDS | stack size | responsibility | 45 + |---|---|---|---| 46 + | subscriber workers | ~2,750 | 2 MB | one per host — WebSocket read loop, CBOR decode, validation call | 47 + | resolver threads | 4–8 (env: `RESOLVER_THREADS`) | 2 MB | DID document resolution, signing key extraction, cache population | 48 + | consumer write threads | 1 per downstream consumer | 2 MB | drain ring buffer → WebSocket write, ping/pong keepalive | 49 + | flush thread | 1 | 2 MB | batched fsync of event log (100ms or 400 events) | 50 + | GC thread | 1 | 2 MB | event log file cleanup every 10 minutes | 51 + | crawl queue thread | 1 | 2 MB | process `requestCrawl` — validate hostname, describeServer, spawn worker | 52 + | metrics server | 1 | 2 MB | HTTP on internal port, prometheus scrape | 53 + | main thread | 1 | default | signal handling, shutdown coordination | 54 + 55 + total: ~2,760 + consumers. each subscriber thread runs a blocking WebSocket 56 + read loop — simple, no async runtime, no event loop. this works because each 57 + thread does minimal work per frame (CBOR decode + optional signature verify) 58 + and spends most of its time blocked in `recv()`. 59 + 60 + the 2 MB stack size (vs zig's 16 MB default) is the key to fitting ~3K threads 61 + in memory. actual stack usage is far below 2 MB — the deepest path is CBOR 62 + decode → CAR parse → ECDSA verify, which uses ~50 KB of stack at peak. 63 + 64 + ## memory model 65 + 66 + **allocator**: `std.heap.c_allocator` (libc malloc). glibc has per-thread 67 + arenas and `madvise`-based page return. the general-purpose allocator (GPA) 68 + is a debug allocator that never returns freed pages — unsuitable for 69 + long-running servers. 70 + 71 + **arena per frame**: each subscriber creates a `std.heap.ArenaAllocator` per 72 + WebSocket message. all CBOR decode temporaries, CAR parse buffers, and MST 73 + nodes live in this arena. freed in bulk after the frame is processed. this 74 + prevents fragmentation from per-field allocations. 75 + 76 + **shared frames**: the broadcaster creates one `SharedFrame` per broadcast. 77 + consumers acquire references; the frame is freed when the last consumer 78 + releases. this avoids copying frame bytes per consumer. 79 + 80 + **validator cache**: `StringHashMap(CachedKey)` — DID string → 75-byte 81 + fixed-size struct (key type + 33-byte compressed pubkey + resolve timestamp). 82 + capped at 500K entries (env: `VALIDATOR_CACHE_SIZE`), LRU-ish eviction of 83 + oldest 10% when full. ~37 MB at capacity. 84 + 85 + **ring buffer**: 50K-entry in-memory frame history for cursor replay when 86 + disk replay isn't available. entries are `(seq, data)` pairs with data duped 87 + from broadcast. 88 + 89 + ## persistence 90 + 91 + ### event log (append-only files) 92 + 93 + format matches indigo's `diskpersist`: 94 + ``` 95 + [4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload] 96 + ``` 97 + 98 + files named `evts-{startSeq}`, rotated every 10K events. buffered writes 99 + flushed every 100ms or 400 events (whichever comes first). GC deletes files 100 + older than `RELAY_RETENTION_HOURS` (default: 72h). 101 + 102 + cursor replay: `playback(cursor)` binary-searches log files for the starting 103 + seq, then streams entries forward. the broadcaster tries disk first, falls 104 + back to in-memory ring buffer. 105 + 106 + ### postgres 107 + 108 + tables: 109 + - `account` — uid, did, status, upstream_status, host_id 110 + - `account_repo` — uid, rev, commit_data_cid (latest repo state) 111 + - `host` — id, hostname, status, last_seq, failed_attempts 112 + - `log_file_refs` — seq→file mapping for cursor binary search 113 + - `domain_ban` — banned domain suffixes 114 + - `backfill_progress` — collection backfill cursor tracking 115 + 116 + connection pool: 5 connections (hardcoded in pg.zig pool init). 117 + 118 + ### RocksDB (collection index) 119 + 120 + two column families: 121 + - `rbc`: `<collection>\0<did>` → `()` — prefix scan by collection 122 + - `cbr`: `<did>\0<collection>` → `()` — per-repo deletion 123 + 124 + populated live from firehose commits. backfill from source relay's 125 + `listReposByCollection` for historical data. 126 + 127 + ## scaling limits 128 + 129 + current deployment: ~2,780 PDS hosts, running on a 32 GB / 16 CPU node. 130 + steady-state memory: ~3.5 GiB. postgres alongside at ~240 MiB. 131 + 132 + | component | current (~2,750 PDS) | at 10x (~27,500 PDS) | status | 133 + |---|---|---|---| 134 + | thread stacks | ~5.5 GB virtual (2,750 × 2 MB) | ~55 GB virtual | **breaks** — exceeds 32 GB node | 135 + | pg pool | 5 connections (hardcoded) | 5 connections | **breaks** — saturates under concurrent UID lookups | 136 + | resolver queue | unbounded `ArrayList` | unbounded | **risk** — backlog grows if resolvers can't keep up | 137 + | validator cache | 500K entries, ~37 MB | same (capped) | **degrades** — miss rate climbs with more unique DIDs | 138 + | broadcaster | O(n consumers) under mutex | same | **risk** — lock contention at high consumer count | 139 + | RocksDB | manageable write rate | ~1.4M writes/sec projected | **needs** compaction tuning | 140 + | event log | buffered, 100ms flush | fine — sequential I/O | ok | 141 + | kernel threads | ~2,800 (below 30K default) | ~28,000 (near default max) | **breaks** without `sysctl` tuning | 142 + | RSS | ~3.5 GiB | ~15–20 GiB projected (malloc overhead scales sublinearly) | **tight** — needs larger node | 143 + 144 + ### what breaks first 145 + 146 + 1. **thread count**: linux default `kernel.threads-max` is ~30K. at 27,500 147 + subscriber threads + resolver + consumer + system threads, we hit the wall. 148 + virtual address space for stacks alone is ~55 GB. 149 + 150 + 2. **postgres pool**: 5 connections shared across ~2,750 subscriber threads 151 + works because UID lookups are fast (~0.5ms) and only happen on new DIDs. 152 + at 10x, queue contention becomes the bottleneck — every frame touches 153 + `uidForDidFromHost`. 154 + 155 + 3. **validator cache miss rate**: 500K cache with 60M+ DIDs means ~99% miss 156 + rate for cold starts. resolver threads (4–8) can't keep up with the 157 + resolution queue at 10x ingest rate. 158 + 159 + ## migration path 160 + 161 + ### near-term (no architecture change) 162 + - expose `PG_POOL_SIZE` env var, increase from 5 to 20–50 163 + - expose `VALIDATOR_CACHE_SIZE`, increase to 2M+ (costs ~150 MB) 164 + - tune `RESOLVER_THREADS` to 16–32 for higher resolution throughput 165 + - `sysctl kernel.threads-max=65536` on deploy node 166 + 167 + ### mid-term (thread pool) 168 + - replace one-thread-per-host with a thread pool of N workers (N = CPU cores × 2) 169 + - each worker runs an epoll/kqueue loop over multiple host connections 170 + - subscriber becomes a state machine: connect → read → decode → validate → persist 171 + - reduces thread count from O(hosts) to O(cores), eliminates the stack memory wall 172 + - websocket.zig would need a non-blocking client mode or replacement 173 + 174 + ### long-term (async I/O) 175 + - zig 0.16 introduces `Io` (io_uring on linux, kqueue on darwin) 176 + - single-threaded event loop with coroutines for all I/O 177 + - eliminates thread overhead entirely, scales to 100K+ hosts per process 178 + - requires rewriting subscriber, resolver, and consumer write paths 179 + - pg.zig and websocket.zig would need async-compatible forks
+926
src/api.zig
··· 1 + //! HTTP API handlers for the relay 2 + //! 3 + //! serves XRPC endpoints, admin endpoints, health/stats, and the root banner 4 + //! via the websocket server's httpFallback mechanism. all handlers write raw 5 + //! HTTP responses to the websocket connection. 6 + 7 + const std = @import("std"); 8 + const http = std.http; 9 + const websocket = @import("websocket"); 10 + const broadcaster = @import("broadcaster.zig"); 11 + const validator_mod = @import("validator.zig"); 12 + const slurper_mod = @import("slurper.zig"); 13 + const event_log_mod = @import("event_log.zig"); 14 + const collection_index_mod = @import("collection_index.zig"); 15 + const backfill_mod = @import("backfill.zig"); 16 + 17 + const log = std.log.scoped(.relay); 18 + 19 + /// context for HTTP fallback handlers (passed as opaque pointer through broadcaster) 20 + pub const HttpContext = struct { 21 + stats: *broadcaster.Stats, 22 + persist: *event_log_mod.DiskPersist, 23 + slurper: *slurper_mod.Slurper, 24 + collection_index: *collection_index_mod.CollectionIndex, 25 + backfiller: *backfill_mod.Backfiller, 26 + bc: *broadcaster.Broadcaster, 27 + validator: *validator_mod.Validator, 28 + }; 29 + 30 + /// top-level HTTP request router — installed as bc.http_fallback 31 + pub fn handleHttpRequest( 32 + conn: *websocket.Conn, 33 + method: []const u8, 34 + url: []const u8, 35 + body: []const u8, 36 + headers: *const websocket.Handshake.KeyValue, 37 + opaque_ctx: ?*anyopaque, 38 + ) void { 39 + const ctx: *HttpContext = @ptrCast(@alignCast(opaque_ctx orelse return)); 40 + 41 + const qmark = std.mem.indexOfScalar(u8, url, '?'); 42 + const path = url[0..(qmark orelse url.len)]; 43 + const query = if (qmark) |q| url[q + 1 ..] else ""; 44 + 45 + if (std.mem.eql(u8, method, "GET")) { 46 + handleGet(conn, path, query, headers, ctx); 47 + } else if (std.mem.eql(u8, method, "POST")) { 48 + handlePost(conn, path, query, body, headers, ctx); 49 + } else { 50 + respondText(conn, .method_not_allowed, "method not allowed"); 51 + } 52 + } 53 + 54 + fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 55 + if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 56 + respondJson(conn, .ok, "{\"status\":\"ok\"}"); 57 + } else if (std.mem.eql(u8, path, "/_stats")) { 58 + var stats_buf: [4096]u8 = undefined; 59 + const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf); 60 + respondJson(conn, .ok, body); 61 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { 62 + handleListRepos(conn, query, ctx.persist); 63 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { 64 + handleGetRepoStatus(conn, query, ctx.persist); 65 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { 66 + handleGetLatestCommit(conn, query, ctx.persist); 67 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 68 + handleListReposByCollection(conn, query, ctx.collection_index); 69 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { 70 + handleListHosts(conn, query, ctx.persist); 71 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) { 72 + handleGetHostStatus(conn, query, ctx.persist); 73 + } else if (std.mem.eql(u8, path, "/admin/hosts")) { 74 + handleAdminListHosts(conn, headers, ctx); 75 + } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 76 + handleAdminBackfillStatus(conn, headers, ctx); 77 + } else if (std.mem.eql(u8, path, "/")) { 78 + respondText(conn, .ok, 79 + \\ _ 80 + \\ ___| | __ _ _ _ 81 + \\|_ / |/ _` | | | | 82 + \\ / /| | (_| | |_| | 83 + \\/___|_|\__,_|\__, | 84 + \\ |___/ 85 + \\ 86 + \\This is an atproto [https://atproto.com] relay instance, 87 + \\running the zlay codebase [https://tangled.org/zzstoatzz.io/zlay] 88 + \\ 89 + \\The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 90 + \\ 91 + ); 92 + } else if (std.mem.eql(u8, path, "/favicon.svg") or std.mem.eql(u8, path, "/favicon.ico")) { 93 + httpRespond(conn, .ok, "image/svg+xml", 94 + \\<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32"> 95 + \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/> 96 + \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text> 97 + \\</svg> 98 + ); 99 + } else { 100 + respondText(conn, .not_found, "not found"); 101 + } 102 + } 103 + 104 + fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 105 + if (std.mem.eql(u8, path, "/admin/repo/ban")) { 106 + handleBan(conn, body, headers, ctx); 107 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { 108 + handleRequestCrawl(conn, body, ctx.slurper); 109 + } else if (std.mem.eql(u8, path, "/admin/hosts/block")) { 110 + handleAdminBlockHost(conn, body, headers, ctx.persist); 111 + } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { 112 + handleAdminUnblockHost(conn, body, headers, ctx.persist); 113 + } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 114 + handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller); 115 + } else { 116 + respondText(conn, .not_found, "not found"); 117 + } 118 + } 119 + 120 + fn handleBan(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 121 + if (!checkAdmin(conn, headers)) return; 122 + 123 + const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 124 + respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}"); 125 + return; 126 + }; 127 + defer parsed.deinit(); 128 + const did = parsed.value.did; 129 + 130 + // resolve DID → UID and take down 131 + const uid = ctx.persist.uidForDid(did) catch { 132 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); 133 + return; 134 + }; 135 + ctx.persist.takeDownUser(uid) catch { 136 + respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}"); 137 + return; 138 + }; 139 + 140 + // emit #account event so downstream consumers see the takedown 141 + if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 142 + if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { 143 + ctx.bc.stats.relay_seq.store(relay_seq, .release); 144 + const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 145 + ctx.bc.broadcast(relay_seq, broadcast_data); 146 + log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 147 + } else |err| { 148 + log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); 149 + } 150 + } 151 + 152 + log.info("admin: banned {s} (uid={d})", .{ did, uid }); 153 + respondJson(conn, .ok, "{\"success\":true}"); 154 + } 155 + 156 + fn handleRequestCrawl(conn: *websocket.Conn, body: []const u8, slurper: *slurper_mod.Slurper) void { 157 + const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, slurper.allocator, body, .{ .ignore_unknown_fields = true }) catch { 158 + respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}"); 159 + return; 160 + }; 161 + defer parsed.deinit(); 162 + 163 + // fast validation: hostname format (Go relay does this synchronously in handler) 164 + const hostname = slurper_mod.validateHostname(slurper.allocator, parsed.value.hostname) catch |err| { 165 + log.warn("requestCrawl rejected '{s}': {s}", .{ parsed.value.hostname, @errorName(err) }); 166 + respondJson(conn, .bad_request, switch (err) { 167 + error.EmptyHostname => "{\"error\":\"empty hostname\"}", 168 + error.InvalidCharacter => "{\"error\":\"hostname contains invalid characters\"}", 169 + error.InvalidLabel => "{\"error\":\"hostname has invalid label\"}", 170 + error.TooFewLabels => "{\"error\":\"hostname must have at least two labels (e.g. pds.example.com)\"}", 171 + error.LooksLikeIpAddress => "{\"error\":\"IP addresses not allowed, use a hostname\"}", 172 + error.PortNotAllowed => "{\"error\":\"port numbers not allowed\"}", 173 + error.LocalhostNotAllowed => "{\"error\":\"localhost not allowed\"}", 174 + else => "{\"error\":\"invalid hostname\"}", 175 + }); 176 + return; 177 + }; 178 + defer slurper.allocator.free(hostname); 179 + 180 + // fast validation: domain ban check 181 + if (slurper.persist.isDomainBanned(hostname)) { 182 + log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); 183 + respondJson(conn, .bad_request, "{\"error\":\"domain is banned\"}"); 184 + return; 185 + } 186 + 187 + // enqueue for async processing (describeServer check happens in crawl processor) 188 + slurper.addCrawlRequest(hostname) catch { 189 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to store crawl request\"}"); 190 + return; 191 + }; 192 + 193 + log.info("crawl requested: {s}", .{hostname}); 194 + respondJson(conn, .ok, "{\"success\":true}"); 195 + } 196 + 197 + // --- admin host management --- 198 + 199 + fn handleAdminListHosts(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 200 + if (!checkAdmin(conn, headers)) return; 201 + 202 + const persist = ctx.persist; 203 + const hosts = persist.listAllHosts(persist.allocator) catch { 204 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 205 + return; 206 + }; 207 + defer { 208 + for (hosts) |h| { 209 + persist.allocator.free(h.hostname); 210 + persist.allocator.free(h.status); 211 + } 212 + persist.allocator.free(hosts); 213 + } 214 + 215 + var list: std.ArrayListUnmanaged(u8) = .{}; 216 + defer list.deinit(persist.allocator); 217 + const w = list.writer(persist.allocator); 218 + 219 + w.writeAll("{\"hosts\":[") catch return; 220 + 221 + for (hosts, 0..) |host, i| { 222 + if (i > 0) w.writeByte(',') catch return; 223 + std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d}}}", .{ 224 + host.id, 225 + host.hostname, 226 + host.status, 227 + host.last_seq, 228 + host.failed_attempts, 229 + }) catch return; 230 + } 231 + 232 + std.fmt.format(w, "],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return; 233 + respondJson(conn, .ok, list.items); 234 + } 235 + 236 + fn handleAdminBlockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 237 + if (!checkAdmin(conn, headers)) return; 238 + 239 + const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 240 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 241 + return; 242 + }; 243 + defer parsed.deinit(); 244 + 245 + const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 246 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 247 + return; 248 + }; 249 + 250 + persist.updateHostStatus(host_info.id, "blocked") catch { 251 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 252 + return; 253 + }; 254 + 255 + log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 256 + respondJson(conn, .ok, "{\"success\":true}"); 257 + } 258 + 259 + fn handleAdminUnblockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 260 + if (!checkAdmin(conn, headers)) return; 261 + 262 + const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 263 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 264 + return; 265 + }; 266 + defer parsed.deinit(); 267 + 268 + const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 269 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 270 + return; 271 + }; 272 + 273 + persist.updateHostStatus(host_info.id, "active") catch { 274 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 275 + return; 276 + }; 277 + persist.resetHostFailures(host_info.id) catch {}; 278 + 279 + log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 280 + respondJson(conn, .ok, "{\"success\":true}"); 281 + } 282 + 283 + /// check admin auth via headers, send error response if not authorized. returns true if authorized. 284 + fn checkAdmin(conn: *websocket.Conn, headers: ?*const websocket.Handshake.KeyValue) bool { 285 + const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse { 286 + respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}"); 287 + return false; 288 + }; 289 + 290 + const kv = headers orelse { 291 + respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 292 + return false; 293 + }; 294 + 295 + // handshake parser lowercases all header names 296 + const auth_value = kv.get("authorization") orelse { 297 + respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 298 + return false; 299 + }; 300 + 301 + const bearer_prefix = "Bearer "; 302 + if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) { 303 + respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}"); 304 + return false; 305 + } 306 + const token = auth_value[bearer_prefix.len..]; 307 + if (!std.mem.eql(u8, token, admin_pw)) { 308 + respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}"); 309 + return false; 310 + } 311 + return true; 312 + } 313 + 314 + // --- XRPC endpoint handlers --- 315 + 316 + fn handleListRepos(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 317 + const cursor_str = queryParam(query, "cursor") orelse "0"; 318 + const limit_str = queryParam(query, "limit") orelse "500"; 319 + 320 + const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 321 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 322 + return; 323 + }; 324 + if (cursor_val < 0) { 325 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 326 + return; 327 + } 328 + 329 + const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 330 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 331 + return; 332 + }; 333 + if (limit < 1 or limit > 1000) { 334 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 335 + return; 336 + } 337 + 338 + // query accounts with repo state, paginated by UID 339 + // includes both local status and upstream_status for combined active check 340 + var result = persist.db.query( 341 + \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 342 + \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 343 + \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 344 + , .{ cursor_val, limit }) catch { 345 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 346 + return; 347 + }; 348 + defer result.deinit(); 349 + 350 + // build JSON response into a buffer 351 + var buf: [65536]u8 = undefined; 352 + var fbs = std.io.fixedBufferStream(&buf); 353 + const w = fbs.writer(); 354 + 355 + var count: i64 = 0; 356 + var last_uid: i64 = 0; 357 + 358 + w.writeAll("{\"repos\":[") catch return; 359 + 360 + while (result.nextUnsafe() catch null) |row| { 361 + if (count > 0) w.writeByte(',') catch return; 362 + 363 + const uid = row.get(i64, 0); 364 + const did = row.get([]const u8, 1); 365 + const local_status = row.get([]const u8, 2); 366 + const upstream_status = row.get([]const u8, 3); 367 + const rev = row.get([]const u8, 4); 368 + const head = row.get([]const u8, 5); 369 + 370 + // Go relay: Account.IsActive() — both local AND upstream must be active 371 + const local_ok = std.mem.eql(u8, local_status, "active"); 372 + const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 373 + const active = local_ok and upstream_ok; 374 + // Go relay: Account.AccountStatus() — local takes priority 375 + const status = if (!local_ok) local_status else upstream_status; 376 + 377 + w.writeAll("{\"did\":\"") catch return; 378 + w.writeAll(did) catch return; 379 + w.writeAll("\"") catch return; 380 + 381 + if (head.len > 0) { 382 + w.writeAll(",\"head\":\"") catch return; 383 + w.writeAll(head) catch return; 384 + w.writeAll("\"") catch return; 385 + } 386 + if (rev.len > 0) { 387 + w.writeAll(",\"rev\":\"") catch return; 388 + w.writeAll(rev) catch return; 389 + w.writeAll("\"") catch return; 390 + } 391 + 392 + if (active) { 393 + w.writeAll(",\"active\":true") catch return; 394 + } else { 395 + w.writeAll(",\"active\":false,\"status\":\"") catch return; 396 + w.writeAll(status) catch return; 397 + w.writeAll("\"") catch return; 398 + } 399 + 400 + w.writeByte('}') catch return; 401 + last_uid = uid; 402 + count += 1; 403 + } 404 + 405 + w.writeByte(']') catch return; 406 + 407 + // include cursor if we got a full page 408 + if (count >= limit and count >= 2) { 409 + std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_uid}) catch return; 410 + } 411 + 412 + w.writeByte('}') catch return; 413 + 414 + respondJson(conn, .ok, fbs.getWritten()); 415 + } 416 + 417 + fn handleGetRepoStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 418 + var did_buf: [256]u8 = undefined; 419 + const did = queryParamDecoded(query, "did", &did_buf) orelse { 420 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 421 + return; 422 + }; 423 + 424 + // basic DID syntax check 425 + if (!std.mem.startsWith(u8, did, "did:")) { 426 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 427 + return; 428 + } 429 + 430 + // look up account (includes both local and upstream status) 431 + var row = (persist.db.rowUnsafe( 432 + "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 433 + .{did}, 434 + ) catch { 435 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 436 + return; 437 + }) orelse { 438 + respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 439 + return; 440 + }; 441 + defer row.deinit() catch {}; 442 + 443 + const local_status = row.get([]const u8, 1); 444 + const upstream_status = row.get([]const u8, 2); 445 + const rev = row.get([]const u8, 3); 446 + // Go relay: Account.IsActive() / AccountStatus() 447 + const local_ok = std.mem.eql(u8, local_status, "active"); 448 + const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 449 + const active = local_ok and upstream_ok; 450 + const status = if (!local_ok) local_status else upstream_status; 451 + 452 + var buf: [4096]u8 = undefined; 453 + var fbs = std.io.fixedBufferStream(&buf); 454 + const w = fbs.writer(); 455 + 456 + w.writeAll("{\"did\":\"") catch return; 457 + w.writeAll(did) catch return; 458 + w.writeAll("\"") catch return; 459 + 460 + if (active) { 461 + w.writeAll(",\"active\":true") catch return; 462 + } else { 463 + w.writeAll(",\"active\":false,\"status\":\"") catch return; 464 + w.writeAll(status) catch return; 465 + w.writeAll("\"") catch return; 466 + } 467 + 468 + if (rev.len > 0) { 469 + w.writeAll(",\"rev\":\"") catch return; 470 + w.writeAll(rev) catch return; 471 + w.writeAll("\"") catch return; 472 + } 473 + 474 + w.writeByte('}') catch return; 475 + respondJson(conn, .ok, fbs.getWritten()); 476 + } 477 + 478 + fn handleGetLatestCommit(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 479 + var did_buf: [256]u8 = undefined; 480 + const did = queryParamDecoded(query, "did", &did_buf) orelse { 481 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 482 + return; 483 + }; 484 + 485 + if (!std.mem.startsWith(u8, did, "did:")) { 486 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 487 + return; 488 + } 489 + 490 + // look up account + repo state (includes both local and upstream status) 491 + var row = (persist.db.rowUnsafe( 492 + "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 493 + .{did}, 494 + ) catch { 495 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 496 + return; 497 + }) orelse { 498 + respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 499 + return; 500 + }; 501 + defer row.deinit() catch {}; 502 + 503 + const local_status = row.get([]const u8, 0); 504 + const upstream_status = row.get([]const u8, 1); 505 + const rev = row.get([]const u8, 2); 506 + const cid = row.get([]const u8, 3); 507 + 508 + // combined status: local takes priority (Go relay: AccountStatus()) 509 + const status = if (!std.mem.eql(u8, local_status, "active")) local_status else upstream_status; 510 + 511 + // check account status (match Go relay behavior) 512 + if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, "suspended")) { 513 + respondJson(conn, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); 514 + return; 515 + } else if (std.mem.eql(u8, status, "deactivated")) { 516 + respondJson(conn, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); 517 + return; 518 + } else if (std.mem.eql(u8, status, "deleted")) { 519 + respondJson(conn, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); 520 + return; 521 + } else if (!std.mem.eql(u8, status, "active")) { 522 + respondJson(conn, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); 523 + return; 524 + } 525 + 526 + if (rev.len == 0 or cid.len == 0) { 527 + respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 528 + return; 529 + } 530 + 531 + var buf: [4096]u8 = undefined; 532 + var fbs = std.io.fixedBufferStream(&buf); 533 + const w = fbs.writer(); 534 + 535 + w.writeAll("{\"cid\":\"") catch return; 536 + w.writeAll(cid) catch return; 537 + w.writeAll("\",\"rev\":\"") catch return; 538 + w.writeAll(rev) catch return; 539 + w.writeAll("\"}") catch return; 540 + 541 + respondJson(conn, .ok, fbs.getWritten()); 542 + } 543 + 544 + fn handleListReposByCollection(conn: *websocket.Conn, query: []const u8, ci: *collection_index_mod.CollectionIndex) void { 545 + const collection = queryParam(query, "collection") orelse { 546 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}"); 547 + return; 548 + }; 549 + 550 + if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) { 551 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}"); 552 + return; 553 + } 554 + 555 + const limit_str = queryParam(query, "limit") orelse "500"; 556 + const limit = std.fmt.parseInt(usize, limit_str, 10) catch { 557 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 558 + return; 559 + }; 560 + if (limit < 1 or limit > 1000) { 561 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 562 + return; 563 + } 564 + 565 + var cursor_buf: [256]u8 = undefined; 566 + const cursor_did = queryParamDecoded(query, "cursor", &cursor_buf); 567 + 568 + // scan collection index 569 + var did_buf: [65536]u8 = undefined; 570 + const ci_result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch { 571 + respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); 572 + return; 573 + }; 574 + 575 + // build JSON response 576 + var buf: [65536]u8 = undefined; 577 + var fbs = std.io.fixedBufferStream(&buf); 578 + const w = fbs.writer(); 579 + 580 + w.writeAll("{\"repos\":[") catch return; 581 + for (0..ci_result.count) |i| { 582 + if (i > 0) w.writeByte(',') catch return; 583 + w.writeAll("{\"did\":\"") catch return; 584 + w.writeAll(ci_result.getDid(i)) catch return; 585 + w.writeAll("\"}") catch return; 586 + } 587 + w.writeByte(']') catch return; 588 + 589 + if (ci_result.last_did) |last| { 590 + if (ci_result.count >= limit) { 591 + w.writeAll(",\"cursor\":\"") catch return; 592 + w.writeAll(last) catch return; 593 + w.writeAll("\"") catch return; 594 + } 595 + } 596 + 597 + w.writeByte('}') catch return; 598 + respondJson(conn, .ok, fbs.getWritten()); 599 + } 600 + 601 + fn handleListHosts(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 602 + const cursor_str = queryParam(query, "cursor") orelse "0"; 603 + const limit_str = queryParam(query, "limit") orelse "200"; 604 + 605 + const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 606 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 607 + return; 608 + }; 609 + if (cursor_val < 0) { 610 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 611 + return; 612 + } 613 + 614 + const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 615 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 616 + return; 617 + }; 618 + if (limit < 1 or limit > 1000) { 619 + respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 620 + return; 621 + } 622 + 623 + var result = persist.db.query( 624 + "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 625 + .{ cursor_val, limit }, 626 + ) catch { 627 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 628 + return; 629 + }; 630 + defer result.deinit(); 631 + 632 + var buf: [65536]u8 = undefined; 633 + var fbs = std.io.fixedBufferStream(&buf); 634 + const w = fbs.writer(); 635 + 636 + var count: i64 = 0; 637 + var last_id: i64 = 0; 638 + 639 + w.writeAll("{\"hosts\":[") catch return; 640 + 641 + while (result.nextUnsafe() catch null) |row| { 642 + if (count > 0) w.writeByte(',') catch return; 643 + 644 + const id = row.get(i64, 0); 645 + const hostname = row.get([]const u8, 1); 646 + const status = row.get([]const u8, 2); 647 + const seq = row.get(i64, 3); 648 + 649 + w.writeAll("{\"hostname\":\"") catch return; 650 + w.writeAll(hostname) catch return; 651 + w.writeAll("\"") catch return; 652 + std.fmt.format(w, ",\"seq\":{d}", .{seq}) catch return; 653 + w.writeAll(",\"status\":\"") catch return; 654 + w.writeAll(status) catch return; 655 + w.writeAll("\"}") catch return; 656 + 657 + last_id = id; 658 + count += 1; 659 + } 660 + 661 + w.writeByte(']') catch return; 662 + 663 + if (count >= limit and count > 1) { 664 + std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_id}) catch return; 665 + } 666 + 667 + w.writeByte('}') catch return; 668 + respondJson(conn, .ok, fbs.getWritten()); 669 + } 670 + 671 + fn handleGetHostStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 672 + var hostname_buf: [256]u8 = undefined; 673 + const hostname = queryParamDecoded(query, "hostname", &hostname_buf) orelse { 674 + respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); 675 + return; 676 + }; 677 + 678 + // look up host 679 + var row = (persist.db.rowUnsafe( 680 + "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 681 + .{hostname}, 682 + ) catch { 683 + respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 684 + return; 685 + }) orelse { 686 + respondJson(conn, .bad_request, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); 687 + return; 688 + }; 689 + defer row.deinit() catch {}; 690 + 691 + const host_id = row.get(i64, 0); 692 + const host_name = row.get([]const u8, 1); 693 + const raw_status = row.get([]const u8, 2); 694 + const seq = row.get(i64, 3); 695 + 696 + // map internal status to lexicon hostStatus values 697 + const status = if (std.mem.eql(u8, raw_status, "blocked")) 698 + "banned" 699 + else if (std.mem.eql(u8, raw_status, "exhausted")) 700 + "offline" 701 + else 702 + raw_status; // active, idle pass through 703 + 704 + // count accounts on this host 705 + const account_count: i64 = if (persist.db.rowUnsafe( 706 + "SELECT COUNT(*) FROM account WHERE host_id = $1", 707 + .{host_id}, 708 + ) catch null) |cnt_row| blk: { 709 + var r = cnt_row; 710 + defer r.deinit() catch {}; 711 + break :blk r.get(i64, 0); 712 + } else 0; 713 + 714 + var buf: [4096]u8 = undefined; 715 + var fbs = std.io.fixedBufferStream(&buf); 716 + const w = fbs.writer(); 717 + 718 + w.writeAll("{\"hostname\":\"") catch return; 719 + w.writeAll(host_name) catch return; 720 + w.writeAll("\"") catch return; 721 + std.fmt.format(w, ",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return; 722 + w.writeAll(",\"status\":\"") catch return; 723 + w.writeAll(status) catch return; 724 + w.writeAll("\"}") catch return; 725 + 726 + respondJson(conn, .ok, fbs.getWritten()); 727 + } 728 + 729 + /// build a CBOR #account frame for a takedown event. 730 + /// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"} 731 + fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 { 732 + const zat = @import("zat"); 733 + const cbor = zat.cbor; 734 + 735 + const header: cbor.Value = .{ .map = &.{ 736 + .{ .key = "op", .value = .{ .unsigned = 1 } }, 737 + .{ .key = "t", .value = .{ .text = "#account" } }, 738 + } }; 739 + 740 + var time_buf: [24]u8 = undefined; 741 + const time_str = formatTimestamp(&time_buf); 742 + 743 + const payload: cbor.Value = .{ .map = &.{ 744 + .{ .key = "seq", .value = .{ .unsigned = 0 } }, 745 + .{ .key = "did", .value = .{ .text = did } }, 746 + .{ .key = "time", .value = .{ .text = time_str } }, 747 + .{ .key = "active", .value = .{ .boolean = false } }, 748 + .{ .key = "status", .value = .{ .text = "takendown" } }, 749 + } }; 750 + 751 + const header_bytes = cbor.encodeAlloc(allocator, header) catch return null; 752 + const payload_bytes = cbor.encodeAlloc(allocator, payload) catch { 753 + allocator.free(header_bytes); 754 + return null; 755 + }; 756 + 757 + var frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch { 758 + allocator.free(header_bytes); 759 + allocator.free(payload_bytes); 760 + return null; 761 + }; 762 + @memcpy(frame[0..header_bytes.len], header_bytes); 763 + @memcpy(frame[header_bytes.len..], payload_bytes); 764 + 765 + allocator.free(header_bytes); 766 + allocator.free(payload_bytes); 767 + 768 + return frame; 769 + } 770 + 771 + /// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ) 772 + fn formatTimestamp(buf: *[24]u8) []const u8 { 773 + const ts: u64 = @intCast(std.time.timestamp()); 774 + const es = std.time.epoch.EpochSeconds{ .secs = ts }; 775 + const day = es.getEpochDay(); 776 + const yd = day.calculateYearDay(); 777 + const md = yd.calculateMonthDay(); 778 + const ds = es.getDaySeconds(); 779 + 780 + return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ 781 + yd.year, 782 + @as(u32, @intFromEnum(md.month)) + 1, 783 + @as(u32, md.day_index) + 1, 784 + ds.getHoursIntoDay(), 785 + ds.getMinutesIntoHour(), 786 + ds.getSecondsIntoMinute(), 787 + }) catch "1970-01-01T00:00:00Z"; 788 + } 789 + 790 + // --- backfill handlers --- 791 + 792 + fn handleAdminBackfillTrigger(conn: *websocket.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void { 793 + if (!checkAdmin(conn, headers)) return; 794 + 795 + const source = queryParam(query, "source") orelse "bsky.network"; 796 + 797 + backfiller.start(source) catch |err| { 798 + switch (err) { 799 + error.AlreadyRunning => { 800 + respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}"); 801 + }, 802 + else => { 803 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}"); 804 + }, 805 + } 806 + return; 807 + }; 808 + 809 + var buf: [256]u8 = undefined; 810 + const body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch { 811 + respondJson(conn, .ok, "{\"status\":\"started\"}"); 812 + return; 813 + }; 814 + respondJson(conn, .ok, body); 815 + } 816 + 817 + fn handleAdminBackfillStatus(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 818 + if (!checkAdmin(conn, headers)) return; 819 + 820 + const body = ctx.backfiller.getStatus(ctx.backfiller.allocator) catch { 821 + respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}"); 822 + return; 823 + }; 824 + defer ctx.backfiller.allocator.free(body); 825 + 826 + respondJson(conn, .ok, body); 827 + } 828 + 829 + // --- query string helpers --- 830 + 831 + fn queryParam(query: []const u8, name: []const u8) ?[]const u8 { 832 + if (query.len == 0) return null; 833 + var iter = std.mem.splitScalar(u8, query, '&'); 834 + while (iter.next()) |pair| { 835 + const eq = std.mem.indexOfScalar(u8, pair, '=') orelse continue; 836 + if (std.mem.eql(u8, pair[0..eq], name)) { 837 + return pair[eq + 1 ..]; 838 + } 839 + } 840 + return null; 841 + } 842 + 843 + /// like queryParam but percent-decodes the value into buf. 844 + /// returns null if the param is missing, or a slice into buf with the decoded value. 845 + fn queryParamDecoded(query: []const u8, name: []const u8, buf: []u8) ?[]const u8 { 846 + const raw = queryParam(query, name) orelse return null; 847 + var i: usize = 0; 848 + var out: usize = 0; 849 + while (i < raw.len) { 850 + if (raw[i] == '%' and i + 2 < raw.len) { 851 + const hi = hexVal(raw[i + 1]) orelse { 852 + if (out >= buf.len) return null; 853 + buf[out] = raw[i]; 854 + out += 1; 855 + i += 1; 856 + continue; 857 + }; 858 + const lo = hexVal(raw[i + 2]) orelse { 859 + if (out >= buf.len) return null; 860 + buf[out] = raw[i]; 861 + out += 1; 862 + i += 1; 863 + continue; 864 + }; 865 + if (out >= buf.len) return null; 866 + buf[out] = (@as(u8, hi) << 4) | @as(u8, lo); 867 + out += 1; 868 + i += 3; 869 + } else if (raw[i] == '+') { 870 + if (out >= buf.len) return null; 871 + buf[out] = ' '; 872 + out += 1; 873 + i += 1; 874 + } else { 875 + if (out >= buf.len) return null; 876 + buf[out] = raw[i]; 877 + out += 1; 878 + i += 1; 879 + } 880 + } 881 + return buf[0..out]; 882 + } 883 + 884 + fn hexVal(c: u8) ?u4 { 885 + return switch (c) { 886 + '0'...'9' => @intCast(c - '0'), 887 + 'a'...'f' => @intCast(c - 'a' + 10), 888 + 'A'...'F' => @intCast(c - 'A' + 10), 889 + else => null, 890 + }; 891 + } 892 + 893 + // --- response helpers (write raw HTTP to websocket.Conn) --- 894 + 895 + fn httpRespond(conn: *websocket.Conn, status: http.Status, content_type: []const u8, body: []const u8) void { 896 + var buf: [512]u8 = undefined; 897 + const header = std.fmt.bufPrint(&buf, "HTTP/1.1 {s}\r\nContent-Type: {s}\r\nContent-Length: {d}\r\nConnection: close\r\nServer: zlay\r\n\r\n", .{ 898 + httpStatusLine(status), 899 + content_type, 900 + body.len, 901 + }) catch return; 902 + conn.writeFramed(header) catch return; 903 + if (body.len > 0) conn.writeFramed(body) catch return; 904 + } 905 + 906 + fn respondJson(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 907 + httpRespond(conn, status, "application/json", body); 908 + } 909 + 910 + fn respondText(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 911 + httpRespond(conn, status, "text/plain", body); 912 + } 913 + 914 + fn httpStatusLine(status: http.Status) []const u8 { 915 + return switch (status) { 916 + .ok => "200 OK", 917 + .bad_request => "400 Bad Request", 918 + .unauthorized => "401 Unauthorized", 919 + .forbidden => "403 Forbidden", 920 + .not_found => "404 Not Found", 921 + .method_not_allowed => "405 Method Not Allowed", 922 + .conflict => "409 Conflict", 923 + .internal_server_error => "500 Internal Server Error", 924 + else => "500 Internal Server Error", 925 + }; 926 + }
+3 -912
src/main.zig
··· 31 31 const event_log_mod = @import("event_log.zig"); 32 32 const collection_index_mod = @import("collection_index.zig"); 33 33 const backfill_mod = @import("backfill.zig"); 34 + const api = @import("api.zig"); 34 35 35 36 const log = std.log.scoped(.relay); 36 37 ··· 40 41 pub const default_stack_size = 2 * 1024 * 1024; 41 42 42 43 var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false }; 43 - 44 - /// context for HTTP fallback handlers (passed as opaque pointer through broadcaster) 45 - const HttpContext = struct { 46 - stats: *broadcaster.Stats, 47 - persist: *event_log_mod.DiskPersist, 48 - slurper: *slurper_mod.Slurper, 49 - collection_index: *collection_index_mod.CollectionIndex, 50 - backfiller: *backfill_mod.Backfiller, 51 - bc: *broadcaster.Broadcaster, 52 - validator: *validator_mod.Validator, 53 - }; 54 44 55 45 /// metrics-only server on the internal port 56 46 const MetricsServer = struct { ··· 169 159 const gc_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, gcLoop, .{&dp}); 170 160 171 161 // wire HTTP fallback into broadcaster (all API endpoints served on WS port) 172 - var http_context = HttpContext{ 162 + var http_context = api.HttpContext{ 173 163 .stats = &bc.stats, 174 164 .persist = &dp, 175 165 .slurper = &slurper, ··· 178 168 .bc = &bc, 179 169 .validator = &val, 180 170 }; 181 - bc.http_fallback = handleHttpRequest; 171 + bc.http_fallback = api.handleHttpRequest; 182 172 bc.http_fallback_ctx = @ptrCast(&http_context); 183 173 184 174 // start metrics-only server (internal port) ··· 267 257 .flags = 0, 268 258 }; 269 259 std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null); 270 - } 271 - 272 - // --- HTTP fallback handler (called from broadcaster via websocket httpFallback) --- 273 - 274 - fn handleHttpRequest( 275 - conn: *websocket.Conn, 276 - method: []const u8, 277 - url: []const u8, 278 - body: []const u8, 279 - headers: *const websocket.Handshake.KeyValue, 280 - opaque_ctx: ?*anyopaque, 281 - ) void { 282 - const ctx: *HttpContext = @ptrCast(@alignCast(opaque_ctx orelse return)); 283 - 284 - const qmark = std.mem.indexOfScalar(u8, url, '?'); 285 - const path = url[0..(qmark orelse url.len)]; 286 - const query = if (qmark) |q| url[q + 1 ..] else ""; 287 - 288 - if (std.mem.eql(u8, method, "GET")) { 289 - handleGet(conn, path, query, headers, ctx); 290 - } else if (std.mem.eql(u8, method, "POST")) { 291 - handlePost(conn, path, query, body, headers, ctx); 292 - } else { 293 - respondText(conn, .method_not_allowed, "method not allowed"); 294 - } 295 - } 296 - 297 - fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 298 - if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { 299 - respondJson(conn, .ok, "{\"status\":\"ok\"}"); 300 - } else if (std.mem.eql(u8, path, "/_stats")) { 301 - var stats_buf: [4096]u8 = undefined; 302 - const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf); 303 - respondJson(conn, .ok, body); 304 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { 305 - handleListRepos(conn, query, ctx.persist); 306 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { 307 - handleGetRepoStatus(conn, query, ctx.persist); 308 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { 309 - handleGetLatestCommit(conn, query, ctx.persist); 310 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { 311 - handleListReposByCollection(conn, query, ctx.collection_index); 312 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { 313 - handleListHosts(conn, query, ctx.persist); 314 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) { 315 - handleGetHostStatus(conn, query, ctx.persist); 316 - } else if (std.mem.eql(u8, path, "/admin/hosts")) { 317 - handleAdminListHosts(conn, headers, ctx); 318 - } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 319 - handleAdminBackfillStatus(conn, headers, ctx); 320 - } else if (std.mem.eql(u8, path, "/")) { 321 - respondText(conn, .ok, 322 - \\ _ 323 - \\ ___| | __ _ _ _ 324 - \\|_ / |/ _` | | | | 325 - \\ / /| | (_| | |_| | 326 - \\/___|_|\__,_|\__, | 327 - \\ |___/ 328 - \\ 329 - \\This is an atproto [https://atproto.com] relay instance, 330 - \\running the zlay codebase [https://tangled.org/zzstoatzz.io/zlay] 331 - \\ 332 - \\The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 333 - \\ 334 - ); 335 - } else if (std.mem.eql(u8, path, "/favicon.svg") or std.mem.eql(u8, path, "/favicon.ico")) { 336 - httpRespond(conn, .ok, "image/svg+xml", 337 - \\<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32"> 338 - \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/> 339 - \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text> 340 - \\</svg> 341 - ); 342 - } else { 343 - respondText(conn, .not_found, "not found"); 344 - } 345 - } 346 - 347 - fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 348 - if (std.mem.eql(u8, path, "/admin/repo/ban")) { 349 - handleBan(conn, body, headers, ctx); 350 - } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { 351 - handleRequestCrawl(conn, body, ctx.slurper); 352 - } else if (std.mem.eql(u8, path, "/admin/hosts/block")) { 353 - handleAdminBlockHost(conn, body, headers, ctx.persist); 354 - } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { 355 - handleAdminUnblockHost(conn, body, headers, ctx.persist); 356 - } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { 357 - handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller); 358 - } else { 359 - respondText(conn, .not_found, "not found"); 360 - } 361 - } 362 - 363 - fn handleBan(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 364 - if (!checkAdmin(conn, headers)) return; 365 - 366 - const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 367 - respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}"); 368 - return; 369 - }; 370 - defer parsed.deinit(); 371 - const did = parsed.value.did; 372 - 373 - // resolve DID → UID and take down 374 - const uid = ctx.persist.uidForDid(did) catch { 375 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); 376 - return; 377 - }; 378 - ctx.persist.takeDownUser(uid) catch { 379 - respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}"); 380 - return; 381 - }; 382 - 383 - // emit #account event so downstream consumers see the takedown 384 - if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { 385 - if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { 386 - ctx.bc.stats.relay_seq.store(relay_seq, .release); 387 - const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 388 - ctx.bc.broadcast(relay_seq, broadcast_data); 389 - log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 390 - } else |err| { 391 - log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); 392 - } 393 - } 394 - 395 - log.info("admin: banned {s} (uid={d})", .{ did, uid }); 396 - respondJson(conn, .ok, "{\"success\":true}"); 397 - } 398 - 399 - fn handleRequestCrawl(conn: *websocket.Conn, body: []const u8, slurper: *slurper_mod.Slurper) void { 400 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, slurper.allocator, body, .{ .ignore_unknown_fields = true }) catch { 401 - respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"hostname\\\":\\\"...\\\"}\"}"); 402 - return; 403 - }; 404 - defer parsed.deinit(); 405 - 406 - // fast validation: hostname format (Go relay does this synchronously in handler) 407 - const hostname = slurper_mod.validateHostname(slurper.allocator, parsed.value.hostname) catch |err| { 408 - log.warn("requestCrawl rejected '{s}': {s}", .{ parsed.value.hostname, @errorName(err) }); 409 - respondJson(conn, .bad_request, switch (err) { 410 - error.EmptyHostname => "{\"error\":\"empty hostname\"}", 411 - error.InvalidCharacter => "{\"error\":\"hostname contains invalid characters\"}", 412 - error.InvalidLabel => "{\"error\":\"hostname has invalid label\"}", 413 - error.TooFewLabels => "{\"error\":\"hostname must have at least two labels (e.g. pds.example.com)\"}", 414 - error.LooksLikeIpAddress => "{\"error\":\"IP addresses not allowed, use a hostname\"}", 415 - error.PortNotAllowed => "{\"error\":\"port numbers not allowed\"}", 416 - error.LocalhostNotAllowed => "{\"error\":\"localhost not allowed\"}", 417 - else => "{\"error\":\"invalid hostname\"}", 418 - }); 419 - return; 420 - }; 421 - defer slurper.allocator.free(hostname); 422 - 423 - // fast validation: domain ban check 424 - if (slurper.persist.isDomainBanned(hostname)) { 425 - log.warn("requestCrawl rejected '{s}': domain banned", .{hostname}); 426 - respondJson(conn, .bad_request, "{\"error\":\"domain is banned\"}"); 427 - return; 428 - } 429 - 430 - // enqueue for async processing (describeServer check happens in crawl processor) 431 - slurper.addCrawlRequest(hostname) catch { 432 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to store crawl request\"}"); 433 - return; 434 - }; 435 - 436 - log.info("crawl requested: {s}", .{hostname}); 437 - respondJson(conn, .ok, "{\"success\":true}"); 438 - } 439 - 440 - // --- admin host management --- 441 - 442 - fn handleAdminListHosts(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 443 - if (!checkAdmin(conn, headers)) return; 444 - 445 - const persist = ctx.persist; 446 - const hosts = persist.listAllHosts(persist.allocator) catch { 447 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 448 - return; 449 - }; 450 - defer { 451 - for (hosts) |h| { 452 - persist.allocator.free(h.hostname); 453 - persist.allocator.free(h.status); 454 - } 455 - persist.allocator.free(hosts); 456 - } 457 - 458 - var list: std.ArrayListUnmanaged(u8) = .{}; 459 - defer list.deinit(persist.allocator); 460 - const w = list.writer(persist.allocator); 461 - 462 - w.writeAll("{\"hosts\":[") catch return; 463 - 464 - for (hosts, 0..) |host, i| { 465 - if (i > 0) w.writeByte(',') catch return; 466 - std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d}}}", .{ 467 - host.id, 468 - host.hostname, 469 - host.status, 470 - host.last_seq, 471 - host.failed_attempts, 472 - }) catch return; 473 - } 474 - 475 - std.fmt.format(w, "],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return; 476 - respondJson(conn, .ok, list.items); 477 - } 478 - 479 - fn handleAdminBlockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 480 - if (!checkAdmin(conn, headers)) return; 481 - 482 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 483 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 484 - return; 485 - }; 486 - defer parsed.deinit(); 487 - 488 - const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 489 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 490 - return; 491 - }; 492 - 493 - persist.updateHostStatus(host_info.id, "blocked") catch { 494 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 495 - return; 496 - }; 497 - 498 - log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 499 - respondJson(conn, .ok, "{\"success\":true}"); 500 - } 501 - 502 - fn handleAdminUnblockHost(conn: *websocket.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { 503 - if (!checkAdmin(conn, headers)) return; 504 - 505 - const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { 506 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); 507 - return; 508 - }; 509 - defer parsed.deinit(); 510 - 511 - const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { 512 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); 513 - return; 514 - }; 515 - 516 - persist.updateHostStatus(host_info.id, "active") catch { 517 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); 518 - return; 519 - }; 520 - persist.resetHostFailures(host_info.id) catch {}; 521 - 522 - log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); 523 - respondJson(conn, .ok, "{\"success\":true}"); 524 - } 525 - 526 - /// check admin auth via headers, send error response if not authorized. returns true if authorized. 527 - fn checkAdmin(conn: *websocket.Conn, headers: ?*const websocket.Handshake.KeyValue) bool { 528 - const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse { 529 - respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}"); 530 - return false; 531 - }; 532 - 533 - const kv = headers orelse { 534 - respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 535 - return false; 536 - }; 537 - 538 - // handshake parser lowercases all header names 539 - const auth_value = kv.get("authorization") orelse { 540 - respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); 541 - return false; 542 - }; 543 - 544 - const bearer_prefix = "Bearer "; 545 - if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) { 546 - respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}"); 547 - return false; 548 - } 549 - const token = auth_value[bearer_prefix.len..]; 550 - if (!std.mem.eql(u8, token, admin_pw)) { 551 - respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}"); 552 - return false; 553 - } 554 - return true; 555 - } 556 - 557 - // --- XRPC endpoint handlers --- 558 - 559 - fn handleListRepos(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 560 - const cursor_str = queryParam(query, "cursor") orelse "0"; 561 - const limit_str = queryParam(query, "limit") orelse "500"; 562 - 563 - const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 564 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 565 - return; 566 - }; 567 - if (cursor_val < 0) { 568 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 569 - return; 570 - } 571 - 572 - const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 573 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 574 - return; 575 - }; 576 - if (limit < 1 or limit > 1000) { 577 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 578 - return; 579 - } 580 - 581 - // query accounts with repo state, paginated by UID 582 - // includes both local status and upstream_status for combined active check 583 - var result = persist.db.query( 584 - \\SELECT a.uid, a.did, a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') 585 - \\FROM account a LEFT JOIN account_repo r ON a.uid = r.uid 586 - \\WHERE a.uid > $1 ORDER BY a.uid ASC LIMIT $2 587 - , .{ cursor_val, limit }) catch { 588 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 589 - return; 590 - }; 591 - defer result.deinit(); 592 - 593 - // build JSON response into a buffer 594 - var buf: [65536]u8 = undefined; 595 - var fbs = std.io.fixedBufferStream(&buf); 596 - const w = fbs.writer(); 597 - 598 - var count: i64 = 0; 599 - var last_uid: i64 = 0; 600 - 601 - w.writeAll("{\"repos\":[") catch return; 602 - 603 - while (result.nextUnsafe() catch null) |row| { 604 - if (count > 0) w.writeByte(',') catch return; 605 - 606 - const uid = row.get(i64, 0); 607 - const did = row.get([]const u8, 1); 608 - const local_status = row.get([]const u8, 2); 609 - const upstream_status = row.get([]const u8, 3); 610 - const rev = row.get([]const u8, 4); 611 - const head = row.get([]const u8, 5); 612 - 613 - // Go relay: Account.IsActive() — both local AND upstream must be active 614 - const local_ok = std.mem.eql(u8, local_status, "active"); 615 - const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 616 - const active = local_ok and upstream_ok; 617 - // Go relay: Account.AccountStatus() — local takes priority 618 - const status = if (!local_ok) local_status else upstream_status; 619 - 620 - w.writeAll("{\"did\":\"") catch return; 621 - w.writeAll(did) catch return; 622 - w.writeAll("\"") catch return; 623 - 624 - if (head.len > 0) { 625 - w.writeAll(",\"head\":\"") catch return; 626 - w.writeAll(head) catch return; 627 - w.writeAll("\"") catch return; 628 - } 629 - if (rev.len > 0) { 630 - w.writeAll(",\"rev\":\"") catch return; 631 - w.writeAll(rev) catch return; 632 - w.writeAll("\"") catch return; 633 - } 634 - 635 - if (active) { 636 - w.writeAll(",\"active\":true") catch return; 637 - } else { 638 - w.writeAll(",\"active\":false,\"status\":\"") catch return; 639 - w.writeAll(status) catch return; 640 - w.writeAll("\"") catch return; 641 - } 642 - 643 - w.writeByte('}') catch return; 644 - last_uid = uid; 645 - count += 1; 646 - } 647 - 648 - w.writeByte(']') catch return; 649 - 650 - // include cursor if we got a full page 651 - if (count >= limit and count >= 2) { 652 - std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_uid}) catch return; 653 - } 654 - 655 - w.writeByte('}') catch return; 656 - 657 - respondJson(conn, .ok, fbs.getWritten()); 658 - } 659 - 660 - fn handleGetRepoStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 661 - var did_buf: [256]u8 = undefined; 662 - const did = queryParamDecoded(query, "did", &did_buf) orelse { 663 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 664 - return; 665 - }; 666 - 667 - // basic DID syntax check 668 - if (!std.mem.startsWith(u8, did, "did:")) { 669 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 670 - return; 671 - } 672 - 673 - // look up account (includes both local and upstream status) 674 - var row = (persist.db.rowUnsafe( 675 - "SELECT a.uid, a.status, a.upstream_status, COALESCE(r.rev, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 676 - .{did}, 677 - ) catch { 678 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 679 - return; 680 - }) orelse { 681 - respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 682 - return; 683 - }; 684 - defer row.deinit() catch {}; 685 - 686 - const local_status = row.get([]const u8, 1); 687 - const upstream_status = row.get([]const u8, 2); 688 - const rev = row.get([]const u8, 3); 689 - // Go relay: Account.IsActive() / AccountStatus() 690 - const local_ok = std.mem.eql(u8, local_status, "active"); 691 - const upstream_ok = std.mem.eql(u8, upstream_status, "active"); 692 - const active = local_ok and upstream_ok; 693 - const status = if (!local_ok) local_status else upstream_status; 694 - 695 - var buf: [4096]u8 = undefined; 696 - var fbs = std.io.fixedBufferStream(&buf); 697 - const w = fbs.writer(); 698 - 699 - w.writeAll("{\"did\":\"") catch return; 700 - w.writeAll(did) catch return; 701 - w.writeAll("\"") catch return; 702 - 703 - if (active) { 704 - w.writeAll(",\"active\":true") catch return; 705 - } else { 706 - w.writeAll(",\"active\":false,\"status\":\"") catch return; 707 - w.writeAll(status) catch return; 708 - w.writeAll("\"") catch return; 709 - } 710 - 711 - if (rev.len > 0) { 712 - w.writeAll(",\"rev\":\"") catch return; 713 - w.writeAll(rev) catch return; 714 - w.writeAll("\"") catch return; 715 - } 716 - 717 - w.writeByte('}') catch return; 718 - respondJson(conn, .ok, fbs.getWritten()); 719 - } 720 - 721 - fn handleGetLatestCommit(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 722 - var did_buf: [256]u8 = undefined; 723 - const did = queryParamDecoded(query, "did", &did_buf) orelse { 724 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"did parameter required\"}"); 725 - return; 726 - }; 727 - 728 - if (!std.mem.startsWith(u8, did, "did:")) { 729 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid DID\"}"); 730 - return; 731 - } 732 - 733 - // look up account + repo state (includes both local and upstream status) 734 - var row = (persist.db.rowUnsafe( 735 - "SELECT a.status, a.upstream_status, COALESCE(r.rev, ''), COALESCE(r.commit_data_cid, '') FROM account a LEFT JOIN account_repo r ON a.uid = r.uid WHERE a.did = $1", 736 - .{did}, 737 - ) catch { 738 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 739 - return; 740 - }) orelse { 741 - respondJson(conn, .not_found, "{\"error\":\"RepoNotFound\",\"message\":\"account not found\"}"); 742 - return; 743 - }; 744 - defer row.deinit() catch {}; 745 - 746 - const local_status = row.get([]const u8, 0); 747 - const upstream_status = row.get([]const u8, 1); 748 - const rev = row.get([]const u8, 2); 749 - const cid = row.get([]const u8, 3); 750 - 751 - // combined status: local takes priority (Go relay: AccountStatus()) 752 - const status = if (!std.mem.eql(u8, local_status, "active")) local_status else upstream_status; 753 - 754 - // check account status (match Go relay behavior) 755 - if (std.mem.eql(u8, status, "takendown") or std.mem.eql(u8, status, "suspended")) { 756 - respondJson(conn, .forbidden, "{\"error\":\"RepoTakendown\",\"message\":\"account has been taken down\"}"); 757 - return; 758 - } else if (std.mem.eql(u8, status, "deactivated")) { 759 - respondJson(conn, .forbidden, "{\"error\":\"RepoDeactivated\",\"message\":\"account is deactivated\"}"); 760 - return; 761 - } else if (std.mem.eql(u8, status, "deleted")) { 762 - respondJson(conn, .forbidden, "{\"error\":\"RepoDeleted\",\"message\":\"account is deleted\"}"); 763 - return; 764 - } else if (!std.mem.eql(u8, status, "active")) { 765 - respondJson(conn, .forbidden, "{\"error\":\"RepoInactive\",\"message\":\"account is not active\"}"); 766 - return; 767 - } 768 - 769 - if (rev.len == 0 or cid.len == 0) { 770 - respondJson(conn, .not_found, "{\"error\":\"RepoNotSynchronized\",\"message\":\"relay has no repo data for this account\"}"); 771 - return; 772 - } 773 - 774 - var buf: [4096]u8 = undefined; 775 - var fbs = std.io.fixedBufferStream(&buf); 776 - const w = fbs.writer(); 777 - 778 - w.writeAll("{\"cid\":\"") catch return; 779 - w.writeAll(cid) catch return; 780 - w.writeAll("\",\"rev\":\"") catch return; 781 - w.writeAll(rev) catch return; 782 - w.writeAll("\"}") catch return; 783 - 784 - respondJson(conn, .ok, fbs.getWritten()); 785 - } 786 - 787 - fn handleListReposByCollection(conn: *websocket.Conn, query: []const u8, ci: *collection_index_mod.CollectionIndex) void { 788 - const collection = queryParam(query, "collection") orelse { 789 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"collection parameter required\"}"); 790 - return; 791 - }; 792 - 793 - if (collection.len == 0 or !std.mem.containsAtLeast(u8, collection, 1, ".")) { 794 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid collection NSID\"}"); 795 - return; 796 - } 797 - 798 - const limit_str = queryParam(query, "limit") orelse "500"; 799 - const limit = std.fmt.parseInt(usize, limit_str, 10) catch { 800 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 801 - return; 802 - }; 803 - if (limit < 1 or limit > 1000) { 804 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 805 - return; 806 - } 807 - 808 - var cursor_buf: [256]u8 = undefined; 809 - const cursor_did = queryParamDecoded(query, "cursor", &cursor_buf); 810 - 811 - // scan collection index 812 - var did_buf: [65536]u8 = undefined; 813 - const result = ci.listReposByCollection(collection, limit, cursor_did, &did_buf) catch { 814 - respondJson(conn, .internal_server_error, "{\"error\":\"InternalError\",\"message\":\"index scan failed\"}"); 815 - return; 816 - }; 817 - 818 - // build JSON response 819 - var buf: [65536]u8 = undefined; 820 - var fbs = std.io.fixedBufferStream(&buf); 821 - const w = fbs.writer(); 822 - 823 - w.writeAll("{\"repos\":[") catch return; 824 - for (0..result.count) |i| { 825 - if (i > 0) w.writeByte(',') catch return; 826 - w.writeAll("{\"did\":\"") catch return; 827 - w.writeAll(result.getDid(i)) catch return; 828 - w.writeAll("\"}") catch return; 829 - } 830 - w.writeByte(']') catch return; 831 - 832 - if (result.last_did) |last| { 833 - if (result.count >= limit) { 834 - w.writeAll(",\"cursor\":\"") catch return; 835 - w.writeAll(last) catch return; 836 - w.writeAll("\"") catch return; 837 - } 838 - } 839 - 840 - w.writeByte('}') catch return; 841 - respondJson(conn, .ok, fbs.getWritten()); 842 - } 843 - 844 - fn handleListHosts(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 845 - const cursor_str = queryParam(query, "cursor") orelse "0"; 846 - const limit_str = queryParam(query, "limit") orelse "200"; 847 - 848 - const cursor_val = std.fmt.parseInt(i64, cursor_str, 10) catch { 849 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid cursor\"}"); 850 - return; 851 - }; 852 - if (cursor_val < 0) { 853 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"cursor must be >= 0\"}"); 854 - return; 855 - } 856 - 857 - const limit = std.fmt.parseInt(i64, limit_str, 10) catch { 858 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid limit\"}"); 859 - return; 860 - }; 861 - if (limit < 1 or limit > 1000) { 862 - respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"limit must be 1..1000\"}"); 863 - return; 864 - } 865 - 866 - var result = persist.db.query( 867 - "SELECT id, hostname, status, last_seq FROM host WHERE id > $1 AND last_seq > 0 ORDER BY id ASC LIMIT $2", 868 - .{ cursor_val, limit }, 869 - ) catch { 870 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 871 - return; 872 - }; 873 - defer result.deinit(); 874 - 875 - var buf: [65536]u8 = undefined; 876 - var fbs = std.io.fixedBufferStream(&buf); 877 - const w = fbs.writer(); 878 - 879 - var count: i64 = 0; 880 - var last_id: i64 = 0; 881 - 882 - w.writeAll("{\"hosts\":[") catch return; 883 - 884 - while (result.nextUnsafe() catch null) |row| { 885 - if (count > 0) w.writeByte(',') catch return; 886 - 887 - const id = row.get(i64, 0); 888 - const hostname = row.get([]const u8, 1); 889 - const status = row.get([]const u8, 2); 890 - const seq = row.get(i64, 3); 891 - 892 - w.writeAll("{\"hostname\":\"") catch return; 893 - w.writeAll(hostname) catch return; 894 - w.writeAll("\"") catch return; 895 - std.fmt.format(w, ",\"seq\":{d}", .{seq}) catch return; 896 - w.writeAll(",\"status\":\"") catch return; 897 - w.writeAll(status) catch return; 898 - w.writeAll("\"}") catch return; 899 - 900 - last_id = id; 901 - count += 1; 902 - } 903 - 904 - w.writeByte(']') catch return; 905 - 906 - if (count >= limit and count > 1) { 907 - std.fmt.format(w, ",\"cursor\":\"{d}\"", .{last_id}) catch return; 908 - } 909 - 910 - w.writeByte('}') catch return; 911 - respondJson(conn, .ok, fbs.getWritten()); 912 - } 913 - 914 - fn handleGetHostStatus(conn: *websocket.Conn, query: []const u8, persist: *event_log_mod.DiskPersist) void { 915 - var hostname_buf: [256]u8 = undefined; 916 - const hostname = queryParamDecoded(query, "hostname", &hostname_buf) orelse { 917 - respondJson(conn, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); 918 - return; 919 - }; 920 - 921 - // look up host 922 - var row = (persist.db.rowUnsafe( 923 - "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 924 - .{hostname}, 925 - ) catch { 926 - respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 927 - return; 928 - }) orelse { 929 - respondJson(conn, .bad_request, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); 930 - return; 931 - }; 932 - defer row.deinit() catch {}; 933 - 934 - const host_id = row.get(i64, 0); 935 - const host_name = row.get([]const u8, 1); 936 - const raw_status = row.get([]const u8, 2); 937 - const seq = row.get(i64, 3); 938 - 939 - // map internal status to lexicon hostStatus values 940 - const status = if (std.mem.eql(u8, raw_status, "blocked")) 941 - "banned" 942 - else if (std.mem.eql(u8, raw_status, "exhausted")) 943 - "offline" 944 - else 945 - raw_status; // active, idle pass through 946 - 947 - // count accounts on this host 948 - const account_count: i64 = if (persist.db.rowUnsafe( 949 - "SELECT COUNT(*) FROM account WHERE host_id = $1", 950 - .{host_id}, 951 - ) catch null) |cnt_row| blk: { 952 - var r = cnt_row; 953 - defer r.deinit() catch {}; 954 - break :blk r.get(i64, 0); 955 - } else 0; 956 - 957 - var buf: [4096]u8 = undefined; 958 - var fbs = std.io.fixedBufferStream(&buf); 959 - const w = fbs.writer(); 960 - 961 - w.writeAll("{\"hostname\":\"") catch return; 962 - w.writeAll(host_name) catch return; 963 - w.writeAll("\"") catch return; 964 - std.fmt.format(w, ",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return; 965 - w.writeAll(",\"status\":\"") catch return; 966 - w.writeAll(status) catch return; 967 - w.writeAll("\"}") catch return; 968 - 969 - respondJson(conn, .ok, fbs.getWritten()); 970 - } 971 - 972 - /// build a CBOR #account frame for a takedown event. 973 - /// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"} 974 - fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 { 975 - const zat = @import("zat"); 976 - const cbor = zat.cbor; 977 - 978 - const header: cbor.Value = .{ .map = &.{ 979 - .{ .key = "op", .value = .{ .unsigned = 1 } }, 980 - .{ .key = "t", .value = .{ .text = "#account" } }, 981 - } }; 982 - 983 - var time_buf: [24]u8 = undefined; 984 - const time_str = formatTimestamp(&time_buf); 985 - 986 - const payload: cbor.Value = .{ .map = &.{ 987 - .{ .key = "seq", .value = .{ .unsigned = 0 } }, 988 - .{ .key = "did", .value = .{ .text = did } }, 989 - .{ .key = "time", .value = .{ .text = time_str } }, 990 - .{ .key = "active", .value = .{ .boolean = false } }, 991 - .{ .key = "status", .value = .{ .text = "takendown" } }, 992 - } }; 993 - 994 - const header_bytes = cbor.encodeAlloc(allocator, header) catch return null; 995 - const payload_bytes = cbor.encodeAlloc(allocator, payload) catch { 996 - allocator.free(header_bytes); 997 - return null; 998 - }; 999 - 1000 - var frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch { 1001 - allocator.free(header_bytes); 1002 - allocator.free(payload_bytes); 1003 - return null; 1004 - }; 1005 - @memcpy(frame[0..header_bytes.len], header_bytes); 1006 - @memcpy(frame[header_bytes.len..], payload_bytes); 1007 - 1008 - allocator.free(header_bytes); 1009 - allocator.free(payload_bytes); 1010 - 1011 - return frame; 1012 - } 1013 - 1014 - /// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ) 1015 - fn formatTimestamp(buf: *[24]u8) []const u8 { 1016 - const ts: u64 = @intCast(std.time.timestamp()); 1017 - const es = std.time.epoch.EpochSeconds{ .secs = ts }; 1018 - const day = es.getEpochDay(); 1019 - const yd = day.calculateYearDay(); 1020 - const md = yd.calculateMonthDay(); 1021 - const ds = es.getDaySeconds(); 1022 - 1023 - return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ 1024 - yd.year, 1025 - @as(u32, @intFromEnum(md.month)) + 1, 1026 - @as(u32, md.day_index) + 1, 1027 - ds.getHoursIntoDay(), 1028 - ds.getMinutesIntoHour(), 1029 - ds.getSecondsIntoMinute(), 1030 - }) catch "1970-01-01T00:00:00Z"; 1031 - } 1032 - 1033 - // --- backfill handlers --- 1034 - 1035 - fn handleAdminBackfillTrigger(conn: *websocket.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void { 1036 - if (!checkAdmin(conn, headers)) return; 1037 - 1038 - const source = queryParam(query, "source") orelse "bsky.network"; 1039 - 1040 - backfiller.start(source) catch |err| { 1041 - switch (err) { 1042 - error.AlreadyRunning => { 1043 - respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}"); 1044 - }, 1045 - else => { 1046 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}"); 1047 - }, 1048 - } 1049 - return; 1050 - }; 1051 - 1052 - var buf: [256]u8 = undefined; 1053 - const body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch { 1054 - respondJson(conn, .ok, "{\"status\":\"started\"}"); 1055 - return; 1056 - }; 1057 - respondJson(conn, .ok, body); 1058 - } 1059 - 1060 - fn handleAdminBackfillStatus(conn: *websocket.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { 1061 - if (!checkAdmin(conn, headers)) return; 1062 - 1063 - const body = ctx.backfiller.getStatus(ctx.backfiller.allocator) catch { 1064 - respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}"); 1065 - return; 1066 - }; 1067 - defer ctx.backfiller.allocator.free(body); 1068 - 1069 - respondJson(conn, .ok, body); 1070 - } 1071 - 1072 - // --- query string helpers --- 1073 - 1074 - fn queryParam(query: []const u8, name: []const u8) ?[]const u8 { 1075 - if (query.len == 0) return null; 1076 - var iter = std.mem.splitScalar(u8, query, '&'); 1077 - while (iter.next()) |pair| { 1078 - const eq = std.mem.indexOfScalar(u8, pair, '=') orelse continue; 1079 - if (std.mem.eql(u8, pair[0..eq], name)) { 1080 - return pair[eq + 1 ..]; 1081 - } 1082 - } 1083 - return null; 1084 - } 1085 - 1086 - /// like queryParam but percent-decodes the value into buf. 1087 - /// returns null if the param is missing, or a slice into buf with the decoded value. 1088 - fn queryParamDecoded(query: []const u8, name: []const u8, buf: []u8) ?[]const u8 { 1089 - const raw = queryParam(query, name) orelse return null; 1090 - var i: usize = 0; 1091 - var out: usize = 0; 1092 - while (i < raw.len) { 1093 - if (raw[i] == '%' and i + 2 < raw.len) { 1094 - const hi = hexVal(raw[i + 1]) orelse { 1095 - if (out >= buf.len) return null; 1096 - buf[out] = raw[i]; 1097 - out += 1; 1098 - i += 1; 1099 - continue; 1100 - }; 1101 - const lo = hexVal(raw[i + 2]) orelse { 1102 - if (out >= buf.len) return null; 1103 - buf[out] = raw[i]; 1104 - out += 1; 1105 - i += 1; 1106 - continue; 1107 - }; 1108 - if (out >= buf.len) return null; 1109 - buf[out] = (@as(u8, hi) << 4) | @as(u8, lo); 1110 - out += 1; 1111 - i += 3; 1112 - } else if (raw[i] == '+') { 1113 - if (out >= buf.len) return null; 1114 - buf[out] = ' '; 1115 - out += 1; 1116 - i += 1; 1117 - } else { 1118 - if (out >= buf.len) return null; 1119 - buf[out] = raw[i]; 1120 - out += 1; 1121 - i += 1; 1122 - } 1123 - } 1124 - return buf[0..out]; 1125 - } 1126 - 1127 - fn hexVal(c: u8) ?u4 { 1128 - return switch (c) { 1129 - '0'...'9' => @intCast(c - '0'), 1130 - 'a'...'f' => @intCast(c - 'a' + 10), 1131 - 'A'...'F' => @intCast(c - 'A' + 10), 1132 - else => null, 1133 - }; 1134 - } 1135 - 1136 - // --- response helpers (write raw HTTP to websocket.Conn) --- 1137 - 1138 - fn httpRespond(conn: *websocket.Conn, status: http.Status, content_type: []const u8, body: []const u8) void { 1139 - var buf: [512]u8 = undefined; 1140 - const header = std.fmt.bufPrint(&buf, "HTTP/1.1 {s}\r\nContent-Type: {s}\r\nContent-Length: {d}\r\nConnection: close\r\nServer: zlay\r\n\r\n", .{ 1141 - httpStatusLine(status), 1142 - content_type, 1143 - body.len, 1144 - }) catch return; 1145 - conn.writeFramed(header) catch return; 1146 - if (body.len > 0) conn.writeFramed(body) catch return; 1147 - } 1148 - 1149 - fn respondJson(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 1150 - httpRespond(conn, status, "application/json", body); 1151 - } 1152 - 1153 - fn respondText(conn: *websocket.Conn, status: http.Status, body: []const u8) void { 1154 - httpRespond(conn, status, "text/plain", body); 1155 - } 1156 - 1157 - fn httpStatusLine(status: http.Status) []const u8 { 1158 - return switch (status) { 1159 - .ok => "200 OK", 1160 - .bad_request => "400 Bad Request", 1161 - .unauthorized => "401 Unauthorized", 1162 - .forbidden => "403 Forbidden", 1163 - .not_found => "404 Not Found", 1164 - .method_not_allowed => "405 Method Not Allowed", 1165 - .conflict => "409 Conflict", 1166 - .internal_server_error => "500 Internal Server Error", 1167 - else => "500 Internal Server Error", 1168 - }; 1169 260 } 1170 261 1171 262 fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {