atproto relay implementation in zig — zlay.waow.tech
//! zat relay — AT Protocol firehose relay server
//!
//! crawls PDS instances directly via the Slurper (one subscriber per host),
//! validates frames via DID resolution and signature verification, persists
//! to disk with relay-assigned seq numbers, and rebroadcasts to downstream
//! consumers over WebSocket.
//!
//! port 3000 (RELAY_PORT): WebSocket firehose + HTTP API (via httpFallback)
//!   /xrpc/com.atproto.sync.subscribeRepos — firehose WebSocket (supports ?cursor=N)
//!   /xrpc/com.atproto.sync.listRepos — paginated account listing
//!   /xrpc/com.atproto.sync.getRepoStatus — single account status
//!   /xrpc/com.atproto.sync.getLatestCommit — latest commit CID + rev
//!   /xrpc/com.atproto.sync.listReposByCollection — repos with records in a collection
//!   /xrpc/com.atproto.sync.listHosts — paginated active host listing
//!   /xrpc/com.atproto.sync.getHostStatus — single host status
//!   /xrpc/com.atproto.sync.requestCrawl — request PDS crawl (POST)
//!   /admin/hosts — list all hosts (GET, admin)
//!   /admin/hosts/block — block a host (POST, admin)
//!   /admin/hosts/unblock — unblock a host (POST, admin)
//!   /_health, /_stats — health, stats
//!
//! port 3001 (RELAY_METRICS_PORT): internal metrics + health
//!   /metrics — prometheus metrics
//!   /_health — liveness probe (DB check)
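//!
//! example: subscribing to the firehose from downstream (illustrative
//! invocation; any WebSocket client works, websocat shown here):
//!   websocat 'wss://zlay.waow.tech/xrpc/com.atproto.sync.subscribeRepos?cursor=0'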
const std = @import("std");
const builtin = @import("builtin");
const Io = std.Io;
const http = std.http;
const websocket = @import("websocket");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const slurper_mod = @import("slurper.zig");
const event_log_mod = @import("event_log.zig");
const collection_index_mod = @import("collection_index.zig");
const backfill_mod = @import("backfill.zig");
const cleaner_mod = @import("cleaner.zig");
const resync_mod = @import("resync.zig");
const host_ops_mod = @import("host_ops.zig");
const api = @import("api.zig");
const build_options = @import("build_options");

const malloc_trim: ?*const fn (pad: usize) callconv(.c) c_int = if (builtin.os.tag == .linux)
    @extern(*const fn (pad: usize) callconv(.c) c_int, .{ .name = "malloc_trim" })
else
    null;

const log = std.log.scoped(.relay);

pub const default_stack_size = 8 * 1024 * 1024;

// -- Io backend selection --
// Io.Threaded: one OS thread per io.concurrent() call (~2,800 for PDS subscribers).
// Higher thread count than Evented (~35) but proven stable at 99%+ coverage on 0.15.
//
// Evented (io_uring fibers) shelved: 8 crash classes from cross-Io violations,
// ReleaseSafe GPF from a fiber context-switch codegen bug, persistent ~10-15%
// coverage degradation, and the zig team marks Evented as experimental.
// See docs/evented-attempt.md. Can revisit when zig's Evented runtime matures.
const Backend = Io.Threaded;

var backend: Backend = undefined;
var debug_threaded_io: Io.Threaded = undefined;

/// override single-threaded debug_io default — required for std.debug.print safety
/// when multiple OS threads exist (frame worker pool, websocket server).
pub const std_options_debug_threaded_io: ?*Io.Threaded = &debug_threaded_io;

var shutdown_flag: std.atomic.Value(bool) = .{ .raw = false };

/// metrics-only server on the internal port
const MetricsServer = struct {
    server: Io.net.Server,
    io: Io,
    stats: *broadcaster.Stats,
    validator: *validator_mod.Validator,
    data_dir: []const u8,
    persist: *event_log_mod.DiskPersist,
    bc: *broadcaster.Broadcaster,
    slurper: *slurper_mod.Slurper,

    fn run(self: *MetricsServer) void {
        while (!shutdown_flag.load(.acquire)) {
            const stream = self.server.accept(self.io) catch |err| {
                if (shutdown_flag.load(.acquire)) return;
                log.debug("metrics accept error: {s}", .{@errorName(err)});
                continue;
            };
            self.handleMetricsConn(stream);
        }
    }
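    // example scrape against a local instance (illustrative):
    //   curl -s http://localhost:3001/metrics
    //   curl -s http://localhost:3001/_health   # 200 when the DB pool is healthy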
    fn handleMetricsConn(self: *MetricsServer, stream: Io.net.Stream) void {
        defer stream.close(self.io);

        var recv_buf: [4096]u8 = undefined;
        var send_buf: [4096]u8 = undefined;
        var connection_reader = stream.reader(self.io, &recv_buf);
        var connection_writer = stream.writer(self.io, &send_buf);
        var server = http.Server.init(&connection_reader.interface, &connection_writer.interface);

        var request = server.receiveHead() catch return;
        const path = request.head.target;

        if (std.mem.eql(u8, path, "/_healthz")) {
            // trivial liveness — constant-time, no dependencies
            request.respond("{\"status\":\"ok\"}", .{ .status = .ok, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "application/json" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        } else if (std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/_readyz")) {
            // use the atomic health flag — avoids calling into pg.Pool from this
            // runtime (originally required when the metrics server ran on Evented)
            const db_ok = self.persist.isDbHealthy();
            const status: http.Status = if (db_ok) .ok else .internal_server_error;
            const body = if (db_ok) "{\"status\":\"ok\"}" else "{\"status\":\"error\",\"msg\":\"database unavailable\"}";
            request.respond(body, .{ .status = status, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "application/json" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        } else if (std.mem.eql(u8, path, "/metrics")) {
            const cache_entries = self.validator.cacheSize();
            const attribution = broadcaster.AttributionMetrics{
                .history_entries = self.bc.history.count(),
                .evtbuf_entries = self.persist.evtbufLen(),
                .did_cache_entries = self.persist.didCacheLen(),
                .resolve_queue_len = self.validator.resolveQueueLen(),
                .resolve_queued_set_count = self.validator.resolveQueuedSetCount(),
                .validator_cache_map_cap = self.validator.cacheMapCapacity(),
                .did_cache_map_cap = self.persist.didCacheMapCap(),
                .queued_set_map_cap = self.validator.resolveQueuedSetCapacity(),
                .evtbuf_cap = self.persist.evtbufCap(),
                .outbuf_cap = self.persist.outbufCap(),
                .workers_count = self.slurper.workerCount(),
            };

            var metrics_buf: [65536]u8 = undefined;
            const body = broadcaster.formatPrometheusMetrics(self.stats, cache_entries, attribution, self.data_dir, &metrics_buf, self.io);
            request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        } else {
            request.respond("not found", .{ .status = .not_found, .keep_alive = false, .extra_headers = &.{
                .{ .name = "content-type", .value = "text/plain" },
                .{ .name = "server", .value = "zlay (atproto-relay)" },
            } }) catch {};
        }
    }
};

pub fn main() !void {
    // exp-002: optional GPA wrapper for leak detection.
    // build with -Duse_gpa=true to enable. on clean shutdown (SIGTERM),
    // GPA logs every allocation that was never freed, with stack traces.
    var gpa: std.heap.DebugAllocator(.{
        .stack_trace_frames = if (build_options.use_gpa) 8 else 0,
    }) = .init;
    defer if (build_options.use_gpa) {
        log.info("GPA: checking for leaks...", .{});
        const status = gpa.deinit();
        if (status == .leak) {
            log.err("GPA: leaks detected! see stderr for details", .{});
        } else {
            log.info("GPA: no leaks detected", .{});
        }
    };
    const allocator = if (build_options.use_gpa) gpa.allocator() else std.heap.c_allocator;

    // shared options for both debug and primary runtimes
    const io_opts: Io.Threaded.InitOptions = .{
        .stack_size = default_stack_size, // 8MB (default is 16MB)
        .concurrent_limit = Io.Limit.limited(4096), // safety rail (steady-state ~2,800 hosts)
    };

    // init debug io (for std.debug.print thread safety)
    debug_threaded_io = Io.Threaded.init(allocator, io_opts);

    // init primary runtime (currently Threaded; see Backend selection above)
    if (Backend == Io.Threaded) {
        backend = Io.Threaded.init(allocator, io_opts);
    } else {
        try Backend.init(&backend, allocator, .{});
    }
    const io = backend.io();
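    // rough map of which runtime runs what (summarizing the wiring below):
    //   io:      metrics accept loop, WS accept loop, broadcast fan-out (io.concurrent)
    //   pool_io: frame workers, validator cache + DID HTTP, DB queue, GC (std.Thread)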
    // dedicated Threaded runtime for the frame worker pool.
    // historically needed to isolate Threaded work from Evented fibers (cross-Io
    // crash class). now redundant since Backend is also Threaded, but harmless.
    // used for: persist ordering mutex, timestamps, validator cache,
    // DID resolution HTTP, and thread pool internal sync.
    var pool_io_backend = Io.Threaded.init(allocator, .{
        .stack_size = default_stack_size,
    });
    const pool_io = pool_io_backend.io();

    log.info("io backend: {s}", .{if (Backend == Io.Threaded) "Threaded" else "Evented"});

    // parse config from env
    const port = parseEnvInt(u16, "RELAY_PORT", 3000);
    const metrics_port = parseEnvInt(u16, "RELAY_METRICS_PORT", 3001);
    const upstream = normalizeSeedHost(getenv("RELAY_UPSTREAM") orelse "bsky.network");
    const data_dir = getenv("RELAY_DATA_DIR") orelse "data/events";
    const retention_hours = parseEnvInt(u64, "RELAY_RETENTION_HOURS", 72);
    const max_events_gb = parseEnvInt(u64, "RELAY_MAX_EVENTS_GB", 100);
    const frame_workers = parseEnvInt(u16, "FRAME_WORKERS", 16);
    const frame_queue_capacity = parseEnvInt(u16, "FRAME_QUEUE_CAPACITY", 4096);
    const startup_batch_size = parseEnvInt(u16, "STARTUP_BATCH_SIZE", 50);
    const db_pool_size = parseEnvInt(u16, "DB_POOL_SIZE", 20);

    // install signal handlers (including SIGPIPE ignore)
    installSignalHandlers();

    // init components — pass io to network-facing modules
    var bc = broadcaster.Broadcaster.init(allocator, io, &shutdown_flag);
    defer bc.deinit();

    // validator uses pool_io — its cache LRU and host resolvers are called from worker threads
    var val = validator_mod.Validator.init(allocator, &bc.stats, pool_io);
    defer val.deinit();
    try val.start();

    // init disk persistence (indigo-compatible diskpersist format + Postgres index)
    const database_url = getenv("DATABASE_URL") orelse "postgres://relay:relay@localhost:5432/relay";
    var dp = event_log_mod.DiskPersist.init(allocator, data_dir, database_url, db_pool_size, pool_io) catch |err| {
        log.err("failed to init disk persist at {s}: {s}", .{ data_dir, @errorName(err) });
        return err;
    };
    defer dp.deinit();
    dp.retention_hours = retention_hours;
    dp.max_dir_bytes = max_events_gb * 1024 * 1024 * 1024;
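    // worked example of the size cap: RELAY_MAX_EVENTS_GB=100 (the default) gives
    // max_dir_bytes = 100 * 1024^3 = 107,374,182,400 bytes; gcLoop below calls
    // dp.gc() to enforce both this cap and retention_hours.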
    // DbRequestQueue — routes DB requests through pool_io worker threads.
    // originally needed to bridge Evented fibers to Threaded pg.Pool; now
    // redundant under all-Threaded but still functional. cleanup candidate.
    var db_queue: event_log_mod.DbRequestQueue = .{
        .shutdown = &shutdown_flag,
        .persist = &dp,
    };

    if (dp.lastSeq()) |last| {
        log.info("event log recovered: last_seq={d}", .{last});
    }

    // start flush thread
    try dp.start();

    // spawn 2 DbRequestQueue worker threads on pool_io
    const db_worker_1 = std.Thread.spawn(.{}, event_log_mod.DbRequestQueue.run, .{ &db_queue, pool_io }) catch |err| {
        log.err("failed to start db queue worker 1: {s}", .{@errorName(err)});
        return err;
    };
    const db_worker_2 = std.Thread.spawn(.{}, event_log_mod.DbRequestQueue.run, .{ &db_queue, pool_io }) catch |err| {
        log.err("failed to start db queue worker 2: {s}", .{@errorName(err)});
        return err;
    };

    // wire persist into broadcaster for cursor replay and validator for migration checks
    bc.persist = &dp;
    bc.db_queue = &db_queue;
    val.persist = &dp;

    // init collection index (RocksDB — inspired by lightrail/microcosm.blue)
    const ci_dir = getenv("COLLECTION_INDEX_DIR") orelse "data/collection-index";
    var ci = collection_index_mod.CollectionIndex.open(allocator, ci_dir) catch |err| {
        log.err("failed to init collection index at {s}: {s}", .{ ci_dir, @errorName(err) });
        return err;
    };
    defer ci.deinit();

    // init backfiller (collection index backfill from source relay)
    // uses pool_io (Threaded) — backfiller spawns std.Thread, DNS works
    var backfiller = backfill_mod.Backfiller.init(allocator, &ci, &dp, pool_io, &shutdown_flag);

    // init cleaner (removes stale entries from collection index)
    // uses pool_io (Threaded) — cleaner spawns std.Thread, checks shutdown
    var cleaner = cleaner_mod.Cleaner.init(allocator, pool_io, &ci, &dp, &shutdown_flag);

    // init resyncer (updates collection index on #sync events)
    // runs on pool_io — enqueue() is called from frame worker threads,
    // background worker is a plain std.Thread.
    var resyncer = resync_mod.Resyncer.init(allocator, pool_io, &ci);
    try resyncer.start();
    defer resyncer.deinit();

    // init cursor map + host ops queue — subscriber threads write cursor seqs
    // to the coalescing map (atomic store, no lock) and push rare ops (failures,
    // status) to the MPSC queue. a background thread sweeps cursors every 5s
    // and drains rare ops immediately.
    var cursor_map: host_ops_mod.CursorMap = .{};
    var host_ops_queue: host_ops_mod.HostOpsQueue = .{
        .persist = &dp,
        .cursor_map = &cursor_map,
        .shutdown = &shutdown_flag,
        .max_consecutive_failures = 15,
        .bc = &bc,
    };
    const host_ops_thread = std.Thread.spawn(.{}, host_ops_mod.HostOpsQueue.run, .{ &host_ops_queue, pool_io }) catch |err| {
        log.err("failed to start host ops thread: {s}", .{@errorName(err)});
        return err;
    };

    // init slurper (multi-host crawl manager)
    var slurper = slurper_mod.Slurper.init(
        allocator,
        &bc,
        &val,
        &dp,
        &shutdown_flag,
        .{
            .seed_host = upstream,
            .max_message_size = 5 * 1024 * 1024,
            .frame_workers = frame_workers,
            .frame_queue_capacity = frame_queue_capacity,
            .startup_batch_size = startup_batch_size,
        },
        io,
        pool_io,
    );
    defer slurper.deinit();
    slurper.collection_index = &ci;
    slurper.resyncer = &resyncer;
    slurper.host_ops = &host_ops_queue;
    slurper.cursor_map = &cursor_map;
    slurper.db_queue = &db_queue;
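    // example: registering a new PDS with a running relay (illustrative host;
    // request body per the com.atproto.sync.requestCrawl lexicon):
    //   curl -X POST https://zlay.waow.tech/xrpc/com.atproto.sync.requestCrawl \
    //        -H 'content-type: application/json' -d '{"hostname":"pds.example.com"}'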
    // start: loads active hosts from DB, spawns subscriber threads.
    // pullHosts runs on its own std.Thread (parallel, not gating spawnWorkers).
    try slurper.start();

    // spawn pullHosts on a separate thread — runs in parallel with spawnWorkers,
    // uses pool_io for HTTP (DNS works), writes to DB via Threaded pool directly.
    const pull_hosts_thread = if (slurper.options.seed_host.len > 0) blk: {
        log.info("pulling hosts from {s} (background thread)...", .{slurper.options.seed_host});
        break :blk std.Thread.spawn(.{}, slurper_mod.Slurper.pullHosts, .{&slurper}) catch |err| {
            log.warn("failed to spawn pullHosts thread: {s}", .{@errorName(err)});
            break :blk null;
        };
    } else blk: {
        log.info("no seed host configured, skipping bootstrap", .{});
        break :blk null;
    };

    // start broadcast loop — drains broadcast queue, owns all consumer state.
    // frame workers push results to the queue, this thread does the fan-out.
    var broadcast_future = try io.concurrent(broadcaster.Broadcaster.runBroadcastLoop, .{&bc});
    defer _ = broadcast_future.cancel(io);

    // start GC loop on a plain thread — dp.gc() uses pool_io mutex and pg.Pool.
    const gc_thread = std.Thread.spawn(.{}, gcLoop, .{ &dp, pool_io }) catch |err| {
        log.err("failed to start GC thread: {s}", .{@errorName(err)});
        return err;
    };

    // wire HTTP fallback into broadcaster (all API endpoints served on WS port)
    var http_context = api.HttpContext{
        .stats = &bc.stats,
        .persist = &dp,
        .slurper = &slurper,
        .collection_index = &ci,
        .backfiller = &backfiller,
        .cleaner = &cleaner,
        .resyncer = &resyncer,
        .bc = &bc,
        .validator = &val,
        .host_ops = &host_ops_queue,
        .pool_io = pool_io,
        .io = io,
        .db_queue = &db_queue,
        .shutdown = &shutdown_flag,
    };
    bc.http_fallback = api.handleHttpRequest;
    bc.http_fallback_ctx = @ptrCast(&http_context);

    // start metrics-only server (internal port)
    const metrics_address = Io.net.Ip4Address.unspecified(metrics_port);
    var metrics_srv = MetricsServer{
        .server = (Io.net.IpAddress{ .ip4 = metrics_address }).listen(io, .{ .reuse_address = true }) catch |err| {
            log.err("metrics server failed to listen on :{d}: {s}", .{ metrics_port, @errorName(err) });
            return err;
        },
        .io = io,
        .stats = &bc.stats,
        .validator = &val,
        .data_dir = data_dir,
        .persist = &dp,
        .bc = &bc,
        .slurper = &slurper,
    };
    var metrics_future = try io.concurrent(MetricsServer.run, .{&metrics_srv});
    defer _ = metrics_future.cancel(io);

    // start downstream WebSocket server (also serves HTTP API via httpFallback)
    log.info("relay listening on :{d} (ws+http), :{d} (metrics)", .{ port, metrics_port });
    log.info("seed host: {s}", .{upstream});
    log.info("data dir: {s} (retention: {d}h, max: {d} GB)", .{ data_dir, retention_hours, max_events_gb });

    var server = try websocket.Server(broadcaster.Handler).init(allocator, io, .{
        .port = port,
        .address = "0.0.0.0",
        .max_conn = 4096,
        .max_message_size = 5 * 1024 * 1024,
    });
    defer server.deinit();

    // Io-native accept loop: fiber-based under Evented, thread-based under Threaded
    const ws_address = Io.net.Ip4Address.unspecified(port);
    var ws_listener = (Io.net.IpAddress{ .ip4 = ws_address }).listen(io, .{ .reuse_address = true }) catch |err| {
        log.err("websocket server failed to listen on :{d}: {s}", .{ port, @errorName(err) });
        return err;
    };
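    // cursor replay semantics (com.atproto.sync.subscribeRepos): a consumer
    // reconnecting with ?cursor=N is sent buffered events with seq > N before
    // joining the live stream; for cursors older than the retention window the
    // sync spec prescribes an #info frame named "OutdatedCursor".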
    var server_future = try io.concurrent(runWsServer, .{ &server, &ws_listener, &bc });
    defer _ = server_future.cancel(io);

    // wait for shutdown signal
    while (!shutdown_flag.load(.acquire)) {
        io.sleep(Io.Duration.fromMilliseconds(100), .awake) catch break;
    }

    log.info("shutdown signal received, stopping...", .{});

    // stop WebSocket server: close listener to unblock accept, then cancel task
    ws_listener.deinit(io);
    server_future.cancel(io);

    // join GC thread — it checks shutdown_flag and will exit its sleep loop.
    // must complete before dp.deinit() runs (dp is stack-owned).
    gc_thread.join();

    // join pullHosts thread if running
    if (pull_hosts_thread) |t| t.join();

    // join db queue workers — shutdown flag already set, they will drain and exit
    db_worker_1.join();
    db_worker_2.join();

    // join host ops thread — drains remaining ops before dp.deinit()
    host_ops_thread.join();

    // join backfiller/cleaner threads if running — they touch dp and ci
    backfiller.waitForCompletion();
    cleaner.waitForCompletion();

    // cancel broadcaster fiber (shutdown flag already set, it will drain remaining)
    broadcast_future.cancel(io);

    // close metrics listener to unblock accept(), then cancel task
    metrics_srv.server.deinit(io);
    metrics_future.cancel(io);

    log.info("relay stopped cleanly", .{});
}

/// concrete wrapper for runIo — io.concurrent needs ArgsTuple, which can't handle anytype
fn runWsServer(server: *websocket.Server(broadcaster.Handler), listener: *Io.net.Server, bc: *broadcaster.Broadcaster) void {
    server.runIo(listener, bc);
}

fn gcLoop(dp: *event_log_mod.DiskPersist, io: Io) void {
    const gc_interval: u64 = 10 * 60; // 10 minutes in seconds
    while (!shutdown_flag.load(.acquire)) {
        // sleep in 1s increments so shutdown is noticed promptly
        var remaining: u64 = gc_interval;
        while (remaining > 0 and !shutdown_flag.load(.acquire)) {
            const chunk = @min(remaining, 1);
            io.sleep(Io.Duration.fromSeconds(@intCast(chunk)), .awake) catch return;
            remaining -= chunk;
        }
        if (shutdown_flag.load(.acquire)) return;

        dp.gc() catch |err| {
            log.warn("event log GC failed: {s}", .{@errorName(err)});
        };

        // return freed pages to the OS (glibc-specific; malloc_trim is comptime
        // null on non-Linux, so the call is compiled out there)
        if (comptime malloc_trim) |trim| {
            _ = trim(0);
            log.info("gc: malloc_trim complete", .{});
        }
    }
}

fn signalHandler(_: std.posix.SIG) callconv(.c) void {
    shutdown_flag.store(true, .release);
}

fn installSignalHandlers() void {
    const act: std.posix.Sigaction = .{
        .handler = .{ .handler = signalHandler },
        .mask = std.posix.sigemptyset(),
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.INT, &act, null);
    std.posix.sigaction(std.posix.SIG.TERM, &act, null);

    // ignore SIGPIPE — writing to disconnected consumers must not crash the process
    const ignore_act: std.posix.Sigaction = .{
        .handler = .{ .handler = std.posix.SIG.IGN },
        .mask = std.posix.sigemptyset(),
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null);
}
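// example: exercising the graceful shutdown path from a shell (binary name is
// an assumption; substitute whatever the build produces):
//   kill -TERM "$(pidof zlay)"    # or Ctrl-C (SIGINT) in the foreground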
/// normalize seed host: empty string or "none" means disabled (no bootstrap).
fn normalizeSeedHost(raw: []const u8) []const u8 {
    if (raw.len == 0 or std.mem.eql(u8, raw, "none")) return "";
    return raw;
}

/// libc getenv — std.posix.getenv removed in 0.16
fn getenv(key: [*:0]const u8) ?[]const u8 {
    const ptr = std.c.getenv(key) orelse return null;
    return std.mem.sliceTo(ptr, 0);
}

fn parseEnvInt(comptime T: type, key: [*:0]const u8, default: T) T {
    const val = getenv(key) orelse return default;
    return std.fmt.parseInt(T, val, 10) catch default;
}

test "normalizeSeedHost" {
    // normal hostnames pass through
    try std.testing.expectEqualStrings("bsky.network", normalizeSeedHost("bsky.network"));
    try std.testing.expectEqualStrings("relay.example.com", normalizeSeedHost("relay.example.com"));

    // empty string and "none" disable bootstrap
    try std.testing.expectEqualStrings("", normalizeSeedHost(""));
    try std.testing.expectEqualStrings("", normalizeSeedHost("none"));
}
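
test "parseEnvInt falls back to default" {
    // sketch of the fallback behavior; the variable names are hypothetical and
    // assumed to be unset in the test environment
    try std.testing.expectEqual(@as(u16, 42), parseEnvInt(u16, "ZLAY_TEST_UNSET_ENV_VAR", 42));
    try std.testing.expectEqual(@as(u64, 72), parseEnvInt(u64, "ZLAY_TEST_UNSET_ENV_VAR_2", 72));
}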