// My harness for niri.
1import Fastify from "fastify"
2import fastifyStatic from "@fastify/static"
3import { existsSync } from "node:fs"
4import { dirname, join } from "node:path"
5import { fileURLToPath } from "node:url"
6import { wake, isRunning, enqueueEvent } from "./runner/index.js"
7import { buildDiscordBatchDigest, scanDiscordChannels } from "./discord/state.js"
8import { handleDiscordIngress } from "./discord/pipeline.js"
9import { fromBsky } from "./triggers/bsky.js"
10import { fromWebhook } from "./triggers/webhook.js"
11import { fromCron } from "./triggers/cron.js"
12import { fromChat } from "./triggers/chat.js"
13import { subscribe } from "./stream.js"
14import type { UserMessage } from "./types.js"
15
/** Directory that contains this module, derived from the module's own file URL. */
const SRC_DIR = dirname(fileURLToPath(new URL(import.meta.url)))

/** Build output of the web UI: `apps/web/dist` under the parent of `SRC_DIR`. */
const WEB_DIST_DIR = join(SRC_DIR, "..", "apps", "web", "dist")
/**
 * Parses a base-10 integer from an environment value.
 *
 * Returns `fallback` only when the value is unset or does not start with a number.
 * An explicit `0` is kept (unlike `parseInt(...) || fallback`), so the caller's
 * clamp treats it the same way as any other out-of-range value.
 *
 * @param raw - Raw environment value, possibly undefined.
 * @param fallback - Value used when `raw` cannot be parsed.
 * @returns The parsed integer, or `fallback`.
 */
function parseIntEnv(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? "", 10)
  return Number.isNaN(parsed) ? fallback : parsed
}

/** Milliseconds between Discord batch runs (`DISCORD_BATCH_INTERVAL_MS`); floor and default are both 60 s. */
const DISCORD_BATCH_INTERVAL_MS = Math.max(
  60_000,
  parseIntEnv(process.env.DISCORD_BATCH_INTERVAL_MS, 60_000),
)

/** Message cap per batch (`DISCORD_BATCH_MAX_MESSAGES`); default 40, clamped to the range 5..200. */
const DISCORD_BATCH_MAX_MESSAGES = Math.max(
  5,
  Math.min(200, parseIntEnv(process.env.DISCORD_BATCH_MAX_MESSAGES, 40)),
)

/** Channel scanning is on unless `DISCORD_BATCH_SCAN` is exactly "false" (trimmed, case-insensitive). */
const DISCORD_BATCH_SCAN = (process.env.DISCORD_BATCH_SCAN ?? "true").trim().toLowerCase() !== "false"
27
/**
 * Builds the Fastify application for the harness.
 *
 * Registers:
 * - a periodic Discord batch job (only when `DISCORD_BOT_TOKEN` is set),
 * - the static web UI under `/ui/`, or a 503 placeholder on `/ui` when it is not built,
 * - `POST /trigger/*` routes that convert request bodies into events for the runner,
 * - `GET /chat/stream`, a server-sent-events feed of everything `subscribe()` delivers,
 * - `GET /status`.
 *
 * @returns The configured Fastify instance; `listen()` is not called here.
 */
export function createServer() {
  // Accepts exactly what both runner entry points accept, so every call site that
  // previously passed an event to `enqueueEvent`/`wake` directly still type-checks.
  type RunnerEvent = Parameters<typeof enqueueEvent>[0] & Parameters<typeof wake>[0]

  const app = Fastify({ logger: false })
  let discordBatchInFlight = false
  let discordBatchTimer: ReturnType<typeof setInterval> | null = null
  let discordKickTimer: ReturnType<typeof setTimeout> | null = null

  /** Hands an event to the runner: `enqueueEvent` while a run is active, `wake` otherwise. */
  const dispatch = (event: RunnerEvent): void => {
    if (isRunning()) {
      enqueueEvent(event)
    } else {
      wake(event)
    }
  }

  /** Wraps a batch digest in a `discord`-sourced message and dispatches it. */
  const emitDiscordBatchEvent = (content: string, raw: Record<string, unknown>): void => {
    const event: UserMessage = {
      source: "discord",
      triggeredAt: new Date().toISOString(),
      content,
      raw,
    }
    dispatch(event)
  }

  /**
   * Runs one Discord batch: optional channel scan, digest build, event emission.
   * Overlapping invocations are skipped via `discordBatchInFlight`. A failed scan is
   * recorded in the emitted event instead of aborting the batch; errors from the
   * digest build or from dispatch propagate to the caller.
   */
  const runDiscordBatch = async (): Promise<void> => {
    if (discordBatchInFlight) return
    discordBatchInFlight = true
    try {
      let scanSummary: Record<string, unknown> | null = null
      let scanError: string | null = null

      if (DISCORD_BATCH_SCAN) {
        try {
          scanSummary = await scanDiscordChannels({
            limit: Math.min(100, DISCORD_BATCH_MAX_MESSAGES),
          })
        } catch (err) {
          scanError = err instanceof Error ? err.message : String(err)
          console.warn("[discord batch] scan failed:", scanError)
        }
      }

      const digest = buildDiscordBatchDigest({
        maxMessages: DISCORD_BATCH_MAX_MESSAGES,
        intervalMs: DISCORD_BATCH_INTERVAL_MS,
      })
      // No digest means there is nothing to emit for this interval.
      if (!digest) return

      const scanNote = scanSummary
        ? `\n\nscan snapshot: ${JSON.stringify(scanSummary)}`
        : scanError
          ? `\n\nscan snapshot error: ${scanError}`
          : ""

      emitDiscordBatchEvent(`${digest.content}${scanNote}`, {
        type: "discord_batch",
        digest,
        ...(scanSummary ? { scan: scanSummary } : {}),
        ...(scanError ? { scan_error: scanError } : {}),
      })
    } finally {
      discordBatchInFlight = false
    }
  }

  /**
   * Fire-and-forget entry point for timers. Without the `catch`, a throw inside
   * `runDiscordBatch` would surface as an unhandled promise rejection.
   */
  const runDiscordBatchSafely = (): void => {
    void runDiscordBatch().catch((err: unknown) => {
      console.warn("[discord batch] run failed:", err instanceof Error ? err.message : String(err))
    })
  }

  const hasDiscordToken = Boolean(process.env.DISCORD_BOT_TOKEN?.trim())
  if (hasDiscordToken) {
    discordBatchTimer = setInterval(runDiscordBatchSafely, DISCORD_BATCH_INTERVAL_MS)
    // Unref so the batch timers alone never keep the process alive.
    discordBatchTimer.unref?.()

    // Kick one shortly after startup so a sleeping niri can receive recent Discord context quickly.
    discordKickTimer = setTimeout(() => {
      discordKickTimer = null
      runDiscordBatchSafely()
    }, 5_000)
    discordKickTimer.unref?.()
  }

  // Stop both timers on shutdown so no batch is started after the server has closed.
  app.addHook("onClose", async () => {
    if (discordBatchTimer) {
      clearInterval(discordBatchTimer)
      discordBatchTimer = null
    }
    if (discordKickTimer) {
      clearTimeout(discordKickTimer)
      discordKickTimer = null
    }
  })

  if (existsSync(WEB_DIST_DIR)) {
    app.register(fastifyStatic, {
      root: WEB_DIST_DIR,
      prefix: "/ui/",
      index: false,
    })

    // `index` is disabled above, so the bare `/ui` route serves the entry page explicitly.
    app.get("/ui", async (_req, reply) => reply.sendFile("index.html"))
  } else {
    app.get("/ui", async (_req, reply) => {
      reply.code(503)
      return { error: "web ui is not built yet. run `npm run build:web` first." }
    })
  }

  // Discord ingress goes through its own pipeline; its result is merged into the response.
  app.post("/trigger/discord", async (req, reply) => {
    const result = handleDiscordIngress(req.body)
    return reply.send({
      ok: true,
      ...result,
    })
  })

  app.post("/trigger/bsky", async (req, reply) => {
    dispatch(fromBsky(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/webhook", async (req, reply) => {
    dispatch(fromWebhook(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/cron", async (_req, reply) => {
    dispatch(fromCron())
    return reply.send({ ok: true })
  })

  // Chat is the only trigger that validates its payload: blank content is rejected with 400.
  app.post("/trigger/chat", async (req, reply) => {
    const event = fromChat(req.body)
    if (!event.content.trim()) {
      return reply.code(400).send({ error: "content is required" })
    }
    dispatch(event)
    return reply.send({ ok: true, running: true })
  })

  // Server-sent events: the reply is hijacked and written through the raw Node response.
  app.get("/chat/stream", (req, reply) => {
    reply.hijack()

    const res = reply.raw
    res.writeHead(200, {
      "content-type": "text/event-stream",
      "cache-control": "no-cache",
      connection: "keep-alive",
      "x-accel-buffering": "no",
    })

    // Send initial keepalive so the connection is established
    res.write(":ok\n\n")

    const unsubscribe = subscribe((event) => {
      try {
        res.write(`data: ${JSON.stringify(event)}\n\n`)
      } catch {
        // connection closed; teardown happens in the keepalive / close handlers
      }
    })

    const keepalive = setInterval(() => {
      try {
        res.write(":ping\n\n")
      } catch {
        stopStream()
      }
    }, 15_000)

    /** Releases the per-connection resources: the ping timer and the stream subscription. */
    const stopStream = (): void => {
      clearInterval(keepalive)
      unsubscribe()
    }

    req.raw.on("close", stopStream)
  })

  app.get("/status", async () => ({
    running: isRunning(),
  }))

  return app
}