//! admin endpoint handlers for relay management.
//!
//! all handlers require Bearer token auth against RELAY_ADMIN_PASSWORD.
//! includes host blocking/unblocking, account bans, and backfill control.
//!
//! DB-accessing handlers use DbRequest + DbRequestQueue to route queries
//! through pool_io workers.
const std = @import("std");
const Io = std.Io;
const h = @import("http.zig");
const router = @import("router.zig");
const websocket = @import("websocket");
const event_log_mod = @import("../event_log.zig");
const backfill_mod = @import("../backfill.zig");
const cleaner_mod = @import("../cleaner.zig");
const resync_mod = @import("../resync.zig");

const log = std.log.scoped(.relay);

const HttpContext = router.HttpContext;
const DbRequest = event_log_mod.DbRequest;
const DiskPersist = event_log_mod.DiskPersist;

/// check admin auth via headers, send error response if not authorized. returns true if authorized.
///
/// requires RELAY_ADMIN_PASSWORD in the environment; when unset every request
/// is rejected with 403 so unconfigured deployments expose nothing.
pub fn checkAdmin(conn: *h.Conn, headers: ?*const websocket.Handshake.KeyValue) bool {
    const admin_pw = getenv("RELAY_ADMIN_PASSWORD") orelse {
        h.respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}");
        return false;
    };
    const kv = headers orelse {
        h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}");
        return false;
    };
    // handshake parser lowercases all header names
    const auth_value = kv.get("authorization") orelse {
        h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}");
        return false;
    };
    const bearer_prefix = "Bearer ";
    if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) {
        h.respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}");
        return false;
    }
    const token = auth_value[bearer_prefix.len..];
    // constant-time comparison: std.mem.eql short-circuits on the first
    // mismatching byte, which leaks token prefixes through response timing.
    // (token length is still leaked; that is the usual trade-off.)
    if (token.len != admin_pw.len) {
        h.respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}");
        return false;
    }
    var diff: u8 = 0;
    for (token, admin_pw) |a, b| diff |= a ^ b;
    if (diff != 0) {
        h.respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}");
        return false;
    }
    return true;
}

/// ban an account by DID: resolve (or create) its account row, drop it from
/// the collection index, and enqueue a takedown + #account broadcast.
pub fn handleBan(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;
    const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}");
        return;
    };
    defer parsed.deinit();
    const did = parsed.value.did;

    // resolve DID → UID via DbRequestQueue
    const UidReq = struct {
        base: DbRequest = .{ .callback = &execute },
        did_buf: [256]u8 = undefined,
        did_len: usize = 0,
        uid: u64 = 0,

        // runs on a pool_io worker with exclusive DB access.
        fn execute(b: *DbRequest, dp: *DiskPersist) void {
            const self: *@This() = @fieldParentPtr("base", b);
            const d = self.did_buf[0..self.did_len];
            // check database
            if (dp.db.rowUnsafe("SELECT uid FROM account WHERE did = $1", .{d}) catch null) |row| {
                var r = row;
                defer r.deinit() catch {};
                self.uid = @intCast(r.get(i64, 0));
                return;
            }
            // create new account row
            _ = dp.db.exec("INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING", .{d}) catch {
                b.err = error.DatabaseError;
                return;
            };
            var row = dp.db.rowUnsafe("SELECT uid FROM account WHERE did = $1", .{d}) catch {
                b.err = error.DatabaseError;
                return;
            } orelse {
                b.err = error.AccountCreationFailed;
                return;
            };
            defer row.deinit() catch {};
            self.uid = @intCast(row.get(i64, 0));
        }
    };
    var uid_req: UidReq = .{};
    // reject rather than silently truncate: a DID cut at 256 bytes would
    // resolve — or create — the wrong account row.
    if (did.len > uid_req.did_buf.len) {
        h.respondJson(conn, .bad_request, "{\"error\":\"DID too long\"}");
        return;
    }
    @memcpy(uid_req.did_buf[0..did.len], did);
    uid_req.did_len = did.len;
    ctx.db_queue.push(&uid_req.base);
    uid_req.base.wait(ctx.io, ctx.shutdown);
    if (uid_req.base.err != null) {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}");
        return;
    }

    // remove from collection index so banned accounts don't appear in listReposByCollection
    ctx.collection_index.removeAll(did) catch |err| {
        log.debug("collection removeAll after ban failed: {s}", .{@errorName(err)});
    };

    // build CBOR #account frame and route takedown + persist + broadcast
    // through host_ops queue (pool_io thread) — fire and forget.
    const host_ops_mod = @import("../host_ops.zig");
    var td: host_ops_mod.HostOp.Payload.Takedown = .{ .uid = uid_req.uid };
    if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| {
        defer ctx.persist.allocator.free(frame_bytes);
        // frame is dropped (takedown still proceeds) if it exceeds the
        // fixed-size payload buffer.
        if (frame_bytes.len <= td.frame_buf.len) {
            @memcpy(td.frame_buf[0..frame_bytes.len], frame_bytes);
            td.frame_len = @intCast(frame_bytes.len);
        }
    }
    ctx.host_ops.push(.{
        .host_id = 0, // not host-specific
        .kind = .takedown_user,
        .payload = .{ .takedown = td },
    });
    log.info("admin: banned {s} (uid={d}), takedown enqueued", .{ did, uid_req.uid });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// list all hosts with status/limits plus the current active worker count.
pub fn handleAdminListHosts(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;

    // list all hosts via DbRequestQueue
    const ListAllHostsReq = struct {
        base: DbRequest = .{ .callback = &execute },
        alloc: std.mem.Allocator,
        result: ?[]event_log_mod.DiskPersist.Host = null,

        fn execute(b: *DbRequest, dp: *DiskPersist) void {
            const self: *@This() = @fieldParentPtr("base", b);
            self.result = dp.listAllHosts(self.alloc) catch |e| {
                b.err = e;
                return;
            };
        }
    };
    var list_req: ListAllHostsReq = .{ .alloc = ctx.persist.allocator };
    ctx.db_queue.push(&list_req.base);
    list_req.base.wait(ctx.io, ctx.shutdown);
    if (list_req.base.err != null or list_req.result == null) {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}");
        return;
    }
    const hosts = list_req.result.?;
    // listAllHosts allocates each hostname/status string plus the slice itself.
    defer {
        for (hosts) |host| {
            ctx.persist.allocator.free(host.hostname);
            ctx.persist.allocator.free(host.status);
        }
        ctx.persist.allocator.free(hosts);
    }

    var aw: Io.Writer.Allocating = .init(ctx.persist.allocator);
    defer aw.deinit();
    const w = &aw.writer;
    w.writeAll("{\"hosts\":[") catch return;
    for (hosts, 0..) |host, i| {
        if (i > 0) w.writeByte(',') catch return;
        if (host.account_limit) |limit| {
            w.print("{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d},\"account_limit\":{d}}}", .{
                host.id,
                host.hostname,
                host.status,
                host.last_seq,
                host.failed_attempts,
                limit,
            }) catch return;
        } else {
            w.print("{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d},\"account_limit\":null}}", .{
                host.id,
                host.hostname,
                host.status,
                host.last_seq,
                host.failed_attempts,
            }) catch return;
        }
    }
    w.print("],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return;
    h.respondJson(conn, .ok, aw.written());
}

/// block a host: set its status to "blocked" (creating the row if needed).
pub fn handleAdminBlockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;
    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}");
        return;
    };
    defer parsed.deinit();

    const BlockHostReq = struct {
        base: DbRequest = .{ .callback = &execute },
        hostname_buf: [256]u8 = undefined,
        hostname_len: usize = 0,
        host_id: u64 = 0,

        fn execute(b: *DbRequest, dp: *DiskPersist) void {
            const self: *@This() = @fieldParentPtr("base", b);
            const hn = self.hostname_buf[0..self.hostname_len];
            const info = dp.getOrCreateHost(hn) catch |e| {
                b.err = e;
                return;
            };
            self.host_id = info.id;
            dp.updateHostStatus(info.id, "blocked") catch |e| {
                b.err = e;
                return;
            };
        }
    };
    var req: BlockHostReq = .{};
    // reject rather than silently truncate: a truncated hostname would block
    // (or create) the wrong host row.
    if (parsed.value.hostname.len > req.hostname_buf.len) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"hostname too long\"}");
        return;
    }
    @memcpy(req.hostname_buf[0..parsed.value.hostname.len], parsed.value.hostname);
    req.hostname_len = parsed.value.hostname.len;
    ctx.db_queue.push(&req.base);
    req.base.wait(ctx.io, ctx.shutdown);
    if (req.base.err != null) {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"operation failed\"}");
        return;
    }
    log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, req.host_id });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// unblock a host: set its status back to "active" and clear failure counters.
pub fn handleAdminUnblockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;
    const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}");
        return;
    };
    defer parsed.deinit();

    const UnblockHostReq = struct {
        base: DbRequest = .{ .callback = &execute },
        hostname_buf: [256]u8 = undefined,
        hostname_len: usize = 0,
        host_id: u64 = 0,

        fn execute(b: *DbRequest, dp: *DiskPersist) void {
            const self: *@This() = @fieldParentPtr("base", b);
            const hn = self.hostname_buf[0..self.hostname_len];
            const info = dp.getOrCreateHost(hn) catch |e| {
                b.err = e;
                return;
            };
            self.host_id = info.id;
            dp.updateHostStatus(info.id, "active") catch |e| {
                b.err = e;
                return;
            };
            // best-effort: a failed reset should not fail the unblock.
            dp.resetHostFailures(info.id) catch {};
        }
    };
    var req: UnblockHostReq = .{};
    // reject rather than silently truncate (see handleAdminBlockHost).
    if (parsed.value.hostname.len > req.hostname_buf.len) {
        h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"hostname too long\"}");
        return;
    }
    @memcpy(req.hostname_buf[0..parsed.value.hostname.len], parsed.value.hostname);
    req.hostname_len = parsed.value.hostname.len;
    ctx.db_queue.push(&req.base);
    req.base.wait(ctx.io, ctx.shutdown);
    if (req.base.err != null) {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"operation failed\"}");
        return;
    }
    log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, req.host_id });
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// set or clear the account_limit override for a host.
pub fn handleAdminChangeLimits(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (!checkAdmin(conn, headers)) return;
    const parsed = std.json.parseFromSlice(
        struct { host: []const u8, account_limit: ?u64 },
        ctx.persist.allocator,
        body,
        .{ .ignore_unknown_fields = true },
    ) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"host\\\":\\\"...\\\",\\\"account_limit\\\":...}\"}");
        return;
    };
    defer parsed.deinit();

    const ChangeLimitsReq = struct {
        base: DbRequest = .{ .callback = &execute },
        hostname_buf: [256]u8 = undefined,
        hostname_len: usize = 0,
        new_limit: ?u64,
        host_id: ?u64 = null,
        effective: u64 = 0,

        fn execute(b: *DbRequest, dp: *DiskPersist) void {
            const self: *@This() = @fieldParentPtr("base", b);
            const hn = self.hostname_buf[0..self.hostname_len];
            self.host_id = dp.getHostIdForHostname(hn) catch |e| {
                b.err = e;
                return;
            };
            // unknown host: leave host_id null, the handler answers 404.
            const hid = self.host_id orelse return;
            dp.setHostAccountLimit(hid, self.new_limit) catch |e| {
                b.err = e;
                return;
            };
            // effective limit to push to the subscriber: the override when
            // set, otherwise the host's current account count.
            self.effective = if (self.new_limit) |l| l else dp.getHostAccountCount(hid);
        }
    };
    var req: ChangeLimitsReq = .{ .new_limit = parsed.value.account_limit };
    // reject rather than silently truncate: a truncated hostname would
    // change limits on the wrong host row.
    if (parsed.value.host.len > req.hostname_buf.len) {
        h.respondJson(conn, .bad_request, "{\"error\":\"hostname too long\"}");
        return;
    }
    @memcpy(req.hostname_buf[0..parsed.value.host.len], parsed.value.host);
    req.hostname_len = parsed.value.host.len;
    ctx.db_queue.push(&req.base);
    req.base.wait(ctx.io, ctx.shutdown);
    if (req.base.err != null) {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"database error\"}");
        return;
    }
    const host_id = req.host_id orelse {
        h.respondJson(conn, .not_found, "{\"error\":\"host not found\"}");
        return;
    };
    // update running subscriber's rate limits immediately
    ctx.slurper.updateHostLimits(host_id, req.effective);
    if (parsed.value.account_limit) |limit| {
        log.info("admin: set account_limit for {s} (id={d}): {d}", .{ parsed.value.host, host_id, limit });
    } else {
        log.info("admin: cleared account_limit for {s} (id={d}), reverted to COUNT(*)", .{ parsed.value.host, host_id });
    }
    h.respondJson(conn, .ok, "{\"success\":true}");
}

/// start a backfill run; source host comes from ?source= (default bsky.network).
pub fn handleAdminBackfillTrigger(conn: *h.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void {
    if (!checkAdmin(conn, headers)) return;
    const source = h.queryParam(query, "source") orelse "bsky.network";
    backfiller.start(source) catch |err| {
        switch (err) {
            error.AlreadyRunning => {
                h.respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}");
            },
            else => {
                h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}");
            },
        }
        return;
    };
    var buf: [256]u8 = undefined;
    const resp_body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch {
        // source too long for the stack buffer — fall back to bare status.
        h.respondJson(conn, .ok, "{\"status\":\"started\"}");
        return;
    };
    h.respondJson(conn, .ok, resp_body);
}

/// report the backfiller's own JSON status blob.
pub fn handleAdminBackfillStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void {
    if (!checkAdmin(conn, headers)) return;
    const body = backfiller.getStatus(backfiller.allocator) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}");
        return;
    };
    defer backfiller.allocator.free(body);
    h.respondJson(conn, .ok, body);
}

/// start a cleanup pass; 409 if one is already running.
pub fn handleCleanupTrigger(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, cleaner: *cleaner_mod.Cleaner) void {
    if (!checkAdmin(conn, headers)) return;
    cleaner.start() catch |err| {
        switch (err) {
            error.AlreadyRunning => {
                h.respondJson(conn, .conflict, "{\"error\":\"cleanup already in progress\"}");
            },
            else => {
                h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to start cleanup\"}");
            },
        }
        return;
    };
    h.respondJson(conn, .ok, "{\"status\":\"started\"}");
}

/// report cleanup progress counters as JSON.
pub fn handleCleanupStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, cleaner: *cleaner_mod.Cleaner) void {
    if (!checkAdmin(conn, headers)) return;
    const status = cleaner.getStatus();
    var buf: [256]u8 = undefined;
    const body = std.fmt.bufPrint(&buf, "{{\"running\":{},\"scanned\":{d},\"removed\":{d}}}", .{
        status.running,
        status.scanned,
        status.removed,
    }) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"format failed\"}");
        return;
    };
    h.respondJson(conn, .ok, body);
}

/// report resyncer counters and queue depth as JSON.
pub fn handleResyncStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, resyncer: *resync_mod.Resyncer) void {
    if (!checkAdmin(conn, headers)) return;
    var buf: [256]u8 = undefined;
    const body = std.fmt.bufPrint(&buf, "{{\"processed\":{d},\"failed\":{d},\"dropped\":{d},\"queue_depth\":{d}}}", .{
        resyncer.processed.load(.monotonic),
        resyncer.failed.load(.monotonic),
        resyncer.dropped.load(.monotonic),
        resyncer.queueDepth(),
    }) catch {
        h.respondJson(conn, .internal_server_error, "{\"error\":\"format failed\"}");
        return;
    };
    h.respondJson(conn, .ok, body);
}

/// enqueue a single-account resync for the given DID on the given host.
pub fn handleResyncTrigger(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, resyncer: *resync_mod.Resyncer) void {
    if (!checkAdmin(conn, headers)) return;
    // NOTE(review): uses c_allocator while sibling handlers parse with a
    // context/owner allocator — presumably because no HttpContext is passed
    // here; confirm this is intentional.
    const parsed = std.json.parseFromSlice(
        struct { did: []const u8, hostname: []const u8 },
        std.heap.c_allocator,
        body,
        .{ .ignore_unknown_fields = true },
    ) catch {
        h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\",\\\"hostname\\\":\\\"...\\\"}\"}");
        return;
    };
    defer parsed.deinit();
    // NOTE(review): parsed.deinit() frees the memory backing these slices when
    // this handler returns — enqueue must copy them, not retain them; verify.
    resyncer.enqueue(parsed.value.did, parsed.value.hostname);
    h.respondJson(conn, .ok, "{\"status\":\"enqueued\"}");
}

// --- protocol helpers (used only by handleBan) ---

/// build a CBOR #account frame for a takedown event.
fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 {
    const zat = @import("zat");
    const cbor = zat.cbor;
    // frame header: { op: 1, t: "#account" }
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#account" } },
    } };
    var time_buf: [24]u8 = undefined;
    const time_str = formatTimestamp(&time_buf);
    // seq 0 is a placeholder; NOTE(review): presumably the real sequence
    // number is assigned downstream when the frame is persisted — confirm.
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 0 } },
        .{ .key = "did", .value = .{ .text = did } },
        .{ .key = "time", .value = .{ .text = time_str } },
        .{ .key = "active", .value = .{ .boolean = false } },
        .{ .key = "status", .value = .{ .text = "takendown" } },
    } };
    // defer replaces the original free-on-every-path bookkeeping; the
    // intermediate encodings are always released, success or failure.
    const header_bytes = cbor.encodeAlloc(allocator, header) catch return null;
    defer allocator.free(header_bytes);
    const payload_bytes = cbor.encodeAlloc(allocator, payload) catch return null;
    defer allocator.free(payload_bytes);
    // caller owns the returned frame (header immediately followed by payload).
    const frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch return null;
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);
    return frame;
}

/// format the current wall-clock time (libc CLOCK_REALTIME) as
/// "YYYY-MM-DDTHH:MM:SSZ" into buf; falls back to the epoch on format failure.
fn formatTimestamp(buf: *[24]u8) []const u8 {
    var tp: std.c.timespec = undefined;
    _ = std.c.clock_gettime(.REALTIME, &tp);
    const ts: u64 = @intCast(tp.sec);
    const es = std.time.epoch.EpochSeconds{ .secs = ts };
    const day = es.getEpochDay();
    const yd = day.calculateYearDay();
    const md = yd.calculateMonthDay();
    const ds = es.getDaySeconds();
    // bugfix: epoch.Month is already 1-based (jan = 1), so the previous
    // `@intFromEnum(md.month) + 1` printed every month one too high
    // (January as "02"). day_index IS 0-based, so its +1 stays.
    return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
        yd.year,
        md.month.numeric(),
        @as(u32, md.day_index) + 1,
        ds.getHoursIntoDay(),
        ds.getMinutesIntoHour(),
        ds.getSecondsIntoMinute(),
    }) catch "1970-01-01T00:00:00Z";
}

/// fetch an environment variable via libc; null if unset.
/// the returned slice aliases the process environment — do not free.
fn getenv(key: [*:0]const u8) ?[]const u8 {
    const ptr = std.c.getenv(key) orelse return null;
    return std.mem.sliceTo(ptr, 0);
}