atproto relay implementation in zig zlay.waow.tech

feat: spec compliance — getHostStatus, takedown #account events, DID migration validation

- add getHostStatus endpoint (lexicon says "implemented by relays")
- emit #account event on admin takedown so downstream consumers see it
- validate DID migrations asynchronously instead of blindly updating host_id

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 4b9b4295 70e89255

+312 -41
+33 -20
README.md
··· 1 1 # zlay 2 2 3 - an [AT Protocol](https://atproto.com/) relay in zig. crawls PDS hosts directly and rebroadcasts their firehose as a single aggregated stream. 3 + an [AT Protocol](https://atproto.com/) relay in zig. subscribes to every PDS on the network, verifies commit signatures, and serves the merged event stream to downstream consumers via `com.atproto.sync.subscribeRepos`. 4 4 5 - **live instance**: [zlay.waow.tech](https://zlay.waow.tech/_health) — [metrics dashboard](https://zlay-metrics.waow.tech) 5 + **live instance**: [zlay.waow.tech](https://zlay.waow.tech/_health) — [metrics](https://zlay-metrics.waow.tech) 6 6 7 - ## what it does 7 + ## design 8 8 9 - a relay subscribes to every PDS on the network, verifies commit signatures, and serves the merged event stream to downstream consumers via `com.atproto.sync.subscribeRepos`. it also maintains a collection index for `com.atproto.sync.listReposByCollection`. 9 + - **direct PDS crawl** — the bootstrap relay (`bsky.network`) is called once at startup for the host list via `listHosts`, then all data flows directly from each PDS. no fan-out relay in between. 10 10 11 - ## design 11 + - **optimistic signature validation** — on signing key cache miss, the frame passes through immediately and the DID is queued for background resolution. the first commit from an unknown account is unvalidated; all subsequent commits are verified against the cached key. >99.9% cache hit rate after warmup. 12 12 13 - - **direct PDS crawl** — no fan-out relay in between. the bootstrap relay (bsky.network) is called once at startup for the host list, then all data flows from each PDS. 14 - - **optimistic validation** — on signing key cache miss, frames pass through immediately and the DID is queued for background resolution. >99.9% cache hit rate after warmup. 15 - - **inline collection index** — RocksDB with two column families for bidirectional `(DID, collection)` lookups. no sidecar process. 
16 - - **one thread per PDS** — predictable memory, no GC. ~2,750 threads is fine; most are blocked on websocket reads. 13 + - **inline collection index** — indexes `(DID, collection)` pairs directly in the event processing pipeline using [RocksDB](https://rocksdb.org/) with two column families: `rbc` for collection-to-DID lookups and `cbr` for DID-to-collection cleanup. serves `listReposByCollection` from the relay process — no sidecar. the index design draws on [fig](https://tangled.org/microcosm.blue)'s work on [lightrail](https://tangled.org/microcosm.blue/lightrail), which uses adjacent keys from CAR slices to enumerate collections. 17 14 18 - ## dependencies 19 - 20 - | dependency | purpose | 21 - |---|---| 22 - | [zat](https://tangled.org/zzstoatzz.io/zat) | AT Protocol primitives (CBOR, CAR, signatures, DID resolution) | 23 - | [websocket.zig](https://github.com/nicholasgasior/websocket.zig) | WebSocket client/server | 24 - | [pg.zig](https://github.com/karlseguin/pg.zig) | PostgreSQL driver | 25 - | [rocksdb-zig](https://github.com/Syndica/rocksdb-zig) | RocksDB bindings | 15 + - **one OS thread per PDS** — predictable memory, no garbage collector. ~2,750 threads is fine; most are blocked on websocket reads. thread stacks are set to 2 MB (zig's default is 16 MB). 26 16 27 17 ## endpoints 28 18 29 19 | endpoint | method | 30 20 |---|---| 31 - | `com.atproto.sync.subscribeRepos` | WebSocket (port 3000) | 21 + | `com.atproto.sync.subscribeRepos` | WebSocket | 32 22 | `com.atproto.sync.listRepos` | GET | 33 23 | `com.atproto.sync.getRepoStatus` | GET | 34 24 | `com.atproto.sync.getLatestCommit` | GET | ··· 36 26 | `com.atproto.sync.listHosts` | GET | 37 27 | `com.atproto.sync.requestCrawl` | POST | 38 28 29 + `getRepo` is not implemented — the relay does not serve full repository exports. 
30 + 31 + ## dependencies 32 + 33 + | dependency | purpose | 34 + |---|---| 35 + | [zat](https://tangled.org/zzstoatzz.io/zat) | AT Protocol primitives (CBOR, CAR, signatures, DID resolution) | 36 + | [websocket.zig](https://github.com/nicholasgasior/websocket.zig) | WebSocket client/server | 37 + | [pg.zig](https://github.com/karlseguin/pg.zig) | PostgreSQL driver | 38 + | [rocksdb-zig](https://github.com/Syndica/rocksdb-zig) | [RocksDB](https://rocksdb.org/) bindings | 39 + 39 40 ## build 40 41 41 42 requires zig 0.15 and a C/C++ toolchain (for RocksDB). ··· 46 47 zig build -Doptimize=ReleaseSafe # release build 47 48 ``` 48 49 50 + ## configuration 51 + 52 + | variable | default | description | 53 + |---|---|---| 54 + | `RELAY_PORT` | `3000` | WebSocket firehose port | 55 + | `RELAY_HTTP_PORT` | `3001` | HTTP API port | 56 + | `RELAY_UPSTREAM` | `bsky.network` | bootstrap relay for initial host list | 57 + | `RELAY_DATA_DIR` | `data/events` | event log storage | 58 + | `RELAY_RETENTION_HOURS` | `72` | event retention window | 59 + | `COLLECTION_INDEX_DIR` | `data/collection-index` | RocksDB collection index path | 60 + | `DATABASE_URL` | — | PostgreSQL connection string | 61 + | `RELAY_ADMIN_PASSWORD` | — | bearer token for admin endpoints | 62 + 49 63 see [docs/deployment.md](docs/deployment.md) for production deployment and [docs/backfill.md](docs/backfill.md) for collection index backfill. 50 64 51 65 ## numbers 52 66 53 67 | metric | value | 54 68 |---|---| 55 - | code | ~6,000 lines | 56 69 | connected PDS hosts | ~2,750 | 57 - | memory | ~2.9 GiB steady state | 70 + | memory | ~2.9 GiB | 58 71 | throughput | ~600 events/sec typical |
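The configuration table pairs each environment variable with a default. A minimal sketch of that pattern follows, assuming a POSIX target. `parsePort` and `envPort` are hypothetical helper names for illustration, not functions from zlay. Only the variable names and defaults come from the table.

```zig
const std = @import("std");

/// Parse a port number, falling back to `default` when the text is not a
/// valid u16 (empty, non-numeric, or out of range).
/// Hypothetical helper, not taken from the zlay source.
fn parsePort(text: []const u8, default: u16) u16 {
    return std.fmt.parseInt(u16, text, 10) catch default;
}

/// Read a port from the environment, using `default` when the variable is unset.
/// Hypothetical helper, not taken from the zlay source.
fn envPort(key: []const u8, default: u16) u16 {
    const text = std.posix.getenv(key) orelse return default;
    return parsePort(text, default);
}

pub fn main() void {
    // variable names and defaults as listed in the README table
    const ws_port = envPort("RELAY_PORT", 3000);
    const http_port = envPort("RELAY_HTTP_PORT", 3001);
    std.debug.print("firehose on :{d}, http api on :{d}\n", .{ ws_port, http_port });
}
```

Falling back to the default on a malformed value keeps startup simple. A stricter deployment might prefer to fail loudly instead.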
+22 -9
src/event_log.zig
··· 233 233 self.flush_thread = try std.Thread.spawn(.{ .stack_size = 2 * 1024 * 1024 }, flushLoop, .{self}); 234 234 } 235 235 236 + pub const UidResult = struct { 237 + uid: u64, 238 + host_changed: bool = false, 239 + }; 240 + 236 241 /// resolve a DID to a numeric UID, associating with a host. 237 242 /// on first encounter, creates account row with host_id. 238 - /// on subsequent encounters from a different host, updates host_id. 239 - /// Go relay: preProcessEvent → CreateAccountHost / EnsureAccountHost 240 - pub fn uidForDidFromHost(self: *DiskPersist, did: []const u8, host_id: u64) !u64 { 243 + /// on subsequent encounters from a different host, returns host_changed=true 244 + /// so the caller can queue async DID migration validation. 245 + pub fn uidForDidFromHost(self: *DiskPersist, did: []const u8, host_id: u64) !UidResult { 241 246 const uid = try self.uidForDid(did); 242 247 if (host_id > 0) { 243 248 const current_host = self.getAccountHostId(uid) catch 0; ··· 245 250 // first encounter: set host_id 246 251 self.setAccountHostId(uid, host_id) catch {}; 247 252 } else if (current_host != host_id) { 248 - // host mismatch: account may have migrated 249 - // Go relay re-resolves DID doc here; we log and update 250 - // (full DID re-resolution for migration validation is TODO) 251 - log.info("account {s} (uid={d}) host changed: {d} → {d}", .{ did, uid, current_host, host_id }); 252 - self.setAccountHostId(uid, host_id) catch {}; 253 + // host mismatch: don't update yet — caller should validate via DID resolution 254 + log.info("account {s} (uid={d}) host mismatch: current={d} new={d}, queuing migration check", .{ did, uid, current_host, host_id }); 255 + return .{ .uid = uid, .host_changed = true }; 253 256 } 254 257 } 255 - return uid; 258 + return .{ .uid = uid }; 256 259 } 257 260 258 261 /// resolve a DID to a numeric UID. creates a new account row on first encounter. 
··· 438 441 "UPDATE host SET last_seq = $2, updated_at = now() WHERE id = $1", 439 442 .{ @as(i64, @intCast(host_id)), @as(i64, @intCast(seq)) }, 440 443 ); 444 + } 445 + 446 + /// look up host ID by hostname. returns null if not found. 447 + pub fn getHostIdForHostname(self: *DiskPersist, hostname: []const u8) !?u64 { 448 + var row = (try self.db.rowUnsafe( 449 + "SELECT id FROM host WHERE hostname = $1", 450 + .{hostname}, 451 + )) orelse return null; 452 + defer row.deinit() catch {}; 453 + return @intCast(row.get(i64, 0)); 441 454 } 442 455 443 456 /// update host status (active, blocked, exhausted)
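The change to `uidForDidFromHost` can be sketched in isolation as "detect a host mismatch but do not write it". The sketch below swaps the PostgreSQL account table for a fixed in-memory array. `Accounts` and `observe` are hypothetical names used only for illustration, and only `UidResult` mirrors the struct in the diff.

```zig
const std = @import("std");

const UidResult = struct {
    uid: u64,
    host_changed: bool = false,
};

/// Hypothetical in-memory stand-in for the account table:
/// index = uid, value = host_id (0 means "not set yet").
const Accounts = struct {
    host_ids: [4]u64 = [_]u64{0} ** 4,

    /// Record the host on first encounter; on a mismatch, leave the stored
    /// host untouched and report it so the caller can queue validation.
    fn observe(self: *Accounts, uid: u64, host_id: u64) UidResult {
        if (host_id == 0) return .{ .uid = uid };
        const idx: usize = @intCast(uid);
        const current = self.host_ids[idx];
        if (current == 0) {
            self.host_ids[idx] = host_id; // first encounter
        } else if (current != host_id) {
            return .{ .uid = uid, .host_changed = true }; // no update here
        }
        return .{ .uid = uid };
    }
};

pub fn main() void {
    var accounts = Accounts{};
    _ = accounts.observe(1, 10);
    const second = accounts.observe(1, 20);
    std.debug.print("host_changed={} stored_host={d}\n", .{ second.host_changed, accounts.host_ids[1] });
}
```

The stored host only changes once a separate validation step confirms the move, so an event arriving from the wrong PDS cannot reassign an account by itself.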
+144 -7
src/main.zig
··· 12 12 //! /xrpc/com.atproto.sync.getLatestCommit — latest commit CID + rev 13 13 //! /xrpc/com.atproto.sync.listReposByCollection — repos with records in a collection 14 14 //! /xrpc/com.atproto.sync.listHosts — paginated active host listing 15 + //! /xrpc/com.atproto.sync.getHostStatus — single host status 15 16 //! /xrpc/com.atproto.sync.requestCrawl — request PDS crawl (POST) 16 17 //! /admin/hosts — list all hosts (GET, admin) 17 18 //! /admin/hosts/block — block a host (POST, admin) ··· 45 46 slurper: *slurper_mod.Slurper, 46 47 collection_index: *collection_index_mod.CollectionIndex, 47 48 backfiller: *backfill_mod.Backfiller, 49 + bc: *broadcaster.Broadcaster, 48 50 49 51 fn run(self: *HttpServer) void { 50 52 while (!shutdown_flag.load(.acquire)) { ··· 53 55 log.debug("http accept error: {s}", .{@errorName(err)}); 54 56 continue; 55 57 }; 56 - handleHttpConn(conn.stream, self.stats, self.persist, self.slurper, self.collection_index, self.backfiller); 58 + handleHttpConn(conn.stream, self.stats, self.persist, self.slurper, self.collection_index, self.backfiller, self.bc); 57 59 } 58 60 } 59 61 }; ··· 98 100 // start flush thread 99 101 try dp.start(); 100 102 101 - // wire persist into broadcaster for cursor replay 103 + // wire persist into broadcaster for cursor replay and validator for migration checks 102 104 bc.persist = &dp; 105 + val.persist = &dp; 103 106 104 107 // init collection index (RocksDB — inspired by lightrail/microcosm.blue) 105 108 const ci_dir = std.posix.getenv("COLLECTION_INDEX_DIR") orelse "data/collection-index"; ··· 145 148 .slurper = &slurper, 146 149 .collection_index = &ci, 147 150 .backfiller = &backfiller, 151 + .bc = &bc, 148 152 }; 149 153 const http_thread = try std.Thread.spawn(.{ .stack_size = default_stack_size }, HttpServer.run, .{&http_srv}); 150 154 ··· 224 228 std.posix.sigaction(std.posix.SIG.PIPE, &ignore_act, null); 225 229 } 226 230 227 - fn handleHttpConn(stream: std.net.Stream, stats: *broadcaster.Stats, 
persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex, backfiller: *backfill_mod.Backfiller) void { 231 + fn handleHttpConn(stream: std.net.Stream, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex, backfiller: *backfill_mod.Backfiller, bc: *broadcaster.Broadcaster) void { 228 232 defer stream.close(); 229 233 230 234 var recv_buf: [8192]u8 = undefined; ··· 244 248 if (request.head.method == .GET) { 245 249 handleGet(&request, path, query, stats, persist, slurper, ci, backfiller); 246 250 } else if (request.head.method == .POST) { 247 - handlePost(&request, path, query, persist, slurper, backfiller); 251 + handlePost(&request, path, query, persist, slurper, backfiller, bc); 248 252 } else { 249 253 respondText(&request, .method_not_allowed, "method not allowed"); 250 254 } ··· 274 278 handleListReposByCollection(request, query, ci); 275 279 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { 276 280 handleListHosts(request, query, persist); 281 + } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) { 282 + handleGetHostStatus(request, query, persist); 277 283 } else if (std.mem.eql(u8, path, "/admin/hosts")) { 278 284 handleAdminListHosts(request, persist, slurper); 279 285 } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { ··· 308 314 } 309 315 } 310 316 311 - fn handlePost(request: *http.Server.Request, path: []const u8, query: []const u8, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, backfiller: *backfill_mod.Backfiller) void { 317 + fn handlePost(request: *http.Server.Request, path: []const u8, query: []const u8, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, backfiller: *backfill_mod.Backfiller, bc: *broadcaster.Broadcaster) void { 312 318 if (std.mem.eql(u8, path, "/admin/repo/ban")) { 313 - handleBan(request, persist); 
319 + handleBan(request, persist, bc); 314 320 } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { 315 321 handleRequestCrawl(request, slurper); 316 322 } else if (std.mem.eql(u8, path, "/admin/hosts/block")) { ··· 324 330 } 325 331 } 326 332 327 - fn handleBan(request: *http.Server.Request, persist: *event_log_mod.DiskPersist) void { 333 + fn handleBan(request: *http.Server.Request, persist: *event_log_mod.DiskPersist, bc: *broadcaster.Broadcaster) void { 328 334 if (!checkAdmin(request)) return; 329 335 330 336 // read body (after checkAdmin which uses iterateHeaders) ··· 354 360 respondJson(request, .internal_server_error, "{\"error\":\"takedown failed\"}"); 355 361 return; 356 362 }; 363 + 364 + // emit #account event so downstream consumers see the takedown 365 + if (buildAccountFrame(persist.allocator, did)) |frame_bytes| { 366 + if (persist.persist(.account, uid, frame_bytes)) |relay_seq| { 367 + bc.stats.relay_seq.store(relay_seq, .release); 368 + const broadcast_data = broadcaster.resequenceFrame(persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; 369 + bc.broadcast(relay_seq, broadcast_data); 370 + log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); 371 + } else |err| { 372 + log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); 373 + } 374 + } 357 375 358 376 log.info("admin: banned {s} (uid={d})", .{ did, uid }); 359 377 respondJson(request, .ok, "{\"success\":true}"); ··· 895 913 896 914 w.writeByte('}') catch return; 897 915 respondJson(request, .ok, fbs.getWritten()); 916 + } 917 + 918 + fn handleGetHostStatus(request: *http.Server.Request, query: []const u8, persist: *event_log_mod.DiskPersist) void { 919 + var hostname_buf: [256]u8 = undefined; 920 + const hostname = queryParamDecoded(query, "hostname", &hostname_buf) orelse { 921 + respondJson(request, .bad_request, "{\"error\":\"InvalidRequest\",\"message\":\"hostname parameter required\"}"); 
922 + return; 923 + }; 924 + 925 + // look up host 926 + var row = (persist.db.rowUnsafe( 927 + "SELECT id, hostname, status, last_seq FROM host WHERE hostname = $1", 928 + .{hostname}, 929 + ) catch { 930 + respondJson(request, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); 931 + return; 932 + }) orelse { 933 + respondJson(request, .bad_request, "{\"error\":\"HostNotFound\",\"message\":\"host not found\"}"); 934 + return; 935 + }; 936 + defer row.deinit() catch {}; 937 + 938 + const host_id = row.get(i64, 0); 939 + const host_name = row.get([]const u8, 1); 940 + const raw_status = row.get([]const u8, 2); 941 + const seq = row.get(i64, 3); 942 + 943 + // map internal status to lexicon hostStatus values 944 + const status = if (std.mem.eql(u8, raw_status, "blocked")) 945 + "banned" 946 + else if (std.mem.eql(u8, raw_status, "exhausted")) 947 + "offline" 948 + else 949 + raw_status; // active, idle pass through 950 + 951 + // count accounts on this host 952 + const account_count: i64 = if (persist.db.rowUnsafe( 953 + "SELECT COUNT(*) FROM account WHERE host_id = $1", 954 + .{host_id}, 955 + ) catch null) |cnt_row| blk: { 956 + var r = cnt_row; 957 + defer r.deinit() catch {}; 958 + break :blk r.get(i64, 0); 959 + } else 0; 960 + 961 + var buf: [4096]u8 = undefined; 962 + var fbs = std.io.fixedBufferStream(&buf); 963 + const w = fbs.writer(); 964 + 965 + w.writeAll("{\"hostname\":\"") catch return; 966 + w.writeAll(host_name) catch return; 967 + w.writeAll("\"") catch return; 968 + std.fmt.format(w, ",\"seq\":{d},\"accountCount\":{d}", .{ seq, account_count }) catch return; 969 + w.writeAll(",\"status\":\"") catch return; 970 + w.writeAll(status) catch return; 971 + w.writeAll("\"}") catch return; 972 + 973 + respondJson(request, .ok, fbs.getWritten()); 974 + } 975 + 976 + /// build a CBOR #account frame for a takedown event. 
977 + /// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"} 978 + fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 { 979 + const zat = @import("zat"); 980 + const cbor = zat.cbor; 981 + 982 + const header: cbor.Value = .{ .map = &.{ 983 + .{ .key = "op", .value = .{ .unsigned = 1 } }, 984 + .{ .key = "t", .value = .{ .text = "#account" } }, 985 + } }; 986 + 987 + var time_buf: [24]u8 = undefined; 988 + const time_str = formatTimestamp(&time_buf); 989 + 990 + const payload: cbor.Value = .{ .map = &.{ 991 + .{ .key = "seq", .value = .{ .unsigned = 0 } }, 992 + .{ .key = "did", .value = .{ .text = did } }, 993 + .{ .key = "time", .value = .{ .text = time_str } }, 994 + .{ .key = "active", .value = .{ .boolean = false } }, 995 + .{ .key = "status", .value = .{ .text = "takendown" } }, 996 + } }; 997 + 998 + const header_bytes = cbor.encodeAlloc(allocator, header) catch return null; 999 + const payload_bytes = cbor.encodeAlloc(allocator, payload) catch { 1000 + allocator.free(header_bytes); 1001 + return null; 1002 + }; 1003 + 1004 + var frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch { 1005 + allocator.free(header_bytes); 1006 + allocator.free(payload_bytes); 1007 + return null; 1008 + }; 1009 + @memcpy(frame[0..header_bytes.len], header_bytes); 1010 + @memcpy(frame[header_bytes.len..], payload_bytes); 1011 + 1012 + allocator.free(header_bytes); 1013 + allocator.free(payload_bytes); 1014 + 1015 + return frame; 1016 + } 1017 + 1018 + /// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ) 1019 + fn formatTimestamp(buf: *[24]u8) []const u8 { 1020 + const ts: u64 = @intCast(std.time.timestamp()); 1021 + const es = std.time.epoch.EpochSeconds{ .secs = ts }; 1022 + const day = es.getEpochDay(); 1023 + const yd = day.calculateYearDay(); 1024 + const md = yd.calculateMonthDay(); 1025 + const ds = es.getDaySeconds(); 1026 + 1027 + return 
std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ 1028 + yd.year, 1029 + @as(u32, @intFromEnum(md.month)), 1030 + @as(u32, md.day_index) + 1, 1031 + ds.getHoursIntoDay(), 1032 + ds.getMinutesIntoHour(), 1033 + ds.getSecondsIntoMinute(), 1034 + }) catch "1970-01-01T00:00:00Z"; 898 1035 } 899 1036 900 1037 // --- backfill handlers ---
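Timestamp formatting with `std.time.epoch` mixes two conventions: the `Month` enum is 1-based (`jan = 1`), while `day_index` is 0-based. A minimal sketch that takes the epoch seconds as a parameter, so it can be checked against known dates, might look like this. `formatEpochSeconds` is a hypothetical name, not the function from the diff.

```zig
const std = @import("std");

/// Format `secs` (seconds since the Unix epoch, UTC) as YYYY-MM-DDTHH:MM:SSZ.
/// Hypothetical helper for illustration; falls back to the epoch string if
/// the output does not fit the buffer (e.g. a year past 9999).
fn formatEpochSeconds(buf: *[20]u8, secs: u64) []const u8 {
    const es = std.time.epoch.EpochSeconds{ .secs = secs };
    const yd = es.getEpochDay().calculateYearDay();
    const md = yd.calculateMonthDay();
    const ds = es.getDaySeconds();

    return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
        yd.year,
        md.month.numeric(), // Month is already 1..12
        @as(u32, md.day_index) + 1, // day_index is 0-based
        ds.getHoursIntoDay(),
        ds.getMinutesIntoHour(),
        ds.getSecondsIntoMinute(),
    }) catch "1970-01-01T00:00:00Z";
}

pub fn main() void {
    var buf: [20]u8 = undefined;
    std.debug.print("{s}\n", .{formatEpochSeconds(&buf, 1_700_000_000)});
}
```

Passing the timestamp in, rather than calling `std.time.timestamp()` inside the formatter, keeps the function deterministic and easy to test.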
+7 -4
src/subscriber.zig
··· 339 339 340 340 // resolve DID → numeric UID for event header (host-aware) 341 341 const uid: u64 = if (sub.persist) |dp| blk: { 342 - break :blk if (did) |d| 343 - dp.uidForDidFromHost(d, sub.options.host_id) catch 0 344 - else 345 - 0; 342 + if (did) |d| { 343 + const result = dp.uidForDidFromHost(d, sub.options.host_id) catch break :blk @as(u64, 0); 344 + if (result.host_changed) { 345 + sub.validator.queueMigrationCheck(d, sub.options.host_id); 346 + } 347 + break :blk result.uid; 348 + } else break :blk @as(u64, 0); 346 349 } else 0; 347 350 348 351 // process #account events: update upstream status
+106 -1
src/validator.zig
··· 8 8 const std = @import("std"); 9 9 const zat = @import("zat"); 10 10 const broadcaster = @import("broadcaster.zig"); 11 + const event_log_mod = @import("event_log.zig"); 11 12 12 13 const Allocator = std.mem.Allocator; 13 14 const log = std.log.scoped(.relay); ··· 39 40 rev_clock_skew: i64 = 300, // 5 minutes 40 41 }; 41 42 43 + const MigrationCheck = struct { 44 + did: []const u8, // duped, owned by validator 45 + new_host_id: u64, 46 + }; 47 + 42 48 pub const Validator = struct { 43 49 allocator: Allocator, 44 50 stats: *broadcaster.Stats, 45 51 config: ValidatorConfig, 52 + persist: ?*event_log_mod.DiskPersist = null, 46 53 // DID → signing key cache (decoded, ready for verification) 47 54 cache: std.StringHashMapUnmanaged(CachedKey) = .{}, 48 55 cache_mutex: std.Thread.Mutex = .{}, 49 56 // background resolve queue 50 57 queue: std.ArrayListUnmanaged([]const u8) = .{}, 58 + // migration validation queue 59 + migration_queue: std.ArrayListUnmanaged(MigrationCheck) = .{}, 51 60 queue_mutex: std.Thread.Mutex = .{}, 52 61 queue_cond: std.Thread.Condition = .{}, 53 62 resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads, ··· 90 99 self.allocator.free(did); 91 100 } 92 101 self.queue.deinit(self.allocator); 102 + 103 + // free migration queue 104 + for (self.migration_queue.items) |mc| { 105 + self.allocator.free(mc.did); 106 + } 107 + self.migration_queue.deinit(self.allocator); 93 108 } 94 109 95 110 /// start background resolver threads ··· 383 398 384 399 while (self.alive.load(.acquire)) { 385 400 var did: ?[]const u8 = null; 401 + var migration: ?MigrationCheck = null; 386 402 { 387 403 self.queue_mutex.lock(); 388 404 defer self.queue_mutex.unlock(); 389 - while (self.queue.items.len == 0 and self.alive.load(.acquire)) { 405 + while (self.queue.items.len == 0 and self.migration_queue.items.len == 0 and self.alive.load(.acquire)) { 390 406 self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {}; 391 407 
} 392 408 if (self.queue.items.len > 0) { 393 409 did = self.queue.orderedRemove(0); 410 + } else if (self.migration_queue.items.len > 0) { 411 + migration = self.migration_queue.orderedRemove(0); 394 412 } 395 413 } 396 414 ··· 434 452 self.cache.put(self.allocator, did_duped, cached) catch { 435 453 self.allocator.free(did_duped); 436 454 }; 455 + } else if (migration) |mc| { 456 + defer self.allocator.free(mc.did); 457 + self.processMigrationCheck(&resolver, mc); 437 458 } 438 459 } 439 460 } 440 461 462 + /// validate a host migration by resolving the DID document and checking the PDS endpoint 463 + fn processMigrationCheck(self: *Validator, resolver: *zat.DidResolver, mc: MigrationCheck) void { 464 + const persist = self.persist orelse return; 465 + 466 + const parsed = zat.Did.parse(mc.did) orelse { 467 + log.debug("migration check: invalid DID {s}", .{mc.did}); 468 + return; 469 + }; 470 + 471 + var doc = resolver.resolve(parsed) catch |err| { 472 + log.debug("migration check: DID resolve failed for {s}: {s}", .{ mc.did, @errorName(err) }); 473 + return; 474 + }; 475 + defer doc.deinit(); 476 + 477 + const pds_endpoint = doc.pdsEndpoint() orelse { 478 + log.debug("migration check: no PDS endpoint for {s}", .{mc.did}); 479 + return; 480 + }; 481 + 482 + // extract hostname from PDS endpoint URL (strip https:// prefix) 483 + const pds_host = extractHostFromUrl(pds_endpoint) orelse { 484 + log.debug("migration check: cannot parse PDS URL '{s}' for {s}", .{ pds_endpoint, mc.did }); 485 + return; 486 + }; 487 + 488 + // look up the hostname → host_id 489 + const resolved_host_id = (persist.getHostIdForHostname(pds_host) catch { 490 + log.debug("migration check: host lookup failed for {s}", .{pds_host}); 491 + return; 492 + }) orelse { 493 + log.debug("migration check: unknown host {s} for {s}", .{ pds_host, mc.did }); 494 + return; 495 + }; 496 + 497 + if (resolved_host_id == mc.new_host_id) { 498 + // DID document confirms the new host — update 499 + const uid = 
persist.uidForDid(mc.did) catch return; 500 + persist.setAccountHostId(uid, mc.new_host_id) catch return; 501 + log.info("migration validated: {s} → host {d} (confirmed by DID doc)", .{ mc.did, mc.new_host_id }); 502 + } else { 503 + log.warn("migration rejected: {s} claims host {d}, but DID doc says {s} (host {d})", .{ 504 + mc.did, mc.new_host_id, pds_host, resolved_host_id, 505 + }); 506 + } 507 + } 508 + 441 509 /// evict a DID's cached signing key (e.g. on #identity event). 442 510 /// the next commit from this DID will trigger a fresh resolution. 443 511 pub fn evictKey(self: *Validator, did: []const u8) void { ··· 448 516 } 449 517 } 450 518 519 + /// queue a DID for async migration validation (host change detected) 520 + pub fn queueMigrationCheck(self: *Validator, did: []const u8, new_host_id: u64) void { 521 + const duped = self.allocator.dupe(u8, did) catch return; 522 + 523 + self.queue_mutex.lock(); 524 + defer self.queue_mutex.unlock(); 525 + self.migration_queue.append(self.allocator, .{ 526 + .did = duped, 527 + .new_host_id = new_host_id, 528 + }) catch { 529 + self.allocator.free(duped); 530 + return; 531 + }; 532 + self.queue_cond.signal(); 533 + } 534 + 451 535 /// cache size (for diagnostics) 452 536 pub fn cacheSize(self: *Validator) usize { 453 537 self.cache_mutex.lock(); ··· 455 539 return self.cache.count(); 456 540 } 457 541 }; 542 + 543 + /// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path" 544 + fn extractHostFromUrl(url: []const u8) ?[]const u8 { 545 + // strip scheme 546 + var rest = url; 547 + if (std.mem.startsWith(u8, rest, "https://")) { 548 + rest = rest["https://".len..]; 549 + } else if (std.mem.startsWith(u8, rest, "http://")) { 550 + rest = rest["http://".len..]; 551 + } 552 + // strip path 553 + if (std.mem.indexOfScalar(u8, rest, '/')) |i| { 554 + rest = rest[0..i]; 555 + } 556 + // strip port 557 + if (std.mem.indexOfScalar(u8, rest, ':')) |i| { 558 + rest = rest[0..i]; 559 + 
} 560 + if (rest.len == 0) return null; 561 + return rest; 562 + } 458 563 459 564 fn parseEnvInt(comptime T: type, key: []const u8, default: T) T { 460 565 const val = std.posix.getenv(key) orelse return default;
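The migration check reduces to one question: does the PDS endpoint in the DID document name the host the event arrived from? The sketch below compares hostnames directly, a simplification of the diff's approach, which resolves the hostname to a `host_id` through the database first. `hostOf` follows the same steps as `extractHostFromUrl`. `Decision` and `checkMigration` are hypothetical names for illustration.

```zig
const std = @import("std");

const Decision = enum { accept, reject };

/// Extract the hostname from a URL: strip scheme, then path, then port.
/// Like the original, this does not handle bracketed IPv6 literals.
fn hostOf(url: []const u8) ?[]const u8 {
    var rest = url;
    if (std.mem.startsWith(u8, rest, "https://")) {
        rest = rest["https://".len..];
    } else if (std.mem.startsWith(u8, rest, "http://")) {
        rest = rest["http://".len..];
    }
    if (std.mem.indexOfScalar(u8, rest, '/')) |i| rest = rest[0..i];
    if (std.mem.indexOfScalar(u8, rest, ':')) |i| rest = rest[0..i];
    if (rest.len == 0) return null;
    return rest;
}

/// Accept a claimed migration only when the DID document's PDS endpoint
/// points at the host the event came from. Hypothetical simplification:
/// the real check compares host IDs, not hostnames.
fn checkMigration(pds_endpoint: []const u8, claimed_hostname: []const u8) Decision {
    const doc_host = hostOf(pds_endpoint) orelse return .reject;
    return if (std.mem.eql(u8, doc_host, claimed_hostname)) .accept else .reject;
}

pub fn main() void {
    const d = checkMigration("https://pds.example.com:443/xrpc", "pds.example.com");
    std.debug.print("{s}\n", .{@tagName(d)});
}
```

Rejecting on any parse failure matches the diff, where every early return leaves the stored `host_id` unchanged.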