declarative relay deployment on hetzner relay-eval.waow.tech
atproto relay
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

relay-eval: add /api/phi/monitors endpoint

per-relay monitor objects designed for phi (bluesky bot) to poll
and post about meaningful status transitions. shape:

[{
"name": "...",
"status": "nominal"|"degraded"|"critical",
"headline": "...",
"metrics": { ... },
"last_changed": "<iso8601>",
"checked_at": "<iso8601>"
}, ...]

status logic:
- critical: no connection in recent runs OR short/baseline < 0.70
- degraded: short/baseline < 0.90
- nominal: otherwise

coverage is computed as unique_dids / max(unique_dids) per run
(self-normalizing — replay-heavy relays inflate only their own
coverage above 1.0 instead of deflating everyone else's).
short window = last 3 valid runs (~15 min); baseline = last 24
(~2h). disconnected runs excluded from the mean.

status transitions are tracked in a new monitor_state table so
phi can judge "is this old news" via last_changed.

public read, no auth. adding a monitored relay happens naturally
when a host appears in a run — no phi-side config change needed.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

+315
+6
.gitignore
··· 24 24 25 25 # user-local claude harness settings (shared skills live under .claude/skills/) 26 26 .claude/settings.local.json 27 + .claude/scheduled_tasks.lock 28 + 29 + # zig build artifacts (local-machine dep cache from zig build) 30 + **/zig-pkg/ 31 + **/zig-cache/ 32 + **/.zig-cache/
+192
relay-eval/src/server.zig
··· 75 75 } else if (std.mem.eql(u8, path, "/api/trend")) { 76 76 const limit = parseLimitParam(query, trend_limit, store); 77 77 try serveTrend(allocator, stream, store, limit); 78 + } else if (std.mem.eql(u8, path, "/api/phi/monitors")) { 79 + try servePhiMonitors(allocator, stream, store); 78 80 } else if (std.mem.eql(u8, path, "/og")) { 79 81 try serveOgPng(allocator, stream, og_png_path); 80 82 } else if (std.mem.eql(u8, path, "/og.svg")) { ··· 342 344 const s = std.fmt.bufPrint(&buf, "{d}", .{value}) catch unreachable; 343 345 try svg.appendSlice(allocator, s); 344 346 } 347 + } 348 + 349 + // --- phi monitors endpoint --- 350 + // 351 + // shape is stable and documented — phi (bluesky bot) polls this and reports 352 + // meaningful status transitions. it does not interpret; it reports what's 353 + // written in `headline`. see docs for details. 354 + // 355 + // thresholds (a host's short-window coverage vs baseline coverage): 356 + // ratio < 0.70 or disconnected in majority of recent runs → "critical" 357 + // ratio < 0.90 → "degraded" 358 + // otherwise → "nominal" 359 + // 360 + // short window = last 3 valid runs (~15 min at 5-min cadence). 361 + // baseline window = last 24 valid runs (~2h). 362 + // coverages are clamped to [0, 1] for comparison to avoid replay-induced 363 + // inflation flipping healthy relays into "critical". 364 + 365 + const short_window_runs: u32 = 3; 366 + const baseline_window_runs: u32 = 24; 367 + 368 + const Status = enum { 369 + nominal, 370 + degraded, 371 + critical, 372 + 373 + fn text(self: Status) []const u8 { 374 + return switch (self) { 375 + .nominal => "nominal", 376 + .degraded => "degraded", 377 + .critical => "critical", 378 + }; 379 + } 380 + }; 381 + 382 + fn classifyMonitor(short: Store.MonitorSnapshot, baseline_opt: ?Store.MonitorSnapshot) Status { 383 + // connectivity check dominates: if the relay isn't actually delivering, 384 + // nothing else matters. 385 + if (short.n_runs > 0 and short.connected_runs == 0) return .critical; 386 + if (short.n_runs >= 3 and short.connected_runs <= short.n_runs / 2) return .critical; 387 + 388 + // coverage ratio vs baseline. clamp both sides to 1.0 so a replay-heavy 389 + // run doesn't push a healthy relay into "degraded" via inflated baseline. 390 + const s_cov = @min(short.coverage, 1.0); 391 + const base_cov = if (baseline_opt) |b| @min(b.coverage, 1.0) else s_cov; 392 + 393 + if (base_cov <= 0) return .nominal; 394 + const ratio = s_cov / base_cov; 395 + if (ratio < 0.70) return .critical; 396 + if (ratio < 0.90) return .degraded; 397 + return .nominal; 398 + } 399 + 400 + fn findSnapshot(list: []const Store.MonitorSnapshot, host: []const u8) ?Store.MonitorSnapshot { 401 + for (list) |s| { 402 + if (std.mem.eql(u8, s.host, host)) return s; 403 + } 404 + return null; 405 + } 406 + 407 + fn formatIsoNow(buf: []u8) []const u8 { 408 + const now = std.time.timestamp(); 409 + const es = std.time.epoch.EpochSeconds{ .secs = @intCast(now) }; 410 + const day = es.getEpochDay(); 411 + const yd = day.calculateYearDay(); 412 + const md = yd.calculateMonthDay(); 413 + const ds = es.getDaySeconds(); 414 + return std.fmt.bufPrint(buf, "{d}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ 415 + yd.year, 416 + @as(u32, @intFromEnum(md.month)), 417 + @as(u32, md.day_index) + 1, 418 + ds.getHoursIntoDay(), 419 + ds.getMinutesIntoHour(), 420 + ds.getSecondsIntoMinute(), 421 + }) catch "1970-01-01T00:00:00Z"; 422 + } 423 + 424 + fn pctInt(v: f64) u32 { 425 + const clamped = std.math.clamp(v, 0.0, 10.0); // allow >1.0 to be visible in headlines 426 + return @intFromFloat(clamped * 100.0 + 0.5); 427 + } 428 + 429 + fn writeHeadline( 430 + out: *std.ArrayList(u8), 431 + allocator: std.mem.Allocator, 432 + host: []const u8, 433 + status: Status, 434 + short: Store.MonitorSnapshot, 435 + baseline_opt: ?Store.MonitorSnapshot, 436 + ) !void { 437 + const short_pct = pctInt(short.coverage); 438 + const base_pct = if (baseline_opt) |b| pctInt(b.coverage) else short_pct; 439 + 440 + switch (status) { 441 + .nominal => { 442 + try out.print(allocator, "{s}: {d}% coverage over last {d} eval runs", .{ host, short_pct, short.n_runs }); 443 + }, 444 + .degraded => { 445 + try out.print(allocator, "{s}: coverage at {d}% over last {d} runs (baseline {d}%)", .{ host, short_pct, short.n_runs, base_pct }); 446 + }, 447 + .critical => { 448 + const disconnected = short.n_runs - short.connected_runs; 449 + if (short.connected_runs == 0 and short.n_runs > 0) { 450 + try out.print(allocator, "{s}: no firehose connection in last {d} eval runs", .{ host, short.n_runs }); 451 + } else if (disconnected > short.n_runs / 2) { 452 + try out.print(allocator, "{s}: firehose disconnected in {d} of last {d} eval runs", .{ host, disconnected, short.n_runs }); 453 + } else { 454 + try out.print(allocator, "{s}: coverage at {d}% over last {d} runs (baseline {d}%)", .{ host, short_pct, short.n_runs, base_pct }); 455 + } 456 + }, 457 + } 458 + } 459 + 460 + fn servePhiMonitors(allocator: std.mem.Allocator, stream: std.net.Stream, store: *Store) !void { 461 + const short = try store.getPerHostStats(allocator, short_window_runs); 462 + defer { 463 + for (short) |s| allocator.free(s.host); 464 + allocator.free(short); 465 + } 466 + const baseline = try store.getPerHostStats(allocator, baseline_window_runs); 467 + defer { 468 + for (baseline) |s| allocator.free(s.host); 469 + allocator.free(baseline); 470 + } 471 + 472 + var ts_buf: [32]u8 = undefined; 473 + const checked_at = formatIsoNow(&ts_buf); 474 + 475 + var json: std.ArrayList(u8) = .empty; 476 + defer json.deinit(allocator); 477 + 478 + try json.append(allocator, '['); 479 + 480 + for (short, 0..) |host_short, i| { 481 + if (i > 0) try json.append(allocator, ','); 482 + 483 + const host_baseline = findSnapshot(baseline, host_short.host); 484 + const status = classifyMonitor(host_short, host_baseline); 485 + 486 + // look up prior monitor_state; if status differs, update last_changed. 487 + // on disagreement OR no prior state, last_changed = now. 488 + var last_changed_buf: [32]u8 = undefined; 489 + const last_changed: []const u8 = blk: { 490 + const prior = try store.getMonitorState(allocator, host_short.host); 491 + if (prior) |p| { 492 + defer { 493 + allocator.free(p.status); 494 + allocator.free(p.last_changed); 495 + } 496 + if (std.mem.eql(u8, p.status, status.text())) { 497 + // status unchanged — keep existing last_changed 498 + const copy = std.fmt.bufPrint(&last_changed_buf, "{s}", .{p.last_changed}) catch p.last_changed; 499 + break :blk copy; 500 + } 501 + // status transitioned — bump to checked_at 502 + try store.upsertMonitorState(host_short.host, status.text(), checked_at); 503 + const copy = std.fmt.bufPrint(&last_changed_buf, "{s}", .{checked_at}) catch checked_at; 504 + break :blk copy; 505 + } else { 506 + // first time seeing this host — seed with current state 507 + try store.upsertMonitorState(host_short.host, status.text(), checked_at); 508 + const copy = std.fmt.bufPrint(&last_changed_buf, "{s}", .{checked_at}) catch checked_at; 509 + break :blk copy; 510 + } 511 + }; 512 + 513 + // emit monitor object 514 + try json.print(allocator, "{{\"name\":\"{s}\",\"status\":\"{s}\",\"headline\":\"", .{ host_short.host, status.text() }); 515 + try writeHeadline(&json, allocator, host_short.host, status, host_short, host_baseline); 516 + try json.appendSlice(allocator, "\",\"metrics\":{"); 517 + 518 + // coverage values are fractions (0..~1 for normal runs, can exceed 1 519 + // transiently during replay). emit as percent with 2 decimals: a 520 + // 0.9784 fraction → "97.84". 521 + const short_pct_hundredths: u32 = @intFromFloat(std.math.clamp(host_short.coverage, 0, 10.0) * 10000.0 + 0.5); 522 + try json.print(allocator, "\"coverage_short_pct\":{d}.{d:0>2}", .{ short_pct_hundredths / 100, short_pct_hundredths % 100 }); 523 + 524 + if (host_baseline) |b| { 525 + const base_pct_hundredths: u32 = @intFromFloat(std.math.clamp(b.coverage, 0, 10.0) * 10000.0 + 0.5); 526 + try json.print(allocator, ",\"coverage_baseline_pct\":{d}.{d:0>2},\"baseline_runs\":{d}", .{ base_pct_hundredths / 100, base_pct_hundredths % 100, b.n_runs }); 527 + } 528 + 529 + try json.print(allocator, 530 + ",\"events_short\":{d},\"dids_short\":{d},\"connected_runs\":{d},\"short_runs\":{d}}},\"last_changed\":\"{s}\",\"checked_at\":\"{s}\"}}", 531 + .{ host_short.total_events, host_short.total_dids, host_short.connected_runs, host_short.n_runs, last_changed, checked_at }, 532 + ); 533 + } 534 + 535 + try json.append(allocator, ']'); 536 + try sendResponse(stream, "200 OK", "application/json", json.items); 345 537 } 346 538 347 539 fn sendResponse(stream: std.net.Stream, status: []const u8, content_type: []const u8, body: []const u8) !void {
+117
relay-eval/src/store.zig
··· 56 56 \\); 57 57 \\CREATE INDEX IF NOT EXISTS idx_relay_stats_run_id ON relay_stats(run_id); 58 58 \\CREATE INDEX IF NOT EXISTS idx_diffs_run_id ON diffs(run_id); 59 + \\CREATE TABLE IF NOT EXISTS monitor_state ( 60 + \\ name TEXT PRIMARY KEY, 61 + \\ status TEXT NOT NULL, 62 + \\ last_changed TEXT NOT NULL 63 + \\); 59 64 ; 60 65 try self.exec(sql); 61 66 } ··· 361 366 }); 362 367 } 363 368 return points.toOwnedSlice(allocator); 369 + } 370 + 371 + // --- monitor support (/api/phi/monitors) --- 372 + 373 + pub const MonitorSnapshot = struct { 374 + host: []const u8, 375 + // mean of (unique_dids / union_dids) across the window, clamped >= 0 376 + coverage: f64, 377 + total_events: u64, 378 + total_dids: u32, 379 + connected_runs: u32, 380 + n_runs: u32, 381 + }; 382 + 383 + /// aggregate per-host stats across the last N valid runs. 384 + /// coverage is the mean per-run (unique_dids / max unique_dids in that run) — 385 + /// the leading relay is 100% and others measure relative. self-normalizes 386 + /// against replay: a relay emitting replay events only inflates its own 387 + /// coverage above 1.0, never pulls other relays down. 388 + /// disconnected runs (connected=0) are excluded from the mean so they 389 + /// don't drag the baseline down below the actual delivery rate. 390 + pub fn getPerHostStats(self: *Store, allocator: std.mem.Allocator, last_n: u32) ![]MonitorSnapshot { 391 + const sql = 392 + \\SELECT rs.host, 393 + \\ AVG(CASE 394 + \\ WHEN rs.connected = 1 395 + \\ THEN CAST(rs.unique_dids AS REAL) / NULLIF(mpr.m, 0) 396 + \\ ELSE NULL 397 + \\ END) AS coverage, 398 + \\ COALESCE(SUM(rs.events), 0) AS total_events, 399 + \\ COALESCE(SUM(rs.unique_dids), 0) AS total_dids, 400 + \\ COALESCE(SUM(rs.connected), 0) AS connected_runs, 401 + \\ COUNT(*) AS n_runs 402 + \\ FROM runs r 403 + \\ JOIN relay_stats rs ON rs.run_id = r.id 404 + \\ JOIN ( 405 + \\ SELECT run_id, MAX(unique_dids) AS m 406 + \\ FROM relay_stats 407 + \\ GROUP BY run_id 408 + \\ ) mpr ON mpr.run_id = r.id 409 + \\ WHERE r.id IN ( 410 + \\ SELECT id FROM runs r2 411 + \\ WHERE r2.id IN (SELECT run_id FROM relay_stats GROUP BY run_id HAVING SUM(connected) > COUNT(*) / 2) 412 + \\ ORDER BY id DESC LIMIT ? 413 + \\ ) 414 + \\ GROUP BY rs.host 415 + \\ ORDER BY rs.host 416 + ; 417 + var stmt: ?*c.sqlite3_stmt = null; 418 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 419 + return self.sqlError(); 420 + } 421 + defer _ = c.sqlite3_finalize(stmt); 422 + 423 + _ = c.sqlite3_bind_int(stmt, 1, @intCast(last_n)); 424 + 425 + var out: std.ArrayList(MonitorSnapshot) = .empty; 426 + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 427 + try out.append(allocator, .{ 428 + .host = try self.colText(allocator, stmt, 0), 429 + .coverage = c.sqlite3_column_double(stmt, 1), 430 + .total_events = @intCast(c.sqlite3_column_int64(stmt, 2)), 431 + .total_dids = @intCast(c.sqlite3_column_int(stmt, 3)), 432 + .connected_runs = @intCast(c.sqlite3_column_int(stmt, 4)), 433 + .n_runs = @intCast(c.sqlite3_column_int(stmt, 5)), 434 + }); 435 + } 436 + return out.toOwnedSlice(allocator); 437 + } 438 + 439 + pub const MonitorStateRow = struct { 440 + status: []const u8, 441 + last_changed: []const u8, 442 + }; 443 + 444 + pub fn getMonitorState(self: *Store, allocator: std.mem.Allocator, name: []const u8) !?MonitorStateRow { 445 + const sql = "SELECT status, last_changed FROM monitor_state WHERE name = ?"; 446 + var stmt: ?*c.sqlite3_stmt = null; 447 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 448 + return self.sqlError(); 449 + } 450 + defer _ = c.sqlite3_finalize(stmt); 451 + 452 + _ = c.sqlite3_bind_text(stmt, 1, name.ptr, @intCast(name.len), SQLITE_STATIC); 453 + 454 + if (c.sqlite3_step(stmt) == c.SQLITE_ROW) { 455 + return .{ 456 + .status = try self.colText(allocator, stmt, 0), 457 + .last_changed = try self.colText(allocator, stmt, 1), 458 + }; 459 + } 460 + return null; 461 + } 462 + 463 + pub fn upsertMonitorState(self: *Store, name: []const u8, status: []const u8, last_changed: []const u8) !void { 464 + const sql = 465 + \\INSERT INTO monitor_state (name, status, last_changed) VALUES (?, ?, ?) 466 + \\ON CONFLICT(name) DO UPDATE SET status = excluded.status, last_changed = excluded.last_changed 467 + ; 468 + var stmt: ?*c.sqlite3_stmt = null; 469 + if (c.sqlite3_prepare_v2(self.db, sql, -1, &stmt, null) != c.SQLITE_OK) { 470 + return self.sqlError(); 471 + } 472 + defer _ = c.sqlite3_finalize(stmt); 473 + 474 + _ = c.sqlite3_bind_text(stmt, 1, name.ptr, @intCast(name.len), SQLITE_STATIC); 475 + _ = c.sqlite3_bind_text(stmt, 2, status.ptr, @intCast(status.len), SQLITE_STATIC); 476 + _ = c.sqlite3_bind_text(stmt, 3, last_changed.ptr, @intCast(last_changed.len), SQLITE_STATIC); 477 + 478 + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) { 479 + return self.sqlError(); 480 + } 364 481 } 365 482 366 483 /// get total number of valid runs