atproto relay implementation in zig zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: merge migration queue into DID resolve path, cap resolve queue at 100K

the migration queue grew without bound during warm-up because resolveLoop
only drained it when the DID queue was empty, which never happened: new DIDs
arrived at ~1000/sec against a drain rate of ~40/sec. RSS grew ~17 MB/min.

as in indigo and rsky, host validation now happens inline when resolving
a DID document — there is no separate migration queue. the resolve queue is
capped at 100K entries; dropped DIDs can be re-queued on future cache
misses since they're not added to the dedup set (queued_set).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 37fa194f f0c7bafa

+49 -196
+2 -12
src/broadcaster.zig
··· 553 553 resolve_queued_set_count: usize = 0, 554 554 }; 555 555 556 - pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, migration_queue_len: usize, migration_pending_count: u32, attribution: AttributionMetrics, data_dir: []const u8, buf: []u8) []const u8 { 556 + pub fn formatPrometheusMetrics(stats: *const Stats, cache_entries: usize, attribution: AttributionMetrics, data_dir: []const u8, buf: []u8) []const u8 { 557 557 const uptime: i64 = std.time.timestamp() - stats.start_time; 558 558 var fbs = std.io.fixedBufferStream(buf); 559 559 const w = fbs.writer(); ··· 609 609 \\# TYPE relay_validator_cache_entries gauge 610 610 \\relay_validator_cache_entries {d} 611 611 \\ 612 - \\# TYPE relay_validator_migration_queue gauge 613 - \\relay_validator_migration_queue {d} 614 - \\ 615 - \\# TYPE relay_validator_migration_pending gauge 616 - \\# HELP relay_validator_migration_pending DIDs suppressed from re-queueing migration checks 617 - \\relay_validator_migration_pending {d} 618 - \\ 619 612 \\# TYPE relay_validator_cache_evictions_total counter 620 613 \\relay_validator_cache_evictions_total {d} 621 614 \\ ··· 656 649 stats.relay_seq.load(.acquire), 657 650 uptime, 658 651 cache_entries, 659 - migration_queue_len, 660 - migration_pending_count, 661 652 stats.cache_evictions.load(.acquire), 662 653 attribution.history_entries, 663 654 attribution.evtbuf_entries, ··· 886 877 stats.cache_misses.store(100, .release); 887 878 888 879 var buf: [65536]u8 = undefined; 889 - const output = formatPrometheusMetrics(&stats, 42, 3, 7, .{}, "/tmp", &buf); 880 + const output = formatPrometheusMetrics(&stats, 42, .{}, "/tmp", &buf); 890 881 891 882 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_received_total 10000") != null); 892 883 try std.testing.expect(std.mem.indexOf(u8, output, "relay_frames_broadcast_total 9000") != null); ··· 898 889 try std.testing.expect(std.mem.indexOf(u8, output, "relay_seq 12345") != null); 899 890 try 
std.testing.expect(std.mem.indexOf(u8, output, "# TYPE relay_uptime_seconds gauge") != null); 900 891 try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_cache_entries 42") != null); 901 - try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_migration_queue 3") != null); 902 892 try std.testing.expect(std.mem.indexOf(u8, output, "relay_validator_cache_evictions_total 0") != null); 903 893 } 904 894
-3
src/frame_worker.zig
··· 81 81 const uid: u64 = if (work.persist) |dp| blk: { 82 82 if (did) |d| { 83 83 const result = dp.uidForDidFromHost(d, work.host_id) catch break :blk @as(u64, 0); 84 - if (result.host_changed or result.is_new) { 85 - work.validator.queueMigrationCheck(d, work.host_id); 86 - } 87 84 break :blk result.uid; 88 85 } else break :blk @as(u64, 0); 89 86 } else 0;
+1 -3
src/main.zig
··· 95 95 } }) catch {}; 96 96 } else if (std.mem.eql(u8, path, "/metrics")) { 97 97 const cache_entries = validator.cacheSize(); 98 - const migration_queue_len = validator.migrationQueueLen(); 99 - const migration_pending_count = validator.migrationPendingCount(); 100 98 const attribution = broadcaster.AttributionMetrics{ 101 99 .history_entries = bc.history.count(), 102 100 .evtbuf_entries = persist.evtbufLen(), ··· 106 104 }; 107 105 108 106 var metrics_buf: [65536]u8 = undefined; 109 - const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, migration_queue_len, migration_pending_count, attribution, data_dir, &metrics_buf); 107 + const body = broadcaster.formatPrometheusMetrics(stats, cache_entries, attribution, data_dir, &metrics_buf); 110 108 request.respond(body, .{ .status = .ok, .keep_alive = false, .extra_headers = &.{ 111 109 .{ .name = "content-type", .value = "text/plain; version=0.0.4; charset=utf-8" }, 112 110 .{ .name = "server", .value = "zlay (atproto-relay)" },
-3
src/subscriber.zig
··· 390 390 const uid: u64 = if (sub.persist) |dp| blk: { 391 391 if (did) |d| { 392 392 const result = dp.uidForDidFromHost(d, sub.options.host_id) catch break :blk @as(u64, 0); 393 - if (result.host_changed or result.is_new) { 394 - sub.validator.queueMigrationCheck(d, sub.options.host_id); 395 - } 396 393 break :blk result.uid; 397 394 } else break :blk @as(u64, 0); 398 395 } else 0;
+46 -175
src/validator.zig
··· 41 41 rev_clock_skew: i64 = 300, // 5 minutes 42 42 }; 43 43 44 - const MigrationCheck = struct { 45 - did: []const u8, // duped, owned by validator 46 - new_host_id: u64, 47 - }; 48 - 49 44 pub const Validator = struct { 50 45 allocator: Allocator, 51 46 stats: *broadcaster.Stats, ··· 57 52 queue: std.ArrayListUnmanaged([]const u8) = .{}, 58 53 // in-flight set — prevents duplicate DID entries in the queue 59 54 queued_set: std.StringHashMapUnmanaged(void) = .{}, 60 - // migration validation queue (with dedup to prevent unbounded growth) 61 - migration_queue: std.ArrayListUnmanaged(MigrationCheck) = .{}, 62 - migration_pending: std.StringHashMapUnmanaged(void) = .{}, 63 55 queue_mutex: std.Thread.Mutex = .{}, 64 56 queue_cond: std.Thread.Condition = .{}, 65 57 resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads, ··· 68 60 69 61 const max_resolver_threads = 8; 70 62 const default_resolver_threads = 4; 63 + const max_queue_size: usize = 100_000; 71 64 72 65 pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator { 73 66 return initWithConfig(allocator, stats, .{}); ··· 100 93 } 101 94 self.queue.deinit(self.allocator); 102 95 self.queued_set.deinit(self.allocator); 103 - 104 - // free migration queue 105 - for (self.migration_queue.items) |mc| { 106 - self.allocator.free(mc.did); 107 - } 108 - self.migration_queue.deinit(self.allocator); 109 - 110 - // free migration dedup set keys 111 - var mig_it = self.migration_pending.keyIterator(); 112 - while (mig_it.next()) |k| { 113 - self.allocator.free(k.*); 114 - } 115 - self.migration_pending.deinit(self.allocator); 116 96 } 117 97 118 98 /// start background resolver threads ··· 400 380 return; 401 381 } 402 382 383 + // cap queue size — drop DID without adding to queued_set so it can be re-queued later 384 + if (self.queue.items.len >= max_queue_size) { 385 + self.allocator.free(duped); 386 + return; 387 + } 388 + 403 389 self.queue.append(self.allocator, duped) 
catch { 404 390 self.allocator.free(duped); 405 391 return; ··· 414 400 415 401 while (self.alive.load(.acquire)) { 416 402 var did: ?[]const u8 = null; 417 - var migration: ?MigrationCheck = null; 418 403 { 419 404 self.queue_mutex.lock(); 420 405 defer self.queue_mutex.unlock(); 421 - while (self.queue.items.len == 0 and self.migration_queue.items.len == 0 and self.alive.load(.acquire)) { 406 + while (self.queue.items.len == 0 and self.alive.load(.acquire)) { 422 407 self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {}; 423 408 } 424 409 if (self.queue.items.len > 0) { 425 410 did = self.queue.orderedRemove(0); 426 411 _ = self.queued_set.remove(did.?); 427 - } else if (self.migration_queue.items.len > 0) { 428 - migration = self.migration_queue.orderedRemove(0); 429 412 } 430 413 } 431 414 432 - if (did) |d| { 433 - defer self.allocator.free(d); 415 + const d = did orelse continue; 416 + defer self.allocator.free(d); 434 417 435 - // skip if already cached (resolved while queued) 436 - if (self.cache.contains(d)) continue; 418 + // skip if already cached (resolved while queued) 419 + if (self.cache.contains(d)) continue; 437 420 438 - // resolve DID → signing key 439 - const parsed = zat.Did.parse(d) orelse continue; 440 - var doc = resolver.resolve(parsed) catch |err| { 441 - log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) }); 442 - continue; 443 - }; 444 - defer doc.deinit(); 421 + // resolve DID → signing key 422 + const parsed = zat.Did.parse(d) orelse continue; 423 + var doc = resolver.resolve(parsed) catch |err| { 424 + log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) }); 425 + continue; 426 + }; 427 + defer doc.deinit(); 445 428 446 - // extract and decode signing key 447 - const vm = doc.signingKey() orelse continue; 448 - const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue; 449 - defer self.allocator.free(key_bytes); 450 - const public_key = 
zat.multicodec.parsePublicKey(key_bytes) catch continue; 429 + // extract and decode signing key 430 + const vm = doc.signingKey() orelse continue; 431 + const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue; 432 + defer self.allocator.free(key_bytes); 433 + const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue; 451 434 452 - // store decoded key in cache (fixed-size, no pointer chasing) 453 - var cached = CachedKey{ 454 - .key_type = public_key.key_type, 455 - .raw = undefined, 456 - .len = @intCast(public_key.raw.len), 457 - .resolve_time = std.time.timestamp(), 458 - }; 459 - @memcpy(cached.raw[0..public_key.raw.len], public_key.raw); 435 + // store decoded key in cache (fixed-size, no pointer chasing) 436 + var cached = CachedKey{ 437 + .key_type = public_key.key_type, 438 + .raw = undefined, 439 + .len = @intCast(public_key.raw.len), 440 + .resolve_time = std.time.timestamp(), 441 + }; 442 + @memcpy(cached.raw[0..public_key.raw.len], public_key.raw); 460 443 461 - self.cache.put(d, cached) catch continue; 462 - } else if (migration) |mc| { 463 - defer self.allocator.free(mc.did); 464 - const confirmed = self.processMigrationCheck(&resolver, mc); 444 + self.cache.put(d, cached) catch continue; 465 445 466 - // remove from dedup set — if confirmed, allow future re-queue 467 - // if rejected, leave in set to suppress re-queueing 468 - if (confirmed) { 469 - self.queue_mutex.lock(); 470 - defer self.queue_mutex.unlock(); 471 - if (self.migration_pending.fetchRemove(mc.did)) |entry| { 472 - self.allocator.free(entry.key); 446 + // --- host validation (merged from migration queue) --- 447 + // while we have the DID doc, check PDS endpoint and update host if needed. 448 + // best-effort: failures don't prevent signing key caching. 
449 + if (self.persist) |persist| { 450 + if (doc.pdsEndpoint()) |pds_endpoint| { 451 + if (extractHostFromUrl(pds_endpoint)) |pds_host| { 452 + const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue; 453 + const uid = persist.uidForDid(d) catch continue; 454 + const current_host = persist.getAccountHostId(uid) catch continue; 455 + if (current_host != 0 and current_host != pds_host_id) { 456 + persist.setAccountHostId(uid, pds_host_id) catch {}; 457 + log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id }); 458 + } 473 459 } 474 460 } 475 - // rejected DIDs stay in migration_pending, suppressing re-queueing 476 - // until the set is cleared (see migrationPendingCleanup) 477 461 } 478 462 } 479 463 } 480 464 481 - /// validate a host migration by resolving the DID document and checking the PDS endpoint. 482 - /// returns true if migration was confirmed, false if rejected/failed. 483 - fn processMigrationCheck(self: *Validator, resolver: *zat.DidResolver, mc: MigrationCheck) bool { 484 - const persist = self.persist orelse return false; 485 - 486 - const parsed = zat.Did.parse(mc.did) orelse { 487 - log.debug("migration check: invalid DID {s}", .{mc.did}); 488 - return false; 489 - }; 490 - 491 - var doc = resolver.resolve(parsed) catch |err| { 492 - log.debug("migration check: DID resolve failed for {s}: {s}", .{ mc.did, @errorName(err) }); 493 - return false; 494 - }; 495 - defer doc.deinit(); 496 - 497 - const pds_endpoint = doc.pdsEndpoint() orelse { 498 - log.debug("migration check: no PDS endpoint for {s}", .{mc.did}); 499 - return false; 500 - }; 501 - 502 - // extract hostname from PDS endpoint URL (strip https:// prefix) 503 - const pds_host = extractHostFromUrl(pds_endpoint) orelse { 504 - log.debug("migration check: cannot parse PDS URL '{s}' for {s}", .{ pds_endpoint, mc.did }); 505 - return false; 506 - }; 507 - 508 - // look up the hostname → host_id 509 - const resolved_host_id = 
(persist.getHostIdForHostname(pds_host) catch { 510 - log.debug("migration check: host lookup failed for {s}", .{pds_host}); 511 - return false; 512 - }) orelse { 513 - log.debug("migration check: unknown host {s} for {s}", .{ pds_host, mc.did }); 514 - return false; 515 - }; 516 - 517 - if (resolved_host_id == mc.new_host_id) { 518 - // DID document confirms the new host — update 519 - const uid = persist.uidForDid(mc.did) catch return false; 520 - persist.setAccountHostId(uid, mc.new_host_id) catch return false; 521 - log.info("migration validated: {s} → host {d} (confirmed by DID doc)", .{ mc.did, mc.new_host_id }); 522 - return true; 523 - } else { 524 - // mismatch — reject new accounts, warn on migrations 525 - const uid = persist.uidForDid(mc.did) catch return false; 526 - const current_host = persist.getAccountHostId(uid) catch return false; 527 - if (current_host == mc.new_host_id) { 528 - // new account: host not confirmed by DID doc → reject 529 - persist.updateAccountUpstreamStatus(uid, "rejected") catch return false; 530 - log.warn("new account rejected: {s} on host {d}, DID doc says {s} (host {d})", .{ 531 - mc.did, mc.new_host_id, pds_host, resolved_host_id, 532 - }); 533 - } else { 534 - // migration: host not confirmed (existing warning behavior) 535 - log.warn("migration rejected: {s} claims host {d}, but DID doc says {s} (host {d})", .{ 536 - mc.did, mc.new_host_id, pds_host, resolved_host_id, 537 - }); 538 - } 539 - return false; 540 - } 541 - } 542 - 543 465 /// evict a DID's cached signing key (e.g. on #identity event). 544 466 /// the next commit from this DID will trigger a fresh resolution. 545 - /// also clears migration suppression so host changes are re-evaluated. 
546 467 pub fn evictKey(self: *Validator, did: []const u8) void { 547 468 _ = self.cache.remove(did); 548 - 549 - // clear migration suppression — DID doc may have changed 550 - self.queue_mutex.lock(); 551 - defer self.queue_mutex.unlock(); 552 - if (self.migration_pending.fetchRemove(did)) |entry| { 553 - self.allocator.free(entry.key); 554 - } 555 - } 556 - 557 - /// queue a DID for async migration validation (host change detected). 558 - /// deduped: only one pending check per DID at a time. 559 - pub fn queueMigrationCheck(self: *Validator, did: []const u8, new_host_id: u64) void { 560 - self.queue_mutex.lock(); 561 - defer self.queue_mutex.unlock(); 562 - 563 - // skip if already queued or recently checked (prevents unbounded growth) 564 - if (self.migration_pending.contains(did)) return; 565 - 566 - const duped = self.allocator.dupe(u8, did) catch return; 567 - const set_key = self.allocator.dupe(u8, did) catch { 568 - self.allocator.free(duped); 569 - return; 570 - }; 571 - 572 - self.migration_queue.append(self.allocator, .{ 573 - .did = duped, 574 - .new_host_id = new_host_id, 575 - }) catch { 576 - self.allocator.free(duped); 577 - self.allocator.free(set_key); 578 - return; 579 - }; 580 - self.migration_pending.put(self.allocator, set_key, {}) catch { 581 - self.allocator.free(set_key); 582 - }; 583 - self.queue_cond.signal(); 584 469 } 585 470 586 471 /// cache size (for diagnostics) 587 472 pub fn cacheSize(self: *Validator) u32 { 588 473 return self.cache.count(); 589 - } 590 - 591 - /// migration queue length (for diagnostics — non-blocking, returns 0 if lock is contended) 592 - pub fn migrationQueueLen(self: *Validator) usize { 593 - if (!self.queue_mutex.tryLock()) return 0; 594 - defer self.queue_mutex.unlock(); 595 - return self.migration_queue.items.len; 596 - } 597 - 598 - /// migration pending (suppressed) count (for diagnostics — non-blocking) 599 - pub fn migrationPendingCount(self: *Validator) u32 { 600 - if (!self.queue_mutex.tryLock()) 
return 0; 601 - defer self.queue_mutex.unlock(); 602 - return self.migration_pending.count(); 603 474 } 604 475 605 476 /// resolve queue length (for diagnostics — non-blocking)