my harness for niri
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 195 lines 5.8 kB view raw
import Fastify from "fastify"
import fastifyStatic from "@fastify/static"
import { existsSync } from "node:fs"
import { dirname, join } from "node:path"
import { fileURLToPath } from "node:url"
import { wake, isRunning, enqueueEvent } from "./runner/index.js"
import { buildDiscordBatchDigest, scanDiscordChannels } from "./discord/state.js"
import { handleDiscordIngress } from "./discord/pipeline.js"
import { fromBsky } from "./triggers/bsky.js"
import { fromWebhook } from "./triggers/webhook.js"
import { fromCron } from "./triggers/cron.js"
import { fromChat } from "./triggers/chat.js"
import { subscribe } from "./stream.js"
import type { UserMessage } from "./types.js"

/** Directory containing this module, resolved from its ESM URL. */
const SRC_DIR = dirname(fileURLToPath(import.meta.url))

/** Location of the built web UI, relative to this module's directory. */
const WEB_DIST_DIR = join(SRC_DIR, "..", "apps", "web", "dist")

/**
 * Milliseconds between Discord batch runs. Read from `DISCORD_BATCH_INTERVAL_MS`;
 * an unparsable or zero value falls back to 60s, and anything below 60s is raised to 60s.
 */
const DISCORD_BATCH_INTERVAL_MS = Math.max(
  60_000,
  parseInt(process.env.DISCORD_BATCH_INTERVAL_MS ?? "60000", 10) || 60_000,
)

/**
 * Maximum messages per Discord batch digest. Read from `DISCORD_BATCH_MAX_MESSAGES`
 * (default 40) and clamped to the range 5..200.
 */
const DISCORD_BATCH_MAX_MESSAGES = Math.max(
  5,
  Math.min(200, parseInt(process.env.DISCORD_BATCH_MAX_MESSAGES ?? "40", 10) || 40),
)

/**
 * Whether each batch run first scans Discord channels. Enabled unless
 * `DISCORD_BATCH_SCAN` is "false" (compared after trimming and lower-casing).
 */
const DISCORD_BATCH_SCAN = (process.env.DISCORD_BATCH_SCAN ?? "true").trim().toLowerCase() !== "false"

/** Delay before the first Discord batch run after the server is created. */
const DISCORD_BATCH_STARTUP_DELAY_MS = 5_000

/** Interval between keepalive comments on the `/chat/stream` SSE connection. */
const SSE_KEEPALIVE_INTERVAL_MS = 15_000

/** Event shape accepted by both runner entry points (`enqueueEvent` and `wake`). */
type RunnerEvent = Parameters<typeof enqueueEvent>[0] & Parameters<typeof wake>[0]

/**
 * Builds the Fastify application: trigger endpoints, the chat SSE stream, the
 * static web UI (when it has been built), and — when `DISCORD_BOT_TOKEN` is
 * set — a periodic Discord batch digest.
 *
 * The returned instance is not yet listening; the caller is expected to start it.
 * Closing the instance cancels the Discord batch timers.
 *
 * @returns The configured Fastify instance.
 */
export function createServer() {
  const app = Fastify({ logger: false })
  let discordBatchInFlight = false
  let discordBatchTimer: ReturnType<typeof setInterval> | null = null
  let discordStartupTimer: ReturnType<typeof setTimeout> | null = null

  /** Hands an event to the runner: queued if a run is active, otherwise used to wake it. */
  const dispatch = (event: RunnerEvent): void => {
    if (isRunning()) {
      enqueueEvent(event)
    } else {
      wake(event)
    }
  }

  /** Wraps a Discord batch digest in a `discord`-sourced event and dispatches it. */
  const emitDiscordBatchEvent = (content: string, raw: Record<string, unknown>): void => {
    const event: UserMessage = {
      source: "discord",
      triggeredAt: new Date().toISOString(),
      content,
      raw,
    }
    dispatch(event)
  }

  /**
   * Runs one Discord batch cycle: optionally scans channels, builds a digest and
   * emits it as an event. Returns immediately if a previous cycle is still in
   * flight, and emits nothing when no digest is produced.
   *
   * A failed scan is logged and reported inside the emitted event rather than
   * aborting the cycle. Errors from the digest or emit steps propagate to the caller.
   */
  const runDiscordBatch = async (): Promise<void> => {
    if (discordBatchInFlight) return
    discordBatchInFlight = true
    try {
      let scanSummary: Record<string, unknown> | null = null
      let scanError: string | null = null

      if (DISCORD_BATCH_SCAN) {
        try {
          scanSummary = await scanDiscordChannels({
            // NOTE(review): the 100 cap is presumably a per-request limit — confirm.
            limit: Math.min(100, DISCORD_BATCH_MAX_MESSAGES),
          })
        } catch (err) {
          scanError = err instanceof Error ? err.message : String(err)
          console.warn("[discord batch] scan failed:", scanError)
        }
      }

      const digest = buildDiscordBatchDigest({
        maxMessages: DISCORD_BATCH_MAX_MESSAGES,
        intervalMs: DISCORD_BATCH_INTERVAL_MS,
      })
      if (!digest) return

      const scanNote = scanSummary
        ? `\n\nscan snapshot: ${JSON.stringify(scanSummary)}`
        : scanError
          ? `\n\nscan snapshot error: ${scanError}`
          : ""

      emitDiscordBatchEvent(`${digest.content}${scanNote}`, {
        type: "discord_batch",
        digest,
        ...(scanSummary ? { scan: scanSummary } : {}),
        ...(scanError ? { scan_error: scanError } : {}),
      })
    } finally {
      discordBatchInFlight = false
    }
  }

  /**
   * Fire-and-forget entry point for timers. `runDiscordBatch` only has a
   * `finally`, so a throw from the digest or emit steps would otherwise surface
   * as an unhandled promise rejection; log it instead.
   */
  const runDiscordBatchInBackground = (): void => {
    runDiscordBatch().catch((err: unknown) => {
      console.warn("[discord batch] run failed:", err instanceof Error ? err.message : String(err))
    })
  }

  const hasDiscordToken = Boolean(process.env.DISCORD_BOT_TOKEN?.trim())
  if (hasDiscordToken) {
    discordBatchTimer = setInterval(runDiscordBatchInBackground, DISCORD_BATCH_INTERVAL_MS)
    // Do not let the batch timers keep the process alive on their own.
    discordBatchTimer.unref?.()

    // Kick one shortly after startup so a sleeping niri can receive recent Discord context quickly.
    discordStartupTimer = setTimeout(() => {
      discordStartupTimer = null
      runDiscordBatchInBackground()
    }, DISCORD_BATCH_STARTUP_DELAY_MS)
    discordStartupTimer.unref?.()
  }

  app.addHook("onClose", async () => {
    // Cancel the pending startup run too, so no batch fires after the server is closed.
    if (discordStartupTimer) {
      clearTimeout(discordStartupTimer)
      discordStartupTimer = null
    }
    if (discordBatchTimer) {
      clearInterval(discordBatchTimer)
      discordBatchTimer = null
    }
  })

  if (existsSync(WEB_DIST_DIR)) {
    app.register(fastifyStatic, {
      root: WEB_DIST_DIR,
      prefix: "/ui/",
      index: false,
    })

    app.get("/ui", async (_req, reply) => reply.sendFile("index.html"))
  } else {
    // The UI bundle is missing: answer with a hint instead of a bare 404.
    app.get("/ui", async (_req, reply) => {
      reply.code(503)
      return { error: "web ui is not built yet. run `npm run build:web` first." }
    })
  }

  app.post("/trigger/discord", async (req, reply) => {
    // Discord ingress goes through its own pipeline rather than straight to the runner.
    const result = handleDiscordIngress(req.body)
    return reply.send({
      ok: true,
      ...result,
    })
  })

  app.post("/trigger/bsky", async (req, reply) => {
    dispatch(fromBsky(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/webhook", async (req, reply) => {
    dispatch(fromWebhook(req.body))
    return reply.send({ ok: true })
  })

  app.post("/trigger/cron", async (_req, reply) => {
    dispatch(fromCron())
    return reply.send({ ok: true })
  })

  app.post("/trigger/chat", async (req, reply) => {
    const event = fromChat(req.body)
    if (!event.content.trim()) {
      return reply.code(400).send({ error: "content is required" })
    }
    dispatch(event)
    // NOTE(review): `running` is hard-coded; presumably valid because the event either
    // joined an active run or woke one — confirm against the runner.
    return reply.send({ ok: true, running: true })
  })

  app.get("/chat/stream", (req, reply) => {
    // Take over the raw response: this is a long-lived server-sent-events stream.
    reply.hijack()

    const res = reply.raw
    res.writeHead(200, {
      "content-type": "text/event-stream",
      "cache-control": "no-cache",
      connection: "keep-alive",
      "x-accel-buffering": "no",
    })

    // Send initial keepalive so the connection is established
    res.write(":ok\n\n")

    const unsubscribe = subscribe((event) => {
      try {
        res.write(`data: ${JSON.stringify(event)}\n\n`)
      } catch {
        // connection closed
      }
    })

    let released = false
    // Both the keepalive failure path and the socket "close" event end up here;
    // the flag ensures the timer and the subscription are torn down exactly once.
    const release = (): void => {
      if (released) return
      released = true
      clearInterval(keepalive)
      unsubscribe()
    }

    const keepalive = setInterval(() => {
      try {
        res.write(":ping\n\n")
      } catch {
        release()
      }
    }, SSE_KEEPALIVE_INTERVAL_MS)

    req.raw.on("close", release)
  })

  app.get("/status", async () => ({
    running: isRunning(),
  }))

  return app
}