my harness for niri
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 257 lines 7.9 kB view raw
import Fastify from "fastify"
import fastifyStatic from "@fastify/static"
import { existsSync } from "node:fs"
import { dirname, join } from "node:path"
import { fileURLToPath } from "node:url"
import { wake, isRunning, isWaitingForEvent, enqueueEvent } from "./runner/index.js"
import { buildDiscordBatchDigest, scanDiscordChannels } from "./discord/state.js"
import { handleDiscordIngress } from "./discord/pipeline.js"
import { fromBsky } from "./triggers/bsky.js"
import { fromWebhook } from "./triggers/webhook.js"
import { fromCron } from "./triggers/cron.js"
import { fromChat } from "./triggers/chat.js"
import { subscribe } from "./stream.js"
import { getMetrics, getMetricDetail, getDiscordMetricDetail } from "./metrics.js"
import type { MetricListType } from "./metrics.js"
import type { UserMessage } from "./types.js"

/** Directory containing this module. */
const SRC_DIR = dirname(fileURLToPath(import.meta.url))

/** Location of the built web UI, resolved relative to this module. */
const WEB_DIST_DIR = join(SRC_DIR, "..", "apps", "web", "dist")

/**
 * Delay between Discord batch runs, read from `DISCORD_BATCH_INTERVAL_MS`.
 * Unparseable or zero values fall back to 60 000 ms; the result is never below 1 000 ms.
 */
const DISCORD_BATCH_INTERVAL_MS = Math.max(
  1_000,
  parseInt(process.env.DISCORD_BATCH_INTERVAL_MS ?? "60000", 10) || 60_000,
)

/**
 * Message cap per Discord batch, read from `DISCORD_BATCH_MAX_MESSAGES`.
 * Unparseable or zero values fall back to 40; the result is clamped to the range 5..200.
 */
const DISCORD_BATCH_MAX_MESSAGES = Math.max(
  5,
  Math.min(200, parseInt(process.env.DISCORD_BATCH_MAX_MESSAGES ?? "40", 10) || 40),
)

/** Channel scanning before each batch is on unless `DISCORD_BATCH_SCAN` is exactly "false" (case-insensitive). */
const DISCORD_BATCH_SCAN = (process.env.DISCORD_BATCH_SCAN ?? "true").trim().toLowerCase() !== "false"

/** Metric types accepted by the `type` query parameter of `GET /metrics`. */
const METRIC_LIST_TYPES = new Set<MetricListType>(["response", "summarization", "memory", "prompt", "usage", "discord"])

/** Alternative spellings accepted for metric types, mapped to their canonical name. */
const METRIC_TYPE_ALIASES: Record<string, MetricListType> = {
  compaction: "summarization",
  memory: "memory",
  memories: "memory",
  summary: "summarization",
  summaries: "summarization",
  response: "response",
  prompt_response: "response",
  "prompt-response": "response",
  completion: "response",
}

/** An event that both `wake` and `enqueueEvent` accept. */
type TriggerEvent = Parameters<typeof wake>[0] & Parameters<typeof enqueueEvent>[0]

/** Type guard: true when `value` is one of the canonical metric list types. */
function isMetricListType(value: string): value is MetricListType {
  return (METRIC_LIST_TYPES as ReadonlySet<string>).has(value)
}

/**
 * Resolves one normalized (trimmed, lower-cased) name to a canonical metric type.
 *
 * @returns the canonical type, or `undefined` when the name is neither an alias nor a canonical type.
 */
function resolveMetricType(name: string): MetricListType | undefined {
  // Own-property check so names such as "constructor" never resolve to Object.prototype members.
  const alias = Object.prototype.hasOwnProperty.call(METRIC_TYPE_ALIASES, name)
    ? METRIC_TYPE_ALIASES[name]
    : undefined
  const candidate = alias ?? name
  return isMetricListType(candidate) ? candidate : undefined
}

/**
 * Parses a comma-separated list of metric type names.
 *
 * Names are trimmed, lower-cased, and mapped through the alias table. Unknown
 * names are dropped and duplicates are removed, keeping first-seen order.
 *
 * @param raw - raw `type` query value, e.g. `"summary, Response"`.
 * @returns the recognized types, or `undefined` when the input is empty or nothing was recognized.
 */
function parseMetricTypes(raw: string | undefined): MetricListType[] | undefined {
  if (!raw?.trim()) return undefined

  const types = new Set<MetricListType>()
  for (const item of raw.split(",")) {
    const resolved = resolveMetricType(item.trim().toLowerCase())
    if (resolved) types.add(resolved)
  }
  return types.size > 0 ? [...types] : undefined
}

/**
 * Parses an optional base-10 integer query value.
 *
 * @returns the parsed integer, or `undefined` when the value is missing, empty, or not numeric.
 */
function parseOptionalInt(raw: string | undefined): number | undefined {
  if (!raw) return undefined
  const value = parseInt(raw, 10)
  return Number.isNaN(value) ? undefined : value
}

/** Enqueues the event when the runner is already running; otherwise wakes the runner with it. */
function deliverEvent(event: TriggerEvent): void {
  if (isRunning()) {
    enqueueEvent(event)
  } else {
    wake(event)
  }
}

/**
 * Builds the Fastify application.
 *
 * Registers:
 * - a periodic Discord batch job (only when `DISCORD_BOT_TOKEN` is set),
 * - the static web UI under `/ui` (or a 503 placeholder when it has not been built),
 * - `POST /trigger/*` ingress routes,
 * - the `GET /chat/stream` server-sent-events endpoint,
 * - `GET /status` and the `GET /metrics*` read endpoints.
 *
 * @returns the configured Fastify instance; the caller is responsible for calling `listen`.
 */
export function createServer() {
  const app = Fastify({ logger: false })
  let discordBatchInFlight = false
  let discordBatchTimer: ReturnType<typeof setInterval> | null = null
  let discordStartupTimer: ReturnType<typeof setTimeout> | null = null

  const runDiscordBatch = async (): Promise<void> => {
    // Never overlap runs: a slow scan must not stack up behind the interval.
    if (discordBatchInFlight) return
    discordBatchInFlight = true
    try {
      if (!isRunning()) return
      if (!isWaitingForEvent()) return

      if (DISCORD_BATCH_SCAN) {
        await scanDiscordChannels({ limit: DISCORD_BATCH_MAX_MESSAGES })
      }

      const digest = buildDiscordBatchDigest({
        maxMessages: DISCORD_BATCH_MAX_MESSAGES,
        intervalMs: DISCORD_BATCH_INTERVAL_MS,
      })
      if (!digest) return

      enqueueEvent(
        {
          source: "discord",
          triggeredAt: new Date().toISOString(),
          content: digest.content,
          raw: {
            type: "discord_batch",
            digest,
            source: "gateway_cache",
          },
        },
        { onlyIfWaiting: true },
      )
    } catch (err) {
      // Best effort: a failed batch is logged and retried on the next tick.
      console.warn("[discord batch] failed:", err instanceof Error ? err.message : String(err))
    } finally {
      discordBatchInFlight = false
    }
  }

  const hasDiscordToken = Boolean(process.env.DISCORD_BOT_TOKEN?.trim())
  if (hasDiscordToken) {
    discordBatchTimer = setInterval(() => {
      void runDiscordBatch()
    }, DISCORD_BATCH_INTERVAL_MS)
    // Do not let the batch timer keep the process alive on its own.
    if (typeof discordBatchTimer.unref === "function") discordBatchTimer.unref()

    // Kick one shortly after startup; it only emits if niri is already waiting.
    discordStartupTimer = setTimeout(() => {
      discordStartupTimer = null
      void runDiscordBatch()
    }, 5_000)
    discordStartupTimer.unref?.()
  }

  app.addHook("onClose", async () => {
    if (discordBatchTimer) {
      clearInterval(discordBatchTimer)
      discordBatchTimer = null
    }
    // Also cancel the pending startup kick so no batch runs after the server has closed.
    if (discordStartupTimer) {
      clearTimeout(discordStartupTimer)
      discordStartupTimer = null
    }
  })

  if (existsSync(WEB_DIST_DIR)) {
    app.register(fastifyStatic, {
      root: WEB_DIST_DIR,
      prefix: "/ui/",
      index: false,
    })

    app.get("/ui", async (_req, reply) => reply.sendFile("index.html"))
  } else {
    app.get("/ui", async (_req, reply) => {
      reply.code(503)
      return { error: "web ui is not built yet. run `npm run build:web` first." }
    })
  }

  app.post("/trigger/discord", async (req, reply) => {
    const result = handleDiscordIngress(req.body)
    return reply.send({
      ok: true,
      ...result,
    })
  })

  app.post("/trigger/bsky", async (req, reply) => {
    deliverEvent(fromBsky(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/webhook", async (req, reply) => {
    deliverEvent(fromWebhook(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/cron", async (_req, reply) => {
    deliverEvent(fromCron())
    return reply.send({ ok: true })
  })

  app.post("/trigger/chat", async (req, reply) => {
    const event = fromChat(req.body)
    if (!event.content.trim()) {
      return reply.code(400).send({ error: "content is required" })
    }
    deliverEvent(event)
    return reply.send({ ok: true, running: true })
  })

  app.get("/chat/stream", (req, reply) => {
    // Take over the raw response; from here on the handler writes the SSE stream itself.
    reply.hijack()

    const res = reply.raw
    res.writeHead(200, {
      "content-type": "text/event-stream",
      "cache-control": "no-cache",
      connection: "keep-alive",
      "x-accel-buffering": "no",
    })

    // Send initial keepalive so the connection is established
    res.write(":ok\n\n")

    /** Writes one SSE chunk; returns false when the stream is no longer writable. */
    const send = (chunk: string): boolean => {
      if (res.writableEnded || res.destroyed) return false
      try {
        res.write(chunk)
        return true
      } catch {
        // connection closed
        return false
      }
    }

    const unsubscribe = subscribe((event) => {
      send(`data: ${JSON.stringify(event)}\n\n`)
    })

    let cleanedUp = false
    // Idempotent teardown: reachable from both a failed keepalive and the socket "close" event.
    const cleanup = (): void => {
      if (cleanedUp) return
      cleanedUp = true
      clearInterval(keepalive)
      unsubscribe()
    }

    const keepalive = setInterval(() => {
      if (!send(":ping\n\n")) cleanup()
    }, 15_000)

    req.raw.on("close", cleanup)
  })

  app.get("/status", async () => ({
    running: isRunning(),
  }))

  app.get("/metrics", async (req) => {
    const query = req.query as {
      limit?: string
      cursor?: string
      type?: string
      includeRaw?: string
      q?: string
      from?: string
      to?: string
      cursor_memories?: string
      cursor_summarization?: string
      cursor_response?: string
      cursor_prompt?: string
      cursor_usage?: string
      cursor_discord?: string
    }
    return getMetrics({
      // Non-numeric values are treated as absent instead of forwarding NaN.
      limit: parseOptionalInt(query.limit),
      cursor: parseOptionalInt(query.cursor),
      cursors: {
        memories: query.cursor_memories,
        summarization: query.cursor_summarization,
        response: query.cursor_response,
        prompt: query.cursor_prompt,
        usage: query.cursor_usage,
        discord: query.cursor_discord,
      },
      type: parseMetricTypes(query.type),
      includeRaw: query.includeRaw === "true" || query.includeRaw === "1",
      q: query.q,
      from: query.from,
      to: query.to,
    })
  })

  app.get("/metrics/discord/:id", async (req, reply) => {
    const { id } = req.params as { id: string }
    const metric = getDiscordMetricDetail(id)
    if (!metric) return reply.code(404).send({ error: "discord metric not found" })
    return metric
  })

  app.get("/metrics/:id", async (req, reply) => {
    const { id } = req.params as { id: string }
    const metricId = parseInt(id, 10)
    // A non-numeric id cannot match any metric; answer 404 without querying with NaN.
    if (Number.isNaN(metricId)) return reply.code(404).send({ error: "metric not found" })
    const metric = getMetricDetail(metricId)
    if (!metric) return reply.code(404).send({ error: "metric not found" })
    return metric
  })

  return app
}