atproto relay implementation in zig
zlay.waow.tech
//! zat relay — AT Protocol firehose relay server
//!
//! crawls PDS instances directly via the Slurper (one subscriber per host),
//! validates frames via DID resolution and signature verification, persists
//! to disk with relay-assigned seq numbers, and rebroadcasts to downstream
//! consumers over WebSocket.
//!
//! port 3000 (RELAY_PORT): WebSocket firehose + HTTP API (via httpFallback)
//!   /xrpc/com.atproto.sync.subscribeRepos — firehose WebSocket (supports ?cursor=N)
//!   /xrpc/com.atproto.sync.listRepos — paginated account listing
//!   /xrpc/com.atproto.sync.getRepoStatus — single account status
//!   /xrpc/com.atproto.sync.getLatestCommit — latest commit CID + rev
//!   /xrpc/com.atproto.sync.listReposByCollection — repos with records in a collection
//!   /xrpc/com.atproto.sync.listHosts — paginated active host listing
//!   /xrpc/com.atproto.sync.getHostStatus — single host status
//!   /xrpc/com.atproto.sync.requestCrawl — request PDS crawl (POST)
//!   /admin/hosts — list all hosts (GET, admin)
//!   /admin/hosts/block — block a host (POST, admin)
//!   /admin/hosts/unblock — unblock a host (POST, admin)
//!   /_health, /_stats — health, stats
//!
//! port 3001 (RELAY_METRICS_PORT): internal metrics + health
//!   /metrics — prometheus metrics
//!   /_health, /_readyz — readiness probe (DB health flag)
//!   /_healthz — trivial liveness probe
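//!
//! usage sketch (hosts, ports, and payloads are illustrative only; the requestCrawl
//! body follows the com.atproto.sync.requestCrawl lexicon):
//!   websocat 'ws://localhost:3000/xrpc/com.atproto.sync.subscribeRepos?cursor=12345'
//!   curl -s 'http://localhost:3000/xrpc/com.atproto.sync.listHosts'
//!   curl -s -X POST 'http://localhost:3000/xrpc/com.atproto.sync.requestCrawl' \
//!     -H 'content-type: application/json' -d '{"hostname":"pds.example.com"}'
//!   curl -s 'http://localhost:3001/metrics'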

const std = @import("std");
const Io = std.Io;
const http = std.http;
const websocket = @import("websocket");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const slurper_mod = @import("slurper.zig");
const event_log_mod = @import("event_log.zig");
const collection_index_mod = @import("collection_index.zig");
const backfill_mod = @import("backfill.zig");
const cleaner_mod = @import("cleaner.zig");
const resync_mod = @import("resync.zig");
const host_ops_mod = @import("host_ops.zig");
const api = @import("api.zig");
const build_options = @import("build_options");
const malloc_trim: ?*const fn (pad: usize) callconv(.c) c_int = if (builtin.os.tag == .linux)
    @extern(*const fn (pad: usize) callconv(.c) c_int, .{ .name = "malloc_trim" })
else
    null;

const log = std.log.scoped(.relay);

pub const default_stack_size = 8 * 1024 * 1024;

// -- Io backend selection --
// Io.Threaded: one OS thread per io.concurrent() call (~2,800 for PDS subscribers).
// Higher thread count than Evented (~35) but proven stable at 99%+ coverage on 0.15.
//
// Evented (io_uring fibers) shelved: 8 crash classes from cross-Io violations,
// ReleaseSafe GPF from fiber context-switch codegen bug, persistent ~10-15% coverage
// degradation, zig team marks Evented as experimental. See docs/evented-attempt.md.
// Can revisit when zig's Evented runtime matures.
const Backend = Io.Threaded;

var backend: Backend = undefined;
var debug_threaded_io: Io.Threaded = undefined;

/// override single-threaded debug_io default — required for std.debug.print safety
/// when multiple OS threads exist (frame worker pool, websocket server).
pub const std_options_debug_threaded_io: ?*Io.Threaded = &debug_threaded_io;

var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false };

/// metrics-only server on the internal port
const MetricsServer = struct {
    server: Io.net.Server,
    io: Io,
    stats: *broadcaster.Stats,
    validator: *validator_mod.Validator,
    data_dir: []const u8,
    persist: *event_log_mod.DiskPersist,
    bc: *broadcaster.Broadcaster,
    slurper: *slurper_mod.Slurper,

    fn run(self: *MetricsServer) void {
        while (!shutdown_flag.load(.acquire)) {
            const stream = self.server.accept(self.io) catch |err| {
                if (shutdown_flag.load(.acquire)) return;
                log.debug("metrics accept error: {s}", .{@errorName(err)});
                continue;
            };
            self.handleMetricsConn(stream);
        }
    }

    fn handleMetricsConn(self: *MetricsServer, stream: Io.net.Stream) void {
        defer stream.close(self.io);

        var recv_buf: [4096]u8 = undefined;
        var send_buf: [4096]u8 = undefined;
        var connection_reader = stream.reader(self.io, &recv_buf);
        var connection_writer = stream.writer(self.io, &send_buf);
        var server = http.Server.init(&connection_reader.interface, &connection_writer.interface);

        var request = server.receiveHead() catch return;
        const path = request.head.target;

        if (std.mem.eql(u8, path, "/_healthz")) {
            // trivial liveness — constant-time, no dependencies
            request.respond("{\"status\":\"ok\"}", .{ .status = .ok, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "application/json" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        } else if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/_readyz")) {
            // read the atomic health flag instead of querying the DB directly; pg.Pool
            // lives on pool_io (the indirection dates from when this server ran on Evented)
            const db_ok = self.persist.isDbHealthy();
            const status: http.Status = if (db_ok) .ok else .internal_server_error;
            const body = if (db_ok) "{\"status\":\"ok\"}" else "{\"status\":\"error\",\"msg\":\"database unavailable\"}";
            request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "application/json" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        } else if (std.mem.eql(u8, path, "/metrics")) {
            const cache_entries = self.validator.cacheSize();
            const attribution = broadcaster.AttributionMetrics{
                .history_entries = self.bc.history.count(),
                .evtbuf_entries = self.persist.evtbufLen(),
                .did_cache_entries = self.persist.didCacheLen(),
                .resolve_queue_len = self.validator.resolveQueueLen(),
                .resolve_queued_set_count = self.validator.resolveQueuedSetCount(),
                .validator_cache_map_cap = self.validator.cacheMapCapacity(),
                .did_cache_map_cap = self.persist.didCacheMapCap(),
                .queued_set_map_cap = self.validator.resolveQueuedSetCapacity(),
                .evtbuf_cap = self.persist.evtbufCap(),
                .outbuf_cap = self.persist.outbufCap(),
                .workers_count = self.slurper.workerCount(),
            };

            var metrics_buf: [65536]u8 = undefined;
            const body = broadcaster.formatPrometheusMetrics(self.stats, cache_entries, attribution, self.data_dir, &metrics_buf, self.io);
            request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        } else {
            request.respond("not found", .{ .status = .not_found, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "text/plain" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        }
    }
};

pub fn main() !void {
    // exp-002: optional GPA wrapper for leak detection.
    // build with -Duse_gpa=true to enable. on clean shutdown (SIGTERM),
    // GPA logs every allocation that was never freed, with stack traces.
    var gpa: std.heap.DebugAllocator(.{
        .stack_trace_frames = if (build_options.use_gpa) 8 else 0,
    }) = .init;
    defer if (build_options.use_gpa) {
        log.info("GPA: checking for leaks...", .{});
        const status = gpa.deinit();
        if (status == .leak) {
            log.err("GPA: leaks detected! see stderr for details", .{});
        } else {
            log.info("GPA: no leaks detected", .{});
        }
    };
    const allocator = if (build_options.use_gpa) gpa.allocator() else std.heap.c_allocator;

    // shared options for both debug and primary runtimes
    const io_opts: Io.Threaded.InitOptions = .{
        .stack_size = default_stack_size, // 8MB (default is 16MB)
        .concurrent_limit = Io.Limit.limited(4096), // safety rail (steady-state ~2,800 hosts)
    };

    // init debug io (for std.debug.print thread safety)
    debug_threaded_io = Io.Threaded.init(allocator, io_opts);

    // init primary runtime (Threaded today; the branch below keeps the Evented init path for a future revisit)
    if (Backend == Io.Threaded) {
        backend = Io.Threaded.init(allocator, io_opts);
    } else {
        try Backend.init(&backend, allocator, .{});
    }
    const io = backend.io();

    // dedicated Threaded runtime for the frame worker pool.
    // historically needed to isolate Threaded work from Evented fibers (cross-Io
    // crash class). now redundant since Backend is also Threaded, but harmless.
    // used for: persist ordering mutex, timestamps, validator cache,
    // DID resolution HTTP, and thread pool internal sync.
    var pool_io_backend = Io.Threaded.init(allocator, .{
        .stack_size = default_stack_size,
    });
    const pool_io = pool_io_backend.io();

    log.info("io backend: {s}", .{if (Backend == Io.Threaded) "Threaded" else "Evented"});

    // parse config from env
    const port = parseEnvInt(u16, "RELAY_PORT", 3000);
    const metrics_port = parseEnvInt(u16, "RELAY_METRICS_PORT", 3001);
    const upstream = normalizeSeedHost(getenv("RELAY_UPSTREAM") orelse "bsky.network");
    const data_dir = getenv("RELAY_DATA_DIR") orelse "data/events";
    const retention_hours = parseEnvInt(u64, "RELAY_RETENTION_HOURS", 72);
    const max_events_gb = parseEnvInt(u64, "RELAY_MAX_EVENTS_GB", 100);
    const frame_workers = parseEnvInt(u16, "FRAME_WORKERS", 16);
    const frame_queue_capacity = parseEnvInt(u16, "FRAME_QUEUE_CAPACITY", 4096);
    const startup_batch_size = parseEnvInt(u16, "STARTUP_BATCH_SIZE", 50);
    const db_pool_size = parseEnvInt(u16, "DB_POOL_SIZE", 20);
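
    // illustrative launch environment (a sketch only: values are just the defaults
    // read above, and the binary name is hypothetical):
    //   RELAY_PORT=3000 RELAY_METRICS_PORT=3001 RELAY_UPSTREAM=bsky.network \
    //   RELAY_DATA_DIR=data/events RELAY_RETENTION_HOURS=72 RELAY_MAX_EVENTS_GB=100 \
    //   FRAME_WORKERS=16 FRAME_QUEUE_CAPACITY=4096 DB_POOL_SIZE=20 ./zat-relay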

    // install signal handlers (including SIGPIPE ignore)
    installSignalHandlers();

    // init components — pass io to network-facing modules
    var bc = broadcaster.Broadcaster.init(allocator, io, &shutdown_flag);
    defer bc.deinit();

    // validator uses pool_io — its cache LRU and host resolvers are called from worker threads
    var val = validator_mod.Validator.init(allocator, &bc.stats, pool_io);
    defer val.deinit();
    try val.start();

    // init disk persistence (indigo-compatible diskpersist format + Postgres index)
    const database_url = getenv("DATABASE_URL") orelse "postgres://relay:relay@localhost:5432/relay";
    var dp = event_log_mod.DiskPersist.init(allocator, data_dir, database_url, db_pool_size, pool_io) catch |err| {
        log.err("failed to init disk persist at {s}: {s}", .{ data_dir, @errorName(err) });
        return err;
    };
    defer dp.deinit();
    dp.retention_hours = retention_hours;
    dp.max_dir_bytes = max_events_gb * 1024 * 1024 * 1024;

    // DbRequestQueue — routes DB requests through pool_io worker threads.
    // originally needed to bridge Evented fibers to Threaded pg.Pool; now
    // redundant under all-Threaded but still functional. cleanup candidate.
    var db_queue: event_log_mod.DbRequestQueue = .{
        .shutdown = &shutdown_flag,
        .persist = &dp,
    };

    if (dp.lastSeq()) |last| {
        log.info("event log recovered: last_seq={d}", .{last});
    }

    // start flush thread
    try dp.start();

    // spawn 2 DbRequestQueue worker threads on pool_io
    const db_worker_1 = std.Thread.spawn(.{}, event_log_mod.DbRequestQueue.run, .{ &db_queue, pool_io }) catch |err| {
        log.err("failed to start db queue worker 1: {s}", .{@errorName(err)});
        return err;
    };
    const db_worker_2 = std.Thread.spawn(.{}, event_log_mod.DbRequestQueue.run, .{ &db_queue, pool_io }) catch |err| {
        log.err("failed to start db queue worker 2: {s}", .{@errorName(err)});
        return err;
    };

    // wire persist into broadcaster for cursor replay and validator for migration checks
    bc.persist = &dp;
    bc.db_queue = &db_queue;
    val.persist = &dp;

    // init collection index (RocksDB — inspired by lightrail/microcosm.blue)
    const ci_dir = getenv("COLLECTION_INDEX_DIR") orelse "data/collection-index";
    var ci = collection_index_mod.CollectionIndex.open(allocator, ci_dir) catch |err| {
        log.err("failed to init collection index at {s}: {s}", .{ ci_dir, @errorName(err) });
        return err;
    };
    defer ci.deinit();

    // init backfiller (collection index backfill from source relay)
    // uses pool_io (Threaded) — backfiller spawns std.Thread, DNS works
    var backfiller = backfill_mod.Backfiller.init(allocator, &ci, &dp, pool_io, &shutdown_flag);

    // init cleaner (removes stale entries from collection index)
    // uses pool_io (Threaded) — cleaner spawns std.Thread, checks shutdown
    var cleaner = cleaner_mod.Cleaner.init(allocator, pool_io, &ci, &dp, &shutdown_flag);

    // init resyncer (updates collection index on #sync events)
    // runs on pool_io — enqueue() is called from frame worker threads,
    // background worker is a plain std.Thread.
    var resyncer = resync_mod.Resyncer.init(allocator, pool_io, &ci);
    try resyncer.start();
    defer resyncer.deinit();

    // init cursor map + host ops queue — subscriber threads write cursor seqs
    // to the coalescing map (atomic store, no lock) and push rare ops (failures,
    // status) to the MPSC queue. a background thread sweeps cursors every 5s
    // and drains rare ops immediately.
    var cursor_map: host_ops_mod.CursorMap = .{};
    var host_ops_queue: host_ops_mod.HostOpsQueue = .{
        .persist = &dp,
        .cursor_map = &cursor_map,
        .shutdown = &shutdown_flag,
        .max_consecutive_failures = 15,
        .bc = &bc,
    };
    const host_ops_thread = std.Thread.spawn(.{}, host_ops_mod.HostOpsQueue.run, .{ &host_ops_queue, pool_io }) catch |err| {
        log.err("failed to start host ops thread: {s}", .{@errorName(err)});
        return err;
    };

    // init slurper (multi-host crawl manager)
    var slurper = slurper_mod.Slurper.init(
        allocator,
        &bc,
        &val,
        &dp,
        &shutdown_flag,
        .{
            .seed_host = upstream,
            .max_message_size = 5 * 1024 * 1024,
            .frame_workers = frame_workers,
            .frame_queue_capacity = frame_queue_capacity,
            .startup_batch_size = startup_batch_size,
        },
        io,
        pool_io,
    );
    defer slurper.deinit();
    slurper.collection_index = &ci;
    slurper.resyncer = &resyncer;
    slurper.host_ops = &host_ops_queue;
    slurper.cursor_map = &cursor_map;
    slurper.db_queue = &db_queue;

    // start: loads active hosts from DB, spawns subscriber threads.
    // pullHosts runs on its own std.Thread (parallel, not gating spawnWorkers).
    try slurper.start();

    // spawn pullHosts on a separate thread — runs in parallel with spawnWorkers,
    // uses pool_io for HTTP (DNS works), writes to DB via Threaded pool directly.
    const pull_hosts_thread = if (slurper.options.seed_host.len > 0) blk: {
        log.info("pulling hosts from {s} (background thread)...", .{slurper.options.seed_host});
        break :blk std.Thread.spawn(.{}, slurper_mod.Slurper.pullHosts, .{&slurper}) catch |err| {
            log.warn("failed to spawn pullHosts thread: {s}", .{@errorName(err)});
            break :blk null;
        };
    } else blk: {
        log.info("no seed host configured, skipping bootstrap", .{});
        break :blk null;
    };

    // start broadcast loop — drains broadcast queue, owns all consumer state.
    // frame workers push results to the queue, this thread does the fan-out.
    var broadcast_future = try io.concurrent(broadcaster.Broadcaster.runBroadcastLoop, .{&bc});
    defer _ = broadcast_future.cancel(io);

    // start GC loop on a plain thread — dp.gc() uses pool_io mutex and pg.Pool.
    const gc_thread = std.Thread.spawn(.{}, gcLoop, .{ &dp, pool_io }) catch |err| {
        log.err("failed to start GC thread: {s}", .{@errorName(err)});
        return err;
    };

    // wire HTTP fallback into broadcaster (all API endpoints served on WS port)
    var http_context = api.HttpContext{
        .stats = &bc.stats,
        .persist = &dp,
        .slurper = &slurper,
        .collection_index = &ci,
        .backfiller = &backfiller,
        .cleaner = &cleaner,
        .resyncer = &resyncer,
        .bc = &bc,
        .validator = &val,
        .host_ops = &host_ops_queue,
        .pool_io = pool_io,
        .io = io,
        .db_queue = &db_queue,
        .shutdown = &shutdown_flag,
    };
    bc.http_fallback = api.handleHttpRequest;
    bc.http_fallback_ctx = @ptrCast(&http_context);

    // start metrics-only server (internal port)
    const metrics_address = Io.net.Ip4Address.unspecified(metrics_port);
    var metrics_srv = MetricsServer{
        .server = (Io.net.IpAddress{ .ip4 = metrics_address }).listen(io, .{ .reuse_address = true }) catch |err| {
            log.err("metrics server failed to listen on :{d}: {s}", .{ metrics_port, @errorName(err) });
            return err;
        },
        .io = io,
        .stats = &bc.stats,
        .validator = &val,
        .data_dir = data_dir,
        .persist = &dp,
        .bc = &bc,
        .slurper = &slurper,
    };
    var metrics_future = try io.concurrent(MetricsServer.run, .{&metrics_srv});
    defer _ = metrics_future.cancel(io);

    // start downstream WebSocket server (also serves HTTP API via httpFallback)
    log.info("relay listening on :{d} (ws+http), :{d} (metrics)", .{ port, metrics_port });
    log.info("seed host: {s}", .{upstream});
    log.info("data dir: {s} (retention: {d}h, max: {d} GB)", .{ data_dir, retention_hours, max_events_gb });

    var server = try websocket.Server(broadcaster.Handler).init(allocator, io, .{
        .port = port,
        .address = "0.0.0.0",
        .max_conn = 4096,
        .max_message_size = 5 * 1024 * 1024,
    });
    defer server.deinit();

    // Io-native accept loop: fiber-based under Evented, thread-based under Threaded
    const ws_address = Io.net.Ip4Address.unspecified(port);
    var ws_listener = (Io.net.IpAddress{ .ip4 = ws_address }).listen(io, .{ .reuse_address = true }) catch |err| {
        log.err("websocket server failed to listen on :{d}: {s}", .{ port, @errorName(err) });
        return err;
    };
    var server_future = try io.concurrent(runWsServer, .{ &server, &ws_listener, &bc });
    defer _ = server_future.cancel(io);

    // wait for shutdown signal
    while (!shutdown_flag.load(.acquire)) {
        io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch break;
    }

    log.info("shutdown signal received, stopping...", .{});

    // stop WebSocket server: close listener to unblock accept, then cancel task
    ws_listener.deinit(io);
    _ = server_future.cancel(io);

    // join GC thread — it checks shutdown_flag and will exit its sleep loop.
    // must complete before dp.deinit() runs (dp is stack-owned).
    gc_thread.join();

    // join pullHosts thread if running
    if (pull_hosts_thread) |t| t.join();

    // join db queue workers — shutdown flag already set, they will drain and exit
    db_worker_1.join();
    db_worker_2.join();

    // join host ops thread — drains remaining ops before dp.deinit()
    host_ops_thread.join();

    // join backfiller/cleaner threads if running — they touch dp and ci
    backfiller.waitForCompletion();
    cleaner.waitForCompletion();

    // cancel broadcaster fiber (shutdown flag already set, it will drain remaining)
    _ = broadcast_future.cancel(io);

    // close metrics listener to unblock accept(), then cancel task
    metrics_srv.server.deinit(io);
    _ = metrics_future.cancel(io);

    log.info("relay stopped cleanly", .{});
}

const builtin = @import("builtin");

/// concrete wrapper for runIo — io.concurrent needs ArgsTuple, which can't handle anytype
fn runWsServer(server: *websocket.Server(broadcaster.Handler), listener: *Io.net.Server, bc: *broadcaster.Broadcaster) void {
    server.runIo(listener, bc);
}

fn gcLoop(dp: *event_log_mod.DiskPersist, io: Io) void {
    const gc_interval: u64 = 10 * 60; // 10 minutes in seconds
    while (!shutdown_flag.load(.acquire)) {
        // sleep in small increments to check shutdown
        var remaining: u64 = gc_interval;
        while (remaining > 0 and !shutdown_flag.load(.acquire)) {
            const chunk = @min(remaining, 1);
            io.sleep(Io.Duration.fromSeconds(@intCast(chunk)), .awake) catch return;
            remaining -= chunk;
        }
        if (shutdown_flag.load(.acquire)) return;

        dp.gc() catch |err| {
            log.warn("event log GC failed: {s}", .{@errorName(err)});
        };

        // return freed pages to OS (glibc-specific, no-op on other platforms)
        if (comptime malloc_trim) |trim| {
            _ = trim(0);
            log.info("gc: malloc_trim complete", .{});
        }
    }
}

fn signalHandler(_: std.posix.SIG) callconv(.c) void {
    shutdown_flag.store(true, .release);
}

fn installSignalHandlers() void {
    const act: std.posix.Sigaction = .{
        .handler = .{ .handler = signalHandler },
        .mask = std.posix.sigemptyset(),
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.INT, &act, null);
    std.posix.sigaction(std.posix.SIG.TERM, &act, null);

    // ignore SIGPIPE — writing to disconnected consumers must not crash the process
    const ignore_act: std.posix.Sigaction = .{
        .handler = .{ .handler = std.posix.SIG.IGN },
        .mask = std.posix.sigemptyset(),
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null);
}

/// normalize seed host: empty string or "none" means disabled (no bootstrap).
fn normalizeSeedHost(raw: []const u8) []const u8 {
    if (raw.len == 0 or std.mem.eql(u8, raw, "none")) return "";
    return raw;
}

/// libc getenv — std.posix.getenv removed in 0.16
fn getenv(key: [*:0]const u8) ?[]const u8 {
    const ptr = std.c.getenv(key) orelse return null;
    return std.mem.sliceTo(ptr, 0);
}

fn parseEnvInt(comptime T: type, key: [*:0]const u8, default: T) T {
    const val = getenv(key) orelse return default;
    return std.fmt.parseInt(T, val, 10) catch default;
}

test "normalizeSeedHost" {
    // normal hostnames pass through
    try std.testing.expectEqualStrings("bsky.network", normalizeSeedHost("bsky.network"));
    try std.testing.expectEqualStrings("relay.example.com", normalizeSeedHost("relay.example.com"));

    // empty string and "none" disable bootstrap
    try std.testing.expectEqualStrings("", normalizeSeedHost(""));
    try std.testing.expectEqualStrings("", normalizeSeedHost("none"));
}
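
// a minimal sketch of parseEnvInt's fallback path; assumes the made-up variable name
// below is unset where tests run, and that tests link libc (getenv goes through std.c).
test "parseEnvInt falls back to default when unset" {
    try std.testing.expectEqual(@as(u16, 3000), parseEnvInt(u16, "ZLAY_TEST_UNSET_PORT", 3000));
}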