//! atproto relay implementation in zig
//! zlay.waow.tech
1//! relay frame validator — DID key resolution + real signature verification
2//!
3//! validates firehose commit frames by verifying the commit signature against
4//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
5//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
6//! and queues background resolution. no frame is ever blocked on network I/O.
7
8const std = @import("std");
9const Io = std.Io;
10const zat = @import("zat");
11const broadcaster = @import("broadcaster.zig");
12const event_log_mod = @import("event_log.zig");
13const lru = @import("lru.zig");
14
15const Allocator = std.mem.Allocator;
16const log = std.log.scoped(.relay);
17
/// decoded and cached signing key for a DID.
/// stored by value in the LRU cache — fixed-size, no heap pointers to chase.
const CachedKey = struct {
    // key algorithm, as decoded from the DID document's verification method
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    // number of valid bytes in `raw`
    len: u8,
    resolve_time: i64 = 0, // epoch seconds when resolved
};
25
/// outcome of validating a single firehose frame.
/// `skipped = true` means validation could not be performed (missing field,
/// uncached signing key, or verification error treated as possible key
/// rotation) and the frame was passed through unverified.
pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};
32
/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    /// NOTE(review): not referenced anywhere in this file — presumably
    /// consumed by another module; confirm before removing
    rev_clock_skew: i64 = 300, // 5 minutes
};
44
pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    // optional disk persistence, used for host authority checks/migrations
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    cache: lru.LruCache(CachedKey),
    // background resolve queue (owns the duped DID strings)
    queue: std.ArrayListUnmanaged([]const u8) = .empty,
    // in-flight set — prevents duplicate DID entries in the queue
    queued_set: std.StringHashMapUnmanaged(void) = .empty,
    // guards `queue` and `queued_set`
    queue_mutex: Io.Mutex = Io.Mutex.init,
    queue_cond: Io.Condition = Io.Condition.init,
    // one future per background resolver task; filled by start(), cancelled in deinit()
    resolver_futures: [max_resolver_threads]?Io.Future(void) = .{null} ** max_resolver_threads,
    // cleared by deinit(); resolver loops and the host-resolver spin exit on false
    alive: std.atomic.Value(bool) = .{ .raw = true },
    max_cache_size: u32 = 250_000,
    io: Io,
    // pool of reusable resolvers for inline host authority checks.
    // frame workers acquire/release via atomic flag to avoid creating
    // a fresh resolver (and fresh TLS handshake) per call.
    host_resolvers: [host_resolver_pool_size]zat.DidResolver = undefined,
    // per-slot availability flags; all false until start() initializes the pool
    host_resolver_available: [host_resolver_pool_size]std.atomic.Value(bool) = .{std.atomic.Value(bool){ .raw = false }} ** host_resolver_pool_size,
    host_resolver_inited: bool = false,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;
    const host_resolver_pool_size: usize = 4;
73
74 pub fn init(allocator: Allocator, stats: *broadcaster.Stats, io: Io) Validator {
75 return initWithConfig(allocator, stats, .{}, io);
76 }
77
    /// construct a Validator with an explicit config. does not start any
    /// background work — call start() for resolver tasks and the host pool.
    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig, io: Io) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            // NOTE(review): 250_000 duplicates the max_cache_size field default;
            // start() re-applies max_cache_size (possibly env-overridden) anyway.
            // keep the two literals in sync if either changes.
            .cache = lru.LruCache(CachedKey).init(allocator, 250_000, io),
            .io = io,
        };
    }
87
    /// shut down background work and free all owned memory.
    /// order matters: signal shutdown, wake/cancel resolver tasks, then tear
    /// down pools, cache, and the queued DID strings.
    pub fn deinit(self: *Validator) void {
        // stop resolver loops from picking up more work
        self.alive.store(false, .release);
        // wake any resolver blocked on the condition so it observes alive=false
        self.queue_cond.broadcast(self.io);
        for (&self.resolver_futures) |*slot| {
            if (slot.*) |*f| {
                f.cancel(self.io);
            }
            slot.* = null;
        }

        if (self.host_resolver_inited) {
            for (&self.host_resolvers) |*r| {
                r.deinit();
            }
            self.host_resolver_inited = false;
        }

        self.cache.deinit();

        // free queued DIDs (the queue owns its duped strings)
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }
114
    /// start background resolver threads and host authority resolver pool.
    /// reads VALIDATOR_CACHE_SIZE and RESOLVER_THREADS from the environment
    /// (both optional; thread count is clamped to max_resolver_threads).
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        // apply (possibly env-overridden) capacity to the already-initialized LRU
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_futures[0..count]) |*slot| {
            slot.* = try self.io.concurrent(resolveLoop, .{self});
        }

        // init host authority resolver pool (reused across calls)
        for (&self.host_resolvers) |*r| {
            r.* = zat.DidResolver.initWithOptions(self.io, self.allocator, .{});
        }
        // publish pool slots only after every resolver is constructed
        for (&self.host_resolver_available) |*a| {
            a.store(true, .release);
        }
        self.host_resolver_inited = true;
    }
134
    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
    pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // no "did" field — nothing to validate against; pass through as skipped
        const did = payload.getString("did") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // malformed DID is a hard failure
        if (zat.Did.parse(did) == null) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_bad_did.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // check rev is valid TID (if present)
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) {
                _ = self.stats.failed.fetchAdd(1, .monotonic);
                _ = self.stats.failed_bad_rev.fetchAdd(1, .monotonic);
                return .{ .valid = false, .skipped = false };
            }
        }

        // #sync must carry its CAR bytes
        const blocks = payload.getBytes("blocks") orelse {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_missing_blocks.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        };

        // #sync CAR should be small (just the signed commit block)
        // lexicon maxLength: 10000
        if (blocks.len > 10_000) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_oversized_blocks.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // cache lookup
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // no signing key yet — queue background resolution, pass frame through
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // verify signature (no MST, no ops)
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.?.key_type,
            .raw = cached_key.?.raw[0..cached_key.?.len],
        };

        // arena for CAR parsing temporaries; freed wholesale on return
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();

        const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{
            .verify_mst = false,
            .expected_did = did,
            .max_car_size = 10 * 1024,
        }) catch |err| {
            log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        _ = self.stats.validated.fetchAdd(1, .monotonic);
        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }
216
217 /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
218 /// on cache miss, queues background resolution and skips.
219 pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
220 // extract DID from decoded payload
221 const did = payload.getString("repo") orelse {
222 _ = self.stats.skipped.fetchAdd(1, .monotonic);
223 return .{ .valid = true, .skipped = true };
224 };
225
226 // check cache for pre-resolved signing key
227 const cached_key: ?CachedKey = self.cache.get(did);
228
229 if (cached_key == null) {
230 // cache miss — queue for background resolution, skip validation
231 _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
232 _ = self.stats.skipped.fetchAdd(1, .monotonic);
233 self.queueResolve(did);
234 return .{ .valid = true, .skipped = true };
235 }
236
237 _ = self.stats.cache_hits.fetchAdd(1, .monotonic);
238
239 // cache hit — do structure checks + signature verification
240 if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
241 _ = self.stats.validated.fetchAdd(1, .monotonic);
242 return vr;
243 } else |err| {
244 log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
245 // sync spec: on signature failure, key may have rotated.
246 // evict cached key and queue re-resolution. skip this frame
247 // (treat as cache miss). next commit will use the refreshed key.
248 self.evictKey(did);
249 self.queueResolve(did);
250 _ = self.stats.skipped.fetchAdd(1, .monotonic);
251 return .{ .valid = true, .skipped = true };
252 }
253 }
254
255 fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
256 // commit structure checks first (cheap, no allocation)
257 self.checkCommitStructure(payload) catch {
258 _ = self.stats.failed_bad_structure.fetchAdd(1, .monotonic);
259 return error.InvalidFrame;
260 };
261
262 // extract blocks (raw CAR bytes) from the pre-decoded payload
263 const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;
264
265 // blocks size check — lexicon maxLength: 2000000
266 if (blocks.len > 2_000_000) return error.InvalidFrame;
267
268 // build public key for verification
269 const public_key = zat.multicodec.PublicKey{
270 .key_type = cached_key.key_type,
271 .raw = cached_key.raw[0..cached_key.len],
272 };
273
274 // run real signature verification (needs its own arena for CAR/MST temporaries)
275 var arena = std.heap.ArenaAllocator.init(self.allocator);
276 defer arena.deinit();
277 const alloc = arena.allocator();
278
279 // try sync 1.1 path: extract ops and use verifyCommitDiff
280 if (self.config.verify_commit_diff) {
281 if (self.extractOps(alloc, payload)) |msg_ops| {
282 // get stored prev_data from payload
283 const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
284 .cid => |c| c.raw,
285 .null => null,
286 else => null,
287 } else null;
288
289 const diff_result = zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
290 .expected_did = expected_did,
291 .skip_inversion = prev_data == null,
292 }) catch |err| {
293 return err;
294 };
295
296 return .{
297 .valid = true,
298 .skipped = false,
299 .data_cid = diff_result.data_cid,
300 .commit_rev = diff_result.commit_rev,
301 };
302 }
303 }
304
305 // fallback: legacy verification (signature + optional MST walk)
306 const result = zat.verifyCommitCar(alloc, blocks, public_key, .{
307 .verify_mst = self.config.verify_mst,
308 .expected_did = expected_did,
309 }) catch |err| {
310 return err;
311 };
312
313 return .{
314 .valid = true,
315 .skipped = false,
316 .data_cid = result.commit_cid,
317 .commit_rev = result.commit_rev,
318 };
319 }
320
321 /// extract ops from payload and convert to mst.Operation array.
322 /// the firehose format uses a single "path" field ("collection/rkey"),
323 /// not separate "collection"/"rkey" fields.
324 fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
325 _ = self;
326 const ops_array = payload.getArray("ops") orelse return null;
327 var ops: std.ArrayListUnmanaged(zat.MstOperation) = .empty;
328 for (ops_array) |op| {
329 const action = op.getString("action") orelse continue;
330 const path = op.getString("path") orelse continue;
331
332 // validate path contains "/" (collection/rkey)
333 if (std.mem.indexOfScalar(u8, path, '/') == null) continue;
334
335 // extract CID values
336 const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
337 .cid => |c| c.raw,
338 else => null,
339 } else null;
340
341 var value: ?[]const u8 = null;
342 var prev: ?[]const u8 = null;
343
344 if (std.mem.eql(u8, action, "create")) {
345 value = cid_value;
346 } else if (std.mem.eql(u8, action, "update")) {
347 value = cid_value;
348 prev = if (op.get("prev")) |v| switch (v) {
349 .cid => |c| c.raw,
350 else => null,
351 } else null;
352 } else if (std.mem.eql(u8, action, "delete")) {
353 prev = if (op.get("prev")) |v| switch (v) {
354 .cid => |c| c.raw,
355 else => null,
356 } else null;
357 } else continue;
358
359 ops.append(alloc, .{
360 .path = path,
361 .value = value,
362 .prev = prev,
363 }) catch return null;
364 }
365
366 if (ops.items.len == 0) return null;
367 return ops.items;
368 }
369
370 fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
371 // check repo field is a valid DID
372 const repo = payload.getString("repo") orelse return error.InvalidFrame;
373 if (zat.Did.parse(repo) == null) return error.InvalidFrame;
374
375 // check rev is a valid TID
376 if (payload.getString("rev")) |rev| {
377 if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
378 }
379
380 // check ops count
381 if (payload.get("ops")) |ops_value| {
382 switch (ops_value) {
383 .array => |ops| {
384 if (ops.len > self.config.max_ops) return error.InvalidFrame;
385 // validate each op has valid path (collection/rkey)
386 for (ops) |op| {
387 if (op.getString("path")) |path| {
388 if (std.mem.indexOfScalar(u8, path, '/')) |sep| {
389 const collection = path[0..sep];
390 const rkey = path[sep + 1 ..];
391 if (zat.Nsid.parse(collection) == null) return error.InvalidFrame;
392 if (rkey.len > 0) {
393 if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame;
394 }
395 } else return error.InvalidFrame; // path must contain '/'
396 }
397 }
398 },
399 else => return error.InvalidFrame,
400 }
401 }
402 }
403
    /// enqueue a DID for background signing-key resolution. best-effort:
    /// allocation failure or a full queue silently drops the request (a later
    /// frame from the same DID will re-queue it).
    fn queueResolve(self: *Validator, did: []const u8) void {
        // check if already cached (race between validate and resolver)
        if (self.cache.contains(did)) return;

        // dup outside the lock; the queue owns the copy once appended
        const duped = self.allocator.dupe(u8, did) catch return;

        self.queue_mutex.lockUncancelable(self.io);
        defer self.queue_mutex.unlock(self.io);

        // skip if already queued (prevents unbounded queue growth)
        if (self.queued_set.contains(duped)) {
            self.allocator.free(duped);
            return;
        }

        // cap queue size — drop DID without adding to queued_set so it can be re-queued later
        if (self.queue.items.len >= max_queue_size) {
            self.allocator.free(duped);
            return;
        }

        self.queue.append(self.allocator, duped) catch {
            self.allocator.free(duped);
            return;
        };
        // NOTE(review): if this put fails (OOM), the DID stays in the queue but
        // not in the dedup set, so a duplicate entry is possible — bounded by
        // max_queue_size; confirm this best-effort behavior is intended
        self.queued_set.put(self.allocator, duped, {}) catch {};
        self.queue_cond.signal(self.io);
    }
432
    /// background task body: pop DIDs off the queue, resolve their DID
    /// documents, and cache the decoded signing key. runs until `alive`
    /// is cleared by deinit(). one instance per resolver future.
    fn resolveLoop(self: *Validator) void {
        var resolver = zat.DidResolver.initWithOptions(self.io, self.allocator, .{ .keep_alive = true });
        defer resolver.deinit();

        while (self.alive.load(.acquire)) {
            var did: ?[]const u8 = null;
            {
                self.queue_mutex.lockUncancelable(self.io);
                defer self.queue_mutex.unlock(self.io);
                // sleep until work arrives or shutdown is signalled
                while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                    self.queue_cond.waitUncancelable(self.io, &self.queue_mutex);
                }
                if (self.queue.items.len > 0) {
                    did = self.queue.orderedRemove(0);
                    _ = self.queued_set.remove(did.?);
                }
            }

            const d = did orelse continue;
            // ownership of the duped DID transfers here; freed once processed
            defer self.allocator.free(d);

            // skip if already cached (resolved while queued)
            if (self.cache.contains(d)) continue;

            // resolve DID → signing key
            const parsed = zat.Did.parse(d) orelse continue;
            var doc = resolver.resolve(parsed) catch |err| {
                log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
                continue;
            };
            defer doc.deinit();

            // extract and decode signing key
            const vm = doc.signingKey() orelse continue;
            const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
            defer self.allocator.free(key_bytes);
            const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;

            // store decoded key in cache (fixed-size, no pointer chasing)
            var cached = CachedKey{
                .key_type = public_key.key_type,
                .raw = undefined,
                .len = @intCast(public_key.raw.len),
                .resolve_time = timestamp(self.io),
            };
            @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);

            self.cache.put(d, cached) catch continue;

            // --- host validation (merged from migration queue) ---
            // while we have the DID doc, check PDS endpoint and update host if needed.
            // best-effort: failures don't prevent signing key caching.
            if (self.persist) |persist| {
                if (doc.pdsEndpoint()) |pds_endpoint| {
                    if (extractHostFromUrl(pds_endpoint)) |pds_host| {
                        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue;
                        const uid = persist.uidForDid(d) catch continue;
                        const current_host = persist.getAccountHostId(uid) catch continue;
                        // only migrate accounts that already have a (different) host
                        if (current_host != 0 and current_host != pds_host_id) {
                            persist.setAccountHostId(uid, pds_host_id) catch {};
                            log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id });
                        }
                    }
                }
            }
        }
    }
500
    /// evict a DID's cached signing key (e.g. on #identity event).
    /// the next commit from this DID will trigger a fresh resolution.
    pub fn evictKey(self: *Validator, did: []const u8) void {
        _ = self.cache.remove(did);
    }

    /// cache size (for diagnostics)
    pub fn cacheSize(self: *Validator) u32 {
        return self.cache.count();
    }

    /// resolve queue length (for diagnostics — non-blocking).
    /// returns 0 when the lock is contended rather than blocking the caller.
    pub fn resolveQueueLen(self: *Validator) usize {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock(self.io);
        return self.queue.items.len;
    }

    /// resolve dedup set size (for diagnostics — non-blocking).
    /// returns 0 on lock contention, same as resolveQueueLen.
    pub fn resolveQueuedSetCount(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock(self.io);
        return self.queued_set.count();
    }

    /// signing key cache hashmap backing capacity (for memory attribution)
    pub fn cacheMapCapacity(self: *Validator) u32 {
        return self.cache.mapCapacity();
    }

    /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking)
    pub fn resolveQueuedSetCapacity(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock(self.io);
        return self.queued_set.capacity();
    }
537
    pub const HostAuthority = enum { accept, migrate, reject };

    /// synchronous host authority check. called on first-seen DIDs (is_new)
    /// and host migrations (host_changed). resolves the DID doc to verify the
    /// PDS endpoint matches the incoming host. retries once on failure to
    /// handle transient network errors.
    ///
    /// uses a pooled resolver to avoid creating a fresh resolver (and fresh
    /// TLS handshake) per call. blocks briefly if all pool slots are in use.
    ///
    /// returns:
    ///   .accept — should not happen (caller should only call on new/mismatch)
    ///   .migrate — DID doc confirms this host, caller should update host_id
    ///   .reject — DID doc does not confirm, caller should drop the event
    pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority {
        const persist = self.persist orelse return .migrate; // no DB — can't check
        // unparseable DID can never be confirmed — reject outright
        const parsed = zat.Did.parse(did) orelse return .reject;

        const idx = self.acquireHostResolver();
        defer self.releaseHostResolver(idx);

        var resolver = &self.host_resolvers[idx];

        // first resolve attempt
        var doc = resolver.resolve(parsed) catch {
            // retry once on network failure
            var doc2 = resolver.resolve(parsed) catch return .reject;
            defer doc2.deinit();
            return self.checkPdsHost(&doc2, persist, incoming_host_id);
        };
        defer doc.deinit();
        return self.checkPdsHost(&doc, persist, incoming_host_id);
    }
571
    /// acquire a resolver from the pool. spins until one is available.
    /// returns the acquired slot index; the caller must pair this with
    /// releaseHostResolver().
    fn acquireHostResolver(self: *Validator) usize {
        while (self.alive.load(.acquire)) {
            for (0..host_resolver_pool_size) |i| {
                // cmpxchg true→false: a null return means we won this slot
                if (self.host_resolver_available[i].cmpxchgStrong(true, false, .acquire, .monotonic) == null) {
                    return i;
                }
            }
            // all slots busy — yield briefly before rescanning
            self.io.sleep(Io.Duration.fromMilliseconds(1), .awake) catch {};
        }
        // NOTE(review): this returns slot 0 without actually acquiring it; the
        // paired release will mark slot 0 available regardless. during shutdown
        // this looks tolerated — confirm no concurrent caller can double-use slot 0.
        return 0; // shutdown path — caller will exit soon
    }

    /// return a pool slot acquired by acquireHostResolver()
    fn releaseHostResolver(self: *Validator, idx: usize) void {
        self.host_resolver_available[idx].store(true, .release);
    }
588
589 fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority {
590 _ = self;
591 const pds_endpoint = doc.pdsEndpoint() orelse return .reject;
592 const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject;
593 const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject;
594 if (pds_host_id == incoming_host_id) return .migrate;
595 return .reject;
596 }
597};
598
/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path".
/// also handles bracketed IPv6 literals per RFC 3986, e.g. "https://[::1]:8080/x" → "::1"
/// (the previous colon-strip would have returned "[" for such inputs).
/// returns a view into `url` (no allocation), or null when no host remains.
pub fn extractHostFromUrl(url: []const u8) ?[]const u8 {
    // strip scheme
    var rest = url;
    if (std.mem.startsWith(u8, rest, "https://")) {
        rest = rest["https://".len..];
    } else if (std.mem.startsWith(u8, rest, "http://")) {
        rest = rest["http://".len..];
    }
    // strip path
    if (std.mem.indexOfScalar(u8, rest, '/')) |i| {
        rest = rest[0..i];
    }
    if (rest.len > 0 and rest[0] == '[') {
        // bracketed IPv6 literal — host is everything inside the brackets;
        // an unterminated bracket is malformed
        const close = std.mem.indexOfScalar(u8, rest, ']') orelse return null;
        rest = rest[1..close];
    } else if (std.mem.indexOfScalar(u8, rest, ':')) |i| {
        // strip port
        rest = rest[0..i];
    }
    if (rest.len == 0) return null;
    return rest;
}
619
/// read an environment variable via libc; returns a slice view over the
/// process environment (not owned by the caller), or null when unset.
fn getenv(key: [*:0]const u8) ?[]const u8 {
    if (std.c.getenv(key)) |ptr| {
        return std.mem.sliceTo(ptr, 0);
    }
    return null;
}
624
/// parse an integer environment variable; a missing or unparseable value
/// falls back to `default`.
fn parseEnvInt(comptime T: type, key: [*:0]const u8, default: T) T {
    if (getenv(key)) |raw| {
        return std.fmt.parseInt(T, raw, 10) catch default;
    }
    return default;
}
629
/// current wall-clock time as whole epoch seconds (floor of the real-time clock)
fn timestamp(io: Io) i64 {
    return @intCast(@divFloor(Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s));
}
633
// --- tests ---

test "validateCommit skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    // build a commit payload using SDK
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    // cache is empty, so the frame must be passed through as skipped
    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateCommit skips when no repo field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    // payload without "repo" field
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "checkCommitStructure rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

test "checkCommitStructure accepts valid commit" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    try v.checkCommitStructure(payload);
}

test "validateSync skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateSync rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "not-a-did" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    // a malformed DID is a hard failure, not a skip
    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync rejects missing blocks" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync skips when no did field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "LRU cache evicts least recently used" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
    v.cache.capacity = 3;
    defer v.deinit();

    const mk = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 };

    try v.cache.put("did:plc:aaa", mk);
    try v.cache.put("did:plc:bbb", mk);
    try v.cache.put("did:plc:ccc", mk);

    // access "aaa" to promote it
    _ = v.cache.get("did:plc:aaa");

    // insert "ddd" — should evict "bbb" (LRU)
    try v.cache.put("did:plc:ddd", mk);

    try std.testing.expect(v.cache.get("did:plc:bbb") == null);
    try std.testing.expect(v.cache.get("did:plc:aaa") != null);
    try std.testing.expect(v.cache.get("did:plc:ccc") != null);
    try std.testing.expect(v.cache.get("did:plc:ddd") != null);
    try std.testing.expectEqual(@as(u32, 3), v.cache.count());
}

test "checkCommitStructure rejects too many ops" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 }, std.testing.io);
    defer v.deinit();

    // build ops array with 3 items (over limit of 2)
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}
805
806// --- spec conformance tests ---
807
808test "spec: #commit blocks > 2,000,000 bytes rejected" {
809 // lexicon maxLength for #commit blocks: 2,000,000
810 var stats = broadcaster.Stats{};
811 var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
812 defer v.deinit();
813
814 // insert a fake cached key so we reach the blocks size check
815 const did = "did:plc:test123";
816 try v.cache.put(did, .{
817 .key_type = .p256,
818 .raw = .{0} ** 33,
819 .len = 33,
820 .resolve_time = 100,
821 });
822
823 // blocks with 2,000,001 bytes (1 byte over limit)
824 const oversized_blocks = try std.testing.allocator.alloc(u8, 2_000_001);
825 defer std.testing.allocator.free(oversized_blocks);
826 @memset(oversized_blocks, 0);
827
828 const payload: zat.cbor.Value = .{ .map = &.{
829 .{ .key = "repo", .value = .{ .text = did } },
830 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
831 .{ .key = "blocks", .value = .{ .bytes = oversized_blocks } },
832 } };
833
834 const result = v.validateCommit(payload);
835 try std.testing.expect(!result.valid or result.skipped);
836}
837
838test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" {
839 // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check
840 var stats = broadcaster.Stats{};
841 var v = Validator.init(std.testing.allocator, &stats, std.testing.io);
842 defer v.deinit();
843
844 const did = "did:plc:test123";
845 try v.cache.put(did, .{
846 .key_type = .p256,
847 .raw = .{0} ** 33,
848 .len = 33,
849 .resolve_time = 100,
850 });
851
852 // exactly 2,000,000 bytes — should pass size check (may fail signature verify, that's ok)
853 const exact_blocks = try std.testing.allocator.alloc(u8, 2_000_000);
854 defer std.testing.allocator.free(exact_blocks);
855 @memset(exact_blocks, 0);
856
857 const payload: zat.cbor.Value = .{ .map = &.{
858 .{ .key = "repo", .value = .{ .text = did } },
859 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
860 .{ .key = "blocks", .value = .{ .bytes = exact_blocks } },
861 } };
862
863 const result = v.validateCommit(payload);
864 // should not be rejected for size — may fail signature verification (that's fine,
865 // it means we passed the size check). with P1.1c, sig failure → skipped=true.
866 try std.testing.expect(result.valid or result.skipped);
867}
868
test "spec: #sync blocks > 10,000 bytes rejected" {
    // the lexicon caps #sync blocks at 10,000 bytes; one byte over must be rejected.
    var relay_stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &relay_stats, std.testing.io);
    defer validator.deinit();

    // one byte past the 10,000-byte limit
    const oversized = [_]u8{0} ** 10_001;
    const sync_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &oversized } },
    } };

    const outcome = validator.validateSync(sync_frame);
    // hard rejection: invalid, and explicitly not a skip
    try std.testing.expect(!outcome.valid);
    try std.testing.expect(!outcome.skipped);
}
885
test "spec: #sync blocks = 10,000 bytes accepted (boundary)" {
    // the lexicon caps #sync blocks at 10,000 bytes; exactly at the limit
    // must clear the size check.
    var relay_stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &relay_stats, std.testing.io);
    defer validator.deinit();

    // blocks buffer sized exactly at the 10,000-byte limit
    const at_limit = [_]u8{0} ** 10_000;
    const sync_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &at_limit } },
    } };

    const outcome = validator.validateSync(sync_frame);
    // size check passes; no key is cached for this DID, so the frame is
    // skipped (valid=true, skipped=true) rather than verified.
    try std.testing.expect(outcome.valid);
    try std.testing.expect(outcome.skipped);
}
903
test "extractOps reads path field from firehose format" {
    var relay_stats = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &relay_stats, .{ .verify_commit_diff = true }, std.testing.io);
    defer validator.deinit();

    // extractOps builds an ArrayList internally, so hand it an arena and
    // release everything in one shot.
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const firehose_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "delete" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } },
        } },
    };

    const commit_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &firehose_ops } },
    } };

    const maybe_ops = validator.extractOps(arena.allocator(), commit_frame);
    try std.testing.expect(maybe_ops != null);
    const extracted = maybe_ops.?;
    try std.testing.expectEqual(@as(usize, 2), extracted.len);
    try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", extracted[0].path);
    try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", extracted[1].path);
    try std.testing.expect(extracted[0].value != null); // create carries a cid
    try std.testing.expect(extracted[1].value == null); // delete carries none
}
938
test "extractOps rejects malformed path without slash" {
    var relay_stats = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &relay_stats, .{ .verify_commit_diff = true }, std.testing.io);
    defer validator.deinit();

    // arena for extractOps' internal allocations
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    // a record path must contain "collection/rkey"; this one has no slash
    const bad_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "noslash" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
    };

    const commit_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &bad_ops } },
    } };

    // every op is malformed and dropped, so extractOps yields null
    const maybe_ops = validator.extractOps(arena.allocator(), commit_frame);
    try std.testing.expect(maybe_ops == null);
}
964
test "checkCommitStructure validates path field" {
    var relay_stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &relay_stats, std.testing.io);
    defer validator.deinit();

    // case 1: well-formed "nsid/rkey" path — structure check succeeds
    const good_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
        } },
    };

    const good_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &good_ops } },
    } };

    try validator.checkCommitStructure(good_frame);

    // case 2: collection segment is not a valid NSID — structure check errors
    const bad_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } },
        } },
    };

    const bad_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &bad_ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, validator.checkCommitStructure(bad_frame));
}
1000
test "queueResolve deduplicates repeated DIDs" {
    var relay_stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &relay_stats, std.testing.io);
    defer validator.deinit();

    // enqueue one DID repeatedly; only the first call should take effect
    var round: usize = 0;
    while (round < 100) : (round += 1) {
        validator.queueResolve("did:plc:duplicate");
    }

    // dedup holds in both the ordered queue and the membership set
    try std.testing.expectEqual(@as(usize, 1), validator.queue.items.len);
    try std.testing.expectEqual(@as(u32, 1), validator.queued_set.count());
}