// My harness for niri: the HTTP server that wires up triggers, the chat event stream, status, and metrics.
1import Fastify from "fastify"
2import fastifyStatic from "@fastify/static"
3import { existsSync } from "node:fs"
4import { dirname, join } from "node:path"
5import { fileURLToPath } from "node:url"
6import { wake, isRunning, isWaitingForEvent, enqueueEvent } from "./runner/index.js"
7import { buildDiscordBatchDigest, scanDiscordChannels } from "./discord/state.js"
8import { handleDiscordIngress } from "./discord/pipeline.js"
9import { fromBsky } from "./triggers/bsky.js"
10import { fromWebhook } from "./triggers/webhook.js"
11import { fromCron } from "./triggers/cron.js"
12import { fromChat } from "./triggers/chat.js"
13import { subscribe } from "./stream.js"
14import { getMetrics, getMetricDetail, getDiscordMetricDetail } from "./metrics.js"
15import type { MetricListType } from "./metrics.js"
16import type { UserMessage } from "./types.js"
17
// Directory containing this module, and the location of the built web UI relative to it.
const SRC_DIR = dirname(fileURLToPath(import.meta.url))
const WEB_DIST_DIR = join(SRC_DIR, "..", "apps", "web", "dist")

/**
 * Reads an integer from the environment.
 * Falls back when the variable is unset, unparseable, or parses to 0.
 */
function readEnvInt(name: string, fallback: number): number {
  const raw = process.env[name]
  const parsed = raw === undefined ? fallback : Number.parseInt(raw, 10)
  return parsed || fallback
}

// Period of the Discord batch timer; never shorter than one second.
const DISCORD_BATCH_INTERVAL_MS = Math.max(1_000, readEnvInt("DISCORD_BATCH_INTERVAL_MS", 60_000))

// Message cap per Discord batch, clamped to the range 5..200.
const DISCORD_BATCH_MAX_MESSAGES = Math.max(5, Math.min(200, readEnvInt("DISCORD_BATCH_MAX_MESSAGES", 40)))

// Channel scanning stays enabled unless the variable is explicitly "false" (any case, surrounding spaces ignored).
const DISCORD_BATCH_SCAN = process.env.DISCORD_BATCH_SCAN?.trim().toLowerCase() !== "false"

// Metric types accepted by the `type` filter of the /metrics route.
const METRIC_LIST_TYPES = new Set<MetricListType>([
  "response",
  "summarization",
  "memory",
  "prompt",
  "usage",
  "discord",
])

// Alternative spellings accepted for the `type` filter, mapped to their canonical metric type.
const METRIC_TYPE_ALIASES: Record<string, MetricListType> = {
  compaction: "summarization",
  memory: "memory",
  memories: "memory",
  summary: "summarization",
  summaries: "summarization",
  response: "response",
  prompt_response: "response",
  "prompt-response": "response",
  completion: "response",
}
41
/**
 * Parses the `type` filter of the /metrics route into a de-duplicated list of metric types.
 *
 * Each comma-separated entry is trimmed, lower-cased and mapped through
 * `METRIC_TYPE_ALIASES`; entries that are not in `METRIC_LIST_TYPES` are dropped.
 * First-seen order is preserved.
 *
 * A repeated query parameter (`?type=a&type=b`) reaches the handler as an array of
 * strings rather than a single string, so arrays are accepted and treated as if
 * their values had been comma-joined.
 *
 * @param raw - Raw query value(s), or `undefined` when the parameter is absent.
 * @returns The recognised types, or `undefined` when none were supplied or recognised.
 */
function parseMetricTypes(raw: string | readonly string[] | undefined): MetricListType[] | undefined {
  if (raw == null) return undefined

  const values: readonly unknown[] = Array.isArray(raw) ? raw : [raw]
  const types = new Set<MetricListType>()

  for (const value of values) {
    // Anything that is not a string cannot name a metric type; skip it instead of throwing.
    if (typeof value !== "string") continue

    for (const item of value.split(",")) {
      const normalized = item.trim().toLowerCase()
      // Own-property check so keys such as "constructor" never resolve through the prototype chain.
      const alias = Object.prototype.hasOwnProperty.call(METRIC_TYPE_ALIASES, normalized)
        ? METRIC_TYPE_ALIASES[normalized]
        : undefined
      const candidate = (alias ?? normalized) as MetricListType
      if (METRIC_LIST_TYPES.has(candidate)) types.add(candidate)
    }
  }

  return types.size > 0 ? Array.from(types) : undefined
}
54
/**
 * Builds the Fastify application for the harness.
 *
 * Registers:
 * - a periodic Discord batch job (only when `DISCORD_BOT_TOKEN` is set), stopped on close;
 * - `/ui` static serving when the web build exists, otherwise a 503 placeholder;
 * - `POST /trigger/{discord,bsky,webhook,cron,chat}` event ingress;
 * - `GET /chat/stream` as a server-sent-events feed;
 * - `GET /status`, `GET /metrics`, `GET /metrics/discord/:id`, `GET /metrics/:id`.
 *
 * @returns The configured Fastify instance. This function does not start listening.
 */
export function createServer() {
  const app = Fastify({ logger: false })
  let discordBatchInFlight = false
  let discordBatchTimer: ReturnType<typeof setInterval> | null = null
  let discordKickoffTimer: ReturnType<typeof setTimeout> | null = null

  // An event that may be handed to either runner entry point.
  type TriggerEvent = Parameters<typeof enqueueEvent>[0] & Parameters<typeof wake>[0]

  /** Queues the event when the runner is already running, otherwise wakes the runner with it. */
  const dispatchEvent = (event: TriggerEvent): void => {
    if (isRunning()) {
      enqueueEvent(event)
    } else {
      wake(event)
    }
  }

  /** Parses a base-10 integer query value; missing, empty or non-numeric input yields `undefined`. */
  const parseOptionalInt = (raw: string | undefined): number | undefined => {
    if (!raw) return undefined
    const parsed = parseInt(raw, 10)
    return Number.isNaN(parsed) ? undefined : parsed
  }

  /**
   * Runs one Discord batch: optionally scans channels, builds a digest and enqueues it.
   * Does nothing when a batch is already in flight, when the runner is not running,
   * or when the runner is not waiting for an event. Failures are logged, never thrown.
   */
  const runDiscordBatch = async (): Promise<void> => {
    if (discordBatchInFlight) return
    discordBatchInFlight = true
    try {
      if (!isRunning()) return
      if (!isWaitingForEvent()) return

      if (DISCORD_BATCH_SCAN) {
        await scanDiscordChannels({ limit: DISCORD_BATCH_MAX_MESSAGES })
      }

      const digest = buildDiscordBatchDigest({
        maxMessages: DISCORD_BATCH_MAX_MESSAGES,
        intervalMs: DISCORD_BATCH_INTERVAL_MS,
      })
      if (!digest) return

      enqueueEvent(
        {
          source: "discord",
          triggeredAt: new Date().toISOString(),
          content: digest.content,
          raw: {
            type: "discord_batch",
            digest,
            source: "gateway_cache",
          },
        },
        { onlyIfWaiting: true },
      )
    } catch (err) {
      console.warn("[discord batch] failed:", err instanceof Error ? err.message : String(err))
    } finally {
      discordBatchInFlight = false
    }
  }

  const hasDiscordToken = Boolean(process.env.DISCORD_BOT_TOKEN?.trim())
  if (hasDiscordToken) {
    discordBatchTimer = setInterval(() => {
      void runDiscordBatch()
    }, DISCORD_BATCH_INTERVAL_MS)
    // Do not let the batch timer keep the process alive on its own.
    if (typeof discordBatchTimer.unref === "function") discordBatchTimer.unref()

    // Kick one shortly after startup; it only emits if niri is already waiting.
    discordKickoffTimer = setTimeout(() => {
      discordKickoffTimer = null
      void runDiscordBatch()
    }, 5_000)
    discordKickoffTimer.unref?.()
  }

  // Stop both Discord timers when the server closes so no batch is started afterwards.
  app.addHook("onClose", async () => {
    if (discordKickoffTimer) {
      clearTimeout(discordKickoffTimer)
      discordKickoffTimer = null
    }
    if (discordBatchTimer) {
      clearInterval(discordBatchTimer)
      discordBatchTimer = null
    }
  })

  if (existsSync(WEB_DIST_DIR)) {
    app.register(fastifyStatic, {
      root: WEB_DIST_DIR,
      prefix: "/ui/",
      index: false,
    })

    app.get("/ui", async (_req, reply) => reply.sendFile("index.html"))
  } else {
    app.get("/ui", async (_req, reply) => {
      reply.code(503)
      return { error: "web ui is not built yet. run `npm run build:web` first." }
    })
  }

  app.post("/trigger/discord", async (req, reply) => {
    const result = handleDiscordIngress(req.body)
    return reply.send({
      ok: true,
      ...result,
    })
  })

  app.post("/trigger/bsky", async (req, reply) => {
    dispatchEvent(fromBsky(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/webhook", async (req, reply) => {
    dispatchEvent(fromWebhook(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/cron", async (_req, reply) => {
    dispatchEvent(fromCron())
    return reply.send({ ok: true })
  })

  app.post("/trigger/chat", async (req, reply) => {
    const event = fromChat(req.body)
    // Reject messages whose content is empty or whitespace-only.
    if (!event.content.trim()) {
      return reply.code(400).send({ error: "content is required" })
    }
    dispatchEvent(event)
    return reply.send({ ok: true, running: true })
  })

  app.get("/chat/stream", (req, reply) => {
    // Take over the raw response; from here on the reply is written manually as an SSE stream.
    reply.hijack()

    const res = reply.raw
    res.writeHead(200, {
      "content-type": "text/event-stream",
      "cache-control": "no-cache",
      connection: "keep-alive",
      "x-accel-buffering": "no",
    })

    // Send initial keepalive so the connection is established
    res.write(":ok\n\n")

    const unsubscribe = subscribe((event) => {
      try {
        res.write(`data: ${JSON.stringify(event)}\n\n`)
      } catch {
        // connection closed
      }
    })

    // Periodic SSE comment line so idle connections are not dropped.
    const keepalive = setInterval(() => {
      try {
        res.write(":ping\n\n")
      } catch {
        stopStreaming()
      }
    }, 15_000)

    // Single teardown path shared by the failed-ping case and the client disconnect.
    const stopStreaming = (): void => {
      clearInterval(keepalive)
      unsubscribe()
    }

    req.raw.on("close", stopStreaming)
  })

  app.get("/status", async () => ({
    running: isRunning(),
  }))

  app.get("/metrics", async (req) => {
    const query = req.query as {
      limit?: string
      cursor?: string
      type?: string
      includeRaw?: string
      q?: string
      from?: string
      to?: string
      cursor_memories?: string
      cursor_summarization?: string
      cursor_response?: string
      cursor_prompt?: string
      cursor_usage?: string
      cursor_discord?: string
    }
    return getMetrics({
      // Non-numeric values are treated as absent rather than forwarded as NaN.
      limit: parseOptionalInt(query.limit),
      cursor: parseOptionalInt(query.cursor),
      cursors: {
        memories: query.cursor_memories,
        summarization: query.cursor_summarization,
        response: query.cursor_response,
        prompt: query.cursor_prompt,
        usage: query.cursor_usage,
        discord: query.cursor_discord,
      },
      type: parseMetricTypes(query.type),
      includeRaw: query.includeRaw === "true" || query.includeRaw === "1",
      q: query.q,
      from: query.from,
      to: query.to,
    })
  })

  app.get("/metrics/discord/:id", async (req, reply) => {
    const { id } = req.params as { id: string }
    const metric = getDiscordMetricDetail(id)
    if (!metric) return reply.code(404).send({ error: "discord metric not found" })
    return metric
  })

  app.get("/metrics/:id", async (req, reply) => {
    const { id } = req.params as { id: string }
    const metricId = parseInt(id, 10)
    // A non-numeric id cannot match any metric; answer 404 without passing NaN to the lookup.
    if (Number.isNaN(metricId)) return reply.code(404).send({ error: "metric not found" })
    const metric = getMetricDetail(metricId)
    if (!metric) return reply.code(404).send({ error: "metric not found" })
    return metric
  })

  return app
}