atproto relay implementation in Zig — zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1014 lines 40 kB view raw
//! relay frame validator — DID key resolution + real signature verification
//!
//! validates firehose commit frames by verifying the commit signature against
//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
//! and queues background resolution. no frame is ever blocked on network I/O.

const std = @import("std");
const Io = std.Io;
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const event_log_mod = @import("event_log.zig");
const lru = @import("lru.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// decoded and cached signing key for a DID
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    len: u8, // number of valid bytes in `raw`
    resolve_time: i64 = 0, // epoch seconds when resolved
};

pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};

/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    rev_clock_skew: i64 = 300, // 5 minutes
};

pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    cache: lru.LruCache(CachedKey),
    // background resolve queue
    queue: std.ArrayListUnmanaged([]const u8) = .empty,
    // in-flight set — prevents duplicate DID entries in the queue
    queued_set: std.StringHashMapUnmanaged(void) = .empty,
    queue_mutex: Io.Mutex = Io.Mutex.init,
    queue_cond: Io.Condition = Io.Condition.init,
    resolver_futures: [max_resolver_threads]?Io.Future(void) = .{null} ** max_resolver_threads,
    alive: std.atomic.Value(bool) = .{ .raw = true },
    max_cache_size: u32 = default_cache_size,
    io: Io,
    // pool of reusable resolvers for inline host authority checks.
    // frame workers acquire/release via atomic flag to avoid creating
    // a fresh resolver (and fresh TLS handshake) per call.
    host_resolvers: [host_resolver_pool_size]zat.DidResolver = undefined,
    host_resolver_available: [host_resolver_pool_size]std.atomic.Value(bool) = .{std.atomic.Value(bool){ .raw = false }} ** host_resolver_pool_size,
    host_resolver_inited: bool = false,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;
    const host_resolver_pool_size: usize = 4;
    /// single source of truth for the signing-key cache capacity.
    /// previously this value appeared both as the `max_cache_size` field
    /// default and as a bare literal in `initWithConfig` — they could drift.
    const default_cache_size: u32 = 250_000;

    /// construct with default config. caller must call `start()` before use
    /// and `deinit()` when done.
    pub fn init(allocator: Allocator, stats: *broadcaster.Stats, io: Io) Validator {
        return initWithConfig(allocator, stats, .{}, io);
    }

    /// construct with an explicit `ValidatorConfig`.
    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig, io: Io) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            .cache = lru.LruCache(CachedKey).init(allocator, default_cache_size, io),
            .io = io,
        };
    }

    /// stop resolver threads, tear down the host resolver pool, and free all
    /// queued DIDs. safe to call whether or not `start()` ran.
    pub fn deinit(self: *Validator) void {
        // flip the liveness flag first, then wake every waiter so resolver
        // threads observe shutdown and exit their wait loops.
        self.alive.store(false, .release);
        self.queue_cond.broadcast(self.io);
        for (&self.resolver_futures) |*slot| {
            if (slot.*) |*f| {
                f.cancel(self.io);
            }
            slot.* = null;
        }

        if (self.host_resolver_inited) {
            for (&self.host_resolvers) |*r| {
                r.deinit();
            }
            self.host_resolver_inited = false;
        }

        self.cache.deinit();

        // free queued DIDs (each was duped in queueResolve)
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }

    /// start background resolver threads and host authority resolver pool.
    /// honors VALIDATOR_CACHE_SIZE and RESOLVER_THREADS env overrides.
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_futures[0..count]) |*slot| {
            slot.* = try self.io.concurrent(resolveLoop, .{self});
        }

        // init host authority resolver pool (reused across calls)
        for (&self.host_resolvers) |*r| {
            r.* = zat.DidResolver.initWithOptions(self.io, self.allocator, .{});
        }
        for (&self.host_resolver_available) |*a| {
            a.store(true, .release);
        }
        self.host_resolver_inited = true;
    }

    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
138 pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult { 139 const did = payload.getString("did") orelse { 140 _ = self.stats.skipped.fetchAdd(1, .monotonic); 141 return .{ .valid = true, .skipped = true }; 142 }; 143 144 if (zat.Did.parse(did) == null) { 145 _ = self.stats.failed.fetchAdd(1, .monotonic); 146 _ = self.stats.failed_bad_did.fetchAdd(1, .monotonic); 147 return .{ .valid = false, .skipped = false }; 148 } 149 150 // check rev is valid TID (if present) 151 if (payload.getString("rev")) |rev| { 152 if (zat.Tid.parse(rev) == null) { 153 _ = self.stats.failed.fetchAdd(1, .monotonic); 154 _ = self.stats.failed_bad_rev.fetchAdd(1, .monotonic); 155 return .{ .valid = false, .skipped = false }; 156 } 157 } 158 159 const blocks = payload.getBytes("blocks") orelse { 160 _ = self.stats.failed.fetchAdd(1, .monotonic); 161 _ = self.stats.failed_missing_blocks.fetchAdd(1, .monotonic); 162 return .{ .valid = false, .skipped = false }; 163 }; 164 165 // #sync CAR should be small (just the signed commit block) 166 // lexicon maxLength: 10000 167 if (blocks.len > 10_000) { 168 _ = self.stats.failed.fetchAdd(1, .monotonic); 169 _ = self.stats.failed_oversized_blocks.fetchAdd(1, .monotonic); 170 return .{ .valid = false, .skipped = false }; 171 } 172 173 // cache lookup 174 const cached_key: ?CachedKey = self.cache.get(did); 175 176 if (cached_key == null) { 177 _ = self.stats.cache_misses.fetchAdd(1, .monotonic); 178 _ = self.stats.skipped.fetchAdd(1, .monotonic); 179 self.queueResolve(did); 180 return .{ .valid = true, .skipped = true }; 181 } 182 183 _ = self.stats.cache_hits.fetchAdd(1, .monotonic); 184 185 // verify signature (no MST, no ops) 186 const public_key = zat.multicodec.PublicKey{ 187 .key_type = cached_key.?.key_type, 188 .raw = cached_key.?.raw[0..cached_key.?.len], 189 }; 190 191 var arena = std.heap.ArenaAllocator.init(self.allocator); 192 defer arena.deinit(); 193 194 const result = zat.verifyCommitCar(arena.allocator(), 
blocks, public_key, .{ 195 .verify_mst = false, 196 .expected_did = did, 197 .max_car_size = 10 * 1024, 198 }) catch |err| { 199 log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) }); 200 // sync spec: on signature failure, key may have rotated. 201 // evict cached key and queue re-resolution. skip this frame. 202 self.evictKey(did); 203 self.queueResolve(did); 204 _ = self.stats.skipped.fetchAdd(1, .monotonic); 205 return .{ .valid = true, .skipped = true }; 206 }; 207 208 _ = self.stats.validated.fetchAdd(1, .monotonic); 209 return .{ 210 .valid = true, 211 .skipped = false, 212 .data_cid = result.commit_cid, 213 .commit_rev = result.commit_rev, 214 }; 215 } 216 217 /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder). 218 /// on cache miss, queues background resolution and skips. 219 pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult { 220 // extract DID from decoded payload 221 const did = payload.getString("repo") orelse { 222 _ = self.stats.skipped.fetchAdd(1, .monotonic); 223 return .{ .valid = true, .skipped = true }; 224 }; 225 226 // check cache for pre-resolved signing key 227 const cached_key: ?CachedKey = self.cache.get(did); 228 229 if (cached_key == null) { 230 // cache miss — queue for background resolution, skip validation 231 _ = self.stats.cache_misses.fetchAdd(1, .monotonic); 232 _ = self.stats.skipped.fetchAdd(1, .monotonic); 233 self.queueResolve(did); 234 return .{ .valid = true, .skipped = true }; 235 } 236 237 _ = self.stats.cache_hits.fetchAdd(1, .monotonic); 238 239 // cache hit — do structure checks + signature verification 240 if (self.verifyCommit(payload, did, cached_key.?)) |vr| { 241 _ = self.stats.validated.fetchAdd(1, .monotonic); 242 return vr; 243 } else |err| { 244 log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) }); 245 // sync spec: on signature failure, key may have rotated. 
246 // evict cached key and queue re-resolution. skip this frame 247 // (treat as cache miss). next commit will use the refreshed key. 248 self.evictKey(did); 249 self.queueResolve(did); 250 _ = self.stats.skipped.fetchAdd(1, .monotonic); 251 return .{ .valid = true, .skipped = true }; 252 } 253 } 254 255 fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult { 256 // commit structure checks first (cheap, no allocation) 257 self.checkCommitStructure(payload) catch { 258 _ = self.stats.failed_bad_structure.fetchAdd(1, .monotonic); 259 return error.InvalidFrame; 260 }; 261 262 // extract blocks (raw CAR bytes) from the pre-decoded payload 263 const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame; 264 265 // blocks size check — lexicon maxLength: 2000000 266 if (blocks.len > 2_000_000) return error.InvalidFrame; 267 268 // build public key for verification 269 const public_key = zat.multicodec.PublicKey{ 270 .key_type = cached_key.key_type, 271 .raw = cached_key.raw[0..cached_key.len], 272 }; 273 274 // run real signature verification (needs its own arena for CAR/MST temporaries) 275 var arena = std.heap.ArenaAllocator.init(self.allocator); 276 defer arena.deinit(); 277 const alloc = arena.allocator(); 278 279 // try sync 1.1 path: extract ops and use verifyCommitDiff 280 if (self.config.verify_commit_diff) { 281 if (self.extractOps(alloc, payload)) |msg_ops| { 282 // get stored prev_data from payload 283 const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) { 284 .cid => |c| c.raw, 285 .null => null, 286 else => null, 287 } else null; 288 289 const diff_result = zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{ 290 .expected_did = expected_did, 291 .skip_inversion = prev_data == null, 292 }) catch |err| { 293 return err; 294 }; 295 296 return .{ 297 .valid = true, 298 .skipped = false, 299 .data_cid = diff_result.data_cid, 300 
.commit_rev = diff_result.commit_rev, 301 }; 302 } 303 } 304 305 // fallback: legacy verification (signature + optional MST walk) 306 const result = zat.verifyCommitCar(alloc, blocks, public_key, .{ 307 .verify_mst = self.config.verify_mst, 308 .expected_did = expected_did, 309 }) catch |err| { 310 return err; 311 }; 312 313 return .{ 314 .valid = true, 315 .skipped = false, 316 .data_cid = result.commit_cid, 317 .commit_rev = result.commit_rev, 318 }; 319 } 320 321 /// extract ops from payload and convert to mst.Operation array. 322 /// the firehose format uses a single "path" field ("collection/rkey"), 323 /// not separate "collection"/"rkey" fields. 324 fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation { 325 _ = self; 326 const ops_array = payload.getArray("ops") orelse return null; 327 var ops: std.ArrayListUnmanaged(zat.MstOperation) = .empty; 328 for (ops_array) |op| { 329 const action = op.getString("action") orelse continue; 330 const path = op.getString("path") orelse continue; 331 332 // validate path contains "/" (collection/rkey) 333 if (std.mem.indexOfScalar(u8, path, '/') == null) continue; 334 335 // extract CID values 336 const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) { 337 .cid => |c| c.raw, 338 else => null, 339 } else null; 340 341 var value: ?[]const u8 = null; 342 var prev: ?[]const u8 = null; 343 344 if (std.mem.eql(u8, action, "create")) { 345 value = cid_value; 346 } else if (std.mem.eql(u8, action, "update")) { 347 value = cid_value; 348 prev = if (op.get("prev")) |v| switch (v) { 349 .cid => |c| c.raw, 350 else => null, 351 } else null; 352 } else if (std.mem.eql(u8, action, "delete")) { 353 prev = if (op.get("prev")) |v| switch (v) { 354 .cid => |c| c.raw, 355 else => null, 356 } else null; 357 } else continue; 358 359 ops.append(alloc, .{ 360 .path = path, 361 .value = value, 362 .prev = prev, 363 }) catch return null; 364 } 365 366 if (ops.items.len == 0) return null; 
367 return ops.items; 368 } 369 370 fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void { 371 // check repo field is a valid DID 372 const repo = payload.getString("repo") orelse return error.InvalidFrame; 373 if (zat.Did.parse(repo) == null) return error.InvalidFrame; 374 375 // check rev is a valid TID 376 if (payload.getString("rev")) |rev| { 377 if (zat.Tid.parse(rev) == null) return error.InvalidFrame; 378 } 379 380 // check ops count 381 if (payload.get("ops")) |ops_value| { 382 switch (ops_value) { 383 .array => |ops| { 384 if (ops.len > self.config.max_ops) return error.InvalidFrame; 385 // validate each op has valid path (collection/rkey) 386 for (ops) |op| { 387 if (op.getString("path")) |path| { 388 if (std.mem.indexOfScalar(u8, path, '/')) |sep| { 389 const collection = path[0..sep]; 390 const rkey = path[sep + 1 ..]; 391 if (zat.Nsid.parse(collection) == null) return error.InvalidFrame; 392 if (rkey.len > 0) { 393 if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame; 394 } 395 } else return error.InvalidFrame; // path must contain '/' 396 } 397 } 398 }, 399 else => return error.InvalidFrame, 400 } 401 } 402 } 403 404 fn queueResolve(self: *Validator, did: []const u8) void { 405 // check if already cached (race between validate and resolver) 406 if (self.cache.contains(did)) return; 407 408 const duped = self.allocator.dupe(u8, did) catch return; 409 410 self.queue_mutex.lockUncancelable(self.io); 411 defer self.queue_mutex.unlock(self.io); 412 413 // skip if already queued (prevents unbounded queue growth) 414 if (self.queued_set.contains(duped)) { 415 self.allocator.free(duped); 416 return; 417 } 418 419 // cap queue size — drop DID without adding to queued_set so it can be re-queued later 420 if (self.queue.items.len >= max_queue_size) { 421 self.allocator.free(duped); 422 return; 423 } 424 425 self.queue.append(self.allocator, duped) catch { 426 self.allocator.free(duped); 427 return; 428 }; 429 
self.queued_set.put(self.allocator, duped, {}) catch {}; 430 self.queue_cond.signal(self.io); 431 } 432 433 fn resolveLoop(self: *Validator) void { 434 var resolver = zat.DidResolver.initWithOptions(self.io, self.allocator, .{ .keep_alive = true }); 435 defer resolver.deinit(); 436 437 while (self.alive.load(.acquire)) { 438 var did: ?[]const u8 = null; 439 { 440 self.queue_mutex.lockUncancelable(self.io); 441 defer self.queue_mutex.unlock(self.io); 442 while (self.queue.items.len == 0 and self.alive.load(.acquire)) { 443 self.queue_cond.waitUncancelable(self.io, &self.queue_mutex); 444 } 445 if (self.queue.items.len > 0) { 446 did = self.queue.orderedRemove(0); 447 _ = self.queued_set.remove(did.?); 448 } 449 } 450 451 const d = did orelse continue; 452 defer self.allocator.free(d); 453 454 // skip if already cached (resolved while queued) 455 if (self.cache.contains(d)) continue; 456 457 // resolve DID → signing key 458 const parsed = zat.Did.parse(d) orelse continue; 459 var doc = resolver.resolve(parsed) catch |err| { 460 log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) }); 461 continue; 462 }; 463 defer doc.deinit(); 464 465 // extract and decode signing key 466 const vm = doc.signingKey() orelse continue; 467 const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue; 468 defer self.allocator.free(key_bytes); 469 const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue; 470 471 // store decoded key in cache (fixed-size, no pointer chasing) 472 var cached = CachedKey{ 473 .key_type = public_key.key_type, 474 .raw = undefined, 475 .len = @intCast(public_key.raw.len), 476 .resolve_time = timestamp(self.io), 477 }; 478 @memcpy(cached.raw[0..public_key.raw.len], public_key.raw); 479 480 self.cache.put(d, cached) catch continue; 481 482 // --- host validation (merged from migration queue) --- 483 // while we have the DID doc, check PDS endpoint and update host if needed. 
484 // best-effort: failures don't prevent signing key caching. 485 if (self.persist) |persist| { 486 if (doc.pdsEndpoint()) |pds_endpoint| { 487 if (extractHostFromUrl(pds_endpoint)) |pds_host| { 488 const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue; 489 const uid = persist.uidForDid(d) catch continue; 490 const current_host = persist.getAccountHostId(uid) catch continue; 491 if (current_host != 0 and current_host != pds_host_id) { 492 persist.setAccountHostId(uid, pds_host_id) catch {}; 493 log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id }); 494 } 495 } 496 } 497 } 498 } 499 } 500 501 /// evict a DID's cached signing key (e.g. on #identity event). 502 /// the next commit from this DID will trigger a fresh resolution. 503 pub fn evictKey(self: *Validator, did: []const u8) void { 504 _ = self.cache.remove(did); 505 } 506 507 /// cache size (for diagnostics) 508 pub fn cacheSize(self: *Validator) u32 { 509 return self.cache.count(); 510 } 511 512 /// resolve queue length (for diagnostics — non-blocking) 513 pub fn resolveQueueLen(self: *Validator) usize { 514 if (!self.queue_mutex.tryLock()) return 0; 515 defer self.queue_mutex.unlock(self.io); 516 return self.queue.items.len; 517 } 518 519 /// resolve dedup set size (for diagnostics — non-blocking) 520 pub fn resolveQueuedSetCount(self: *Validator) u32 { 521 if (!self.queue_mutex.tryLock()) return 0; 522 defer self.queue_mutex.unlock(self.io); 523 return self.queued_set.count(); 524 } 525 526 /// signing key cache hashmap backing capacity (for memory attribution) 527 pub fn cacheMapCapacity(self: *Validator) u32 { 528 return self.cache.mapCapacity(); 529 } 530 531 /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking) 532 pub fn resolveQueuedSetCapacity(self: *Validator) u32 { 533 if (!self.queue_mutex.tryLock()) return 0; 534 defer self.queue_mutex.unlock(self.io); 535 return self.queued_set.capacity(); 536 } 537 538 pub 
const HostAuthority = enum { accept, migrate, reject }; 539 540 /// synchronous host authority check. called on first-seen DIDs (is_new) 541 /// and host migrations (host_changed). resolves the DID doc to verify the 542 /// PDS endpoint matches the incoming host. retries once on failure to 543 /// handle transient network errors. 544 /// 545 /// uses a pooled resolver to avoid creating a fresh resolver (and fresh 546 /// TLS handshake) per call. blocks briefly if all pool slots are in use. 547 /// 548 /// returns: 549 /// .accept — should not happen (caller should only call on new/mismatch) 550 /// .migrate — DID doc confirms this host, caller should update host_id 551 /// .reject — DID doc does not confirm, caller should drop the event 552 pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority { 553 const persist = self.persist orelse return .migrate; // no DB — can't check 554 const parsed = zat.Did.parse(did) orelse return .reject; 555 556 const idx = self.acquireHostResolver(); 557 defer self.releaseHostResolver(idx); 558 559 var resolver = &self.host_resolvers[idx]; 560 561 // first resolve attempt 562 var doc = resolver.resolve(parsed) catch { 563 // retry once on network failure 564 var doc2 = resolver.resolve(parsed) catch return .reject; 565 defer doc2.deinit(); 566 return self.checkPdsHost(&doc2, persist, incoming_host_id); 567 }; 568 defer doc.deinit(); 569 return self.checkPdsHost(&doc, persist, incoming_host_id); 570 } 571 572 /// acquire a resolver from the pool. spins until one is available. 
573 fn acquireHostResolver(self: *Validator) usize { 574 while (self.alive.load(.acquire)) { 575 for (0..host_resolver_pool_size) |i| { 576 if (self.host_resolver_available[i].cmpxchgStrong(true, false, .acquire, .monotonic) == null) { 577 return i; 578 } 579 } 580 self.io.sleep(Io.Duration.fromMilliseconds(1), .awake) catch {}; 581 } 582 return 0; // shutdown path — caller will exit soon 583 } 584 585 fn releaseHostResolver(self: *Validator, idx: usize) void { 586 self.host_resolver_available[idx].store(true, .release); 587 } 588 589 fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority { 590 _ = self; 591 const pds_endpoint = doc.pdsEndpoint() orelse return .reject; 592 const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject; 593 const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject; 594 if (pds_host_id == incoming_host_id) return .migrate; 595 return .reject; 596 } 597}; 598 599/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path" 600pub fn extractHostFromUrl(url: []const u8) ?[]const u8 { 601 // strip scheme 602 var rest = url; 603 if (std.mem.startsWith(u8, rest, "https://")) { 604 rest = rest["https://".len..]; 605 } else if (std.mem.startsWith(u8, rest, "http://")) { 606 rest = rest["http://".len..]; 607 } 608 // strip path 609 if (std.mem.indexOfScalar(u8, rest, '/')) |i| { 610 rest = rest[0..i]; 611 } 612 // strip port 613 if (std.mem.indexOfScalar(u8, rest, ':')) |i| { 614 rest = rest[0..i]; 615 } 616 if (rest.len == 0) return null; 617 return rest; 618} 619 620fn getenv(key: [*:0]const u8) ?[]const u8 { 621 const ptr = std.c.getenv(key) orelse return null; 622 return std.mem.sliceTo(ptr, 0); 623} 624 625fn parseEnvInt(comptime T: type, key: [*:0]const u8, default: T) T { 626 const val = getenv(key) orelse return default; 627 return std.fmt.parseInt(T, val, 10) catch default; 
}

/// wall-clock time as whole epoch seconds, read through the Io runtime
fn timestamp(io: Io) i64 {
    const now_ns = Io.Timestamp.now(io, .real).nanoseconds;
    return @intCast(@divFloor(now_ns, std.time.ns_per_s));
}

// --- tests ---

const testing = std.testing;

test "validateCommit skips on cache miss" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    // a structurally plausible commit payload for an unknown DID
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const res = validator.validateCommit(payload);
    try testing.expect(res.valid);
    try testing.expect(res.skipped);
    try testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateCommit skips when no repo field" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    // no "repo" key at all — frame is skipped, not rejected
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const res = validator.validateCommit(payload);
    try testing.expect(res.valid);
    try testing.expect(res.skipped);
    try testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "checkCommitStructure rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };

    try testing.expectError(error.InvalidFrame, validator.checkCommitStructure(payload));
}

test "checkCommitStructure accepts valid commit" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    try validator.checkCommitStructure(payload);
}

test "validateSync skips on cache miss" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const res = validator.validateSync(payload);
    try testing.expect(res.valid);
    try testing.expect(res.skipped);
    try testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateSync rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "not-a-did" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const res = validator.validateSync(payload);
    try testing.expect(!res.valid);
    try testing.expect(!res.skipped);
    try testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync rejects missing blocks" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    const res = validator.validateSync(payload);
    try testing.expect(!res.valid);
    try testing.expect(!res.skipped);
    try testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync skips when no did field" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    defer validator.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const res = validator.validateSync(payload);
    try testing.expect(res.valid);
    try testing.expect(res.skipped);
    try testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "LRU cache evicts least recently used" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(testing.allocator, &stats, testing.io);
    validator.cache.capacity = 3;
    defer validator.deinit();

    const key = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 };

    try validator.cache.put("did:plc:aaa", key);
    try validator.cache.put("did:plc:bbb", key);
    try validator.cache.put("did:plc:ccc", key);

    // touching "aaa" promotes it to most-recently-used
    _ = validator.cache.get("did:plc:aaa");

    // inserting a fourth entry must evict the least-recent one ("bbb")
    try validator.cache.put("did:plc:ddd", key);

    try testing.expect(validator.cache.get("did:plc:bbb") == null);
    try testing.expect(validator.cache.get("did:plc:aaa") != null);
    try testing.expect(validator.cache.get("did:plc:ccc") != null);
    try testing.expect(validator.cache.get("did:plc:ddd") != null);
    try testing.expectEqual(@as(u32, 3), validator.cache.count());
}

test "checkCommitStructure rejects too many ops" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 }, std.testing.io);
    defer v.deinit();

    // build ops array with 3 items (over limit of 2)
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{.{ .key = "action", .value = .{
.text = "create" } }} }, 794 .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} }, 795 .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} }, 796 }; 797 798 const payload: zat.cbor.Value = .{ .map = &.{ 799 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 800 .{ .key = "ops", .value = .{ .array = &ops } }, 801 } }; 802 803 try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload)); 804} 805 806// --- spec conformance tests --- 807 808test "spec: #commit blocks > 2,000,000 bytes rejected" { 809 // lexicon maxLength for #commit blocks: 2,000,000 810 var stats = broadcaster.Stats{}; 811 var v = Validator.init(std.testing.allocator, &stats, std.testing.io); 812 defer v.deinit(); 813 814 // insert a fake cached key so we reach the blocks size check 815 const did = "did:plc:test123"; 816 try v.cache.put(did, .{ 817 .key_type = .p256, 818 .raw = .{0} ** 33, 819 .len = 33, 820 .resolve_time = 100, 821 }); 822 823 // blocks with 2,000,001 bytes (1 byte over limit) 824 const oversized_blocks = try std.testing.allocator.alloc(u8, 2_000_001); 825 defer std.testing.allocator.free(oversized_blocks); 826 @memset(oversized_blocks, 0); 827 828 const payload: zat.cbor.Value = .{ .map = &.{ 829 .{ .key = "repo", .value = .{ .text = did } }, 830 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 831 .{ .key = "blocks", .value = .{ .bytes = oversized_blocks } }, 832 } }; 833 834 const result = v.validateCommit(payload); 835 try std.testing.expect(!result.valid or result.skipped); 836} 837 838test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" { 839 // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check 840 var stats = broadcaster.Stats{}; 841 var v = Validator.init(std.testing.allocator, &stats, std.testing.io); 842 defer v.deinit(); 843 844 const did = "did:plc:test123"; 845 try v.cache.put(did, .{ 846 .key_type = .p256, 847 .raw = .{0} ** 33, 848 .len = 33, 849 
.resolve_time = 100, 850 }); 851 852 // exactly 2,000,000 bytes — should pass size check (may fail signature verify, that's ok) 853 const exact_blocks = try std.testing.allocator.alloc(u8, 2_000_000); 854 defer std.testing.allocator.free(exact_blocks); 855 @memset(exact_blocks, 0); 856 857 const payload: zat.cbor.Value = .{ .map = &.{ 858 .{ .key = "repo", .value = .{ .text = did } }, 859 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 860 .{ .key = "blocks", .value = .{ .bytes = exact_blocks } }, 861 } }; 862 863 const result = v.validateCommit(payload); 864 // should not be rejected for size — may fail signature verification (that's fine, 865 // it means we passed the size check). with P1.1c, sig failure → skipped=true. 866 try std.testing.expect(result.valid or result.skipped); 867} 868 869test "spec: #sync blocks > 10,000 bytes rejected" { 870 // lexicon maxLength for #sync blocks: 10,000 871 var stats = broadcaster.Stats{}; 872 var v = Validator.init(std.testing.allocator, &stats, std.testing.io); 873 defer v.deinit(); 874 875 const payload: zat.cbor.Value = .{ .map = &.{ 876 .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 877 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 878 .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_001) } }, 879 } }; 880 881 const result = v.validateSync(payload); 882 try std.testing.expect(!result.valid); 883 try std.testing.expect(!result.skipped); 884} 885 886test "spec: #sync blocks = 10,000 bytes accepted (boundary)" { 887 // lexicon maxLength for #sync blocks: 10,000 — exactly at limit should pass size check 888 var stats = broadcaster.Stats{}; 889 var v = Validator.init(std.testing.allocator, &stats, std.testing.io); 890 defer v.deinit(); 891 892 const payload: zat.cbor.Value = .{ .map = &.{ 893 .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 894 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 895 .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_000) } }, 
896 } }; 897 898 const result = v.validateSync(payload); 899 // should pass size check — will be a cache miss → skipped (no cached key) 900 try std.testing.expect(result.valid); 901 try std.testing.expect(result.skipped); 902} 903 904test "extractOps reads path field from firehose format" { 905 var stats = broadcaster.Stats{}; 906 var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true }, std.testing.io); 907 defer v.deinit(); 908 909 // use arena since extractOps allocates an ArrayList internally 910 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 911 defer arena.deinit(); 912 913 const ops = [_]zat.cbor.Value{ 914 .{ .map = &.{ 915 .{ .key = "action", .value = .{ .text = "create" } }, 916 .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } }, 917 .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } }, 918 } }, 919 .{ .map = &.{ 920 .{ .key = "action", .value = .{ .text = "delete" } }, 921 .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } }, 922 } }, 923 }; 924 925 const payload: zat.cbor.Value = .{ .map = &.{ 926 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 927 .{ .key = "ops", .value = .{ .array = &ops } }, 928 } }; 929 930 const result = v.extractOps(arena.allocator(), payload); 931 try std.testing.expect(result != null); 932 try std.testing.expectEqual(@as(usize, 2), result.?.len); 933 try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", result.?[0].path); 934 try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", result.?[1].path); 935 try std.testing.expect(result.?[0].value != null); // create has cid 936 try std.testing.expect(result.?[1].value == null); // delete has no cid 937} 938 939test "extractOps rejects malformed path without slash" { 940 var stats = broadcaster.Stats{}; 941 var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true }, std.testing.io); 942 
defer v.deinit(); 943 944 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 945 defer arena.deinit(); 946 947 const ops = [_]zat.cbor.Value{ 948 .{ .map = &.{ 949 .{ .key = "action", .value = .{ .text = "create" } }, 950 .{ .key = "path", .value = .{ .text = "noslash" } }, 951 .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } }, 952 } }, 953 }; 954 955 const payload: zat.cbor.Value = .{ .map = &.{ 956 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 957 .{ .key = "ops", .value = .{ .array = &ops } }, 958 } }; 959 960 // malformed path (no slash) → all ops skipped → returns null 961 const result = v.extractOps(arena.allocator(), payload); 962 try std.testing.expect(result == null); 963} 964 965test "checkCommitStructure validates path field" { 966 var stats = broadcaster.Stats{}; 967 var v = Validator.init(std.testing.allocator, &stats, std.testing.io); 968 defer v.deinit(); 969 970 // valid path 971 const valid_ops = [_]zat.cbor.Value{ 972 .{ .map = &.{ 973 .{ .key = "action", .value = .{ .text = "create" } }, 974 .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } }, 975 } }, 976 }; 977 978 const valid_payload: zat.cbor.Value = .{ .map = &.{ 979 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 980 .{ .key = "ops", .value = .{ .array = &valid_ops } }, 981 } }; 982 983 try v.checkCommitStructure(valid_payload); 984 985 // invalid collection in path 986 const invalid_ops = [_]zat.cbor.Value{ 987 .{ .map = &.{ 988 .{ .key = "action", .value = .{ .text = "create" } }, 989 .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } }, 990 } }, 991 }; 992 993 const invalid_payload: zat.cbor.Value = .{ .map = &.{ 994 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 995 .{ .key = "ops", .value = .{ .array = &invalid_ops } }, 996 } }; 997 998 try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(invalid_payload)); 999} 1000 1001test "queueResolve deduplicates 
repeated DIDs" { 1002 var stats = broadcaster.Stats{}; 1003 var v = Validator.init(std.testing.allocator, &stats, std.testing.io); 1004 defer v.deinit(); 1005 1006 // queue the same DID 100 times 1007 for (0..100) |_| { 1008 v.queueResolve("did:plc:duplicate"); 1009 } 1010 1011 // should have exactly 1 entry, not 100 1012 try std.testing.expectEqual(@as(usize, 1), v.queue.items.len); 1013 try std.testing.expectEqual(@as(u32, 1), v.queued_set.count()); 1014}