my harness for niri
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

i shouldve designed it like this from the start

+65 -37
+6 -3
src/discord/pipeline.ts
··· 3 3 import { ingestDiscordEvent } from "./state.js" 4 4 5 5 const DISCORD_WAKE_ON_EVENT = (process.env.DISCORD_WAKE_ON_EVENT ?? "false").trim().toLowerCase() === "true" 6 - const DISCORD_WAKE_ON_DM = (process.env.DISCORD_WAKE_ON_DM ?? "true").trim().toLowerCase() !== "false" 7 6 8 7 export type DiscordIngressOutcome = { 9 8 ingested: boolean ··· 15 14 export function handleDiscordIngress(payload: unknown): DiscordIngressOutcome { 16 15 const ingest = ingestDiscordEvent(payload) 17 16 const isWakeEligible = ingest.isNew && !ingest.isFromSelf && Boolean(ingest.bucket) 18 - const wakeForDm = DISCORD_WAKE_ON_DM && ingest.bucket === "dm" 17 + const wakeForDm = ingest.bucket === "dm" 19 18 const shouldWake = isWakeEligible && (DISCORD_WAKE_ON_EVENT || wakeForDm) 20 19 21 20 if (!shouldWake) { ··· 28 27 } 29 28 30 29 const event = fromDiscord(payload) 31 - isRunning() ? enqueueEvent(event) : wake(event) 30 + if (isRunning()) { 31 + enqueueEvent(event, { priority: wakeForDm }) 32 + } else { 33 + void wake(event) 34 + } 32 35 33 36 return { 34 37 ingested: ingest.stored,
+41 -8
src/runner/index.ts
··· 30 30 return state.running 31 31 } 32 32 33 + /** Returns whether the runner is currently blocked in wait or wait_then_continue. */ 34 + export function isWaitingForEvent(): boolean { 35 + return eventResolvers.length > 0 36 + } 37 + 38 + type EnqueueOptions = { 39 + onlyIfWaiting?: boolean 40 + priority?: boolean 41 + } 42 + 33 43 /** Push an event into the live session, resolving the loop's wait if it's idle. */ 34 - function deliverEvent(event: UserMessage): void { 44 + function deliverEvent(event: UserMessage, options: EnqueueOptions = {}): boolean { 45 + if (options.onlyIfWaiting && eventResolvers.length === 0) { 46 + return false 47 + } 48 + 35 49 console.log("[runner] queued event from", event.source) 36 50 if (state.toolInFlight) { 37 51 // Defer external steering until the currently running tool settles. 38 - state.deferredEvents.push(event) 39 - return 52 + const deferred = { event, priority: Boolean(options.priority) } 53 + if (options.priority) { 54 + state.deferredEvents.unshift(deferred) 55 + } else { 56 + state.deferredEvents.push(deferred) 57 + } 58 + return true 40 59 } 41 60 42 61 // Never push directly into conversation as the loop adds the user message at ··· 45 64 if (resolver) { 46 65 resolver(event) 47 66 } else { 48 - state.pendingInputs.push(event) 67 + if (options.priority) { 68 + state.pendingInputs.unshift(event) 69 + } else { 70 + state.pendingInputs.push(event) 71 + } 49 72 } 73 + return true 50 74 } 51 75 52 76 function flushDeferredEvents(): void { 53 77 if (state.toolInFlight || state.deferredEvents.length === 0) return 54 78 const events = state.deferredEvents 55 79 state.deferredEvents = [] 56 - for (const event of events) { 57 - deliverEvent(event) 80 + for (const { event, priority } of events) { 81 + deliverEvent(event, { priority }) 58 82 } 59 83 } 60 84 61 85 /** 62 86 * Enqueues an event for the active session, or stores it for the next wake cycle. 87 + * `onlyIfWaiting` rejects the event unless a wait resolver is active. 63 88 * 64 89 * @param event - Trigger event to deliver to the runner. 90 + * @returns `true` when the event was accepted for delivery. 65 91 */ 66 - export function enqueueEvent(event: UserMessage): void { 92 + export function enqueueEvent(event: UserMessage, options: EnqueueOptions = {}): boolean { 67 93 if (state.running) { 68 - deliverEvent(event) 94 + return deliverEvent(event, options) 95 + } 96 + if (options.onlyIfWaiting) { 97 + return false 98 + } 99 + if (options.priority) { 100 + state.pendingInputs.unshift(event) 69 101 } else { 70 102 state.pendingInputs.push(event) 71 103 } 104 + return true 72 105 } 73 106 74 107 /**
+1 -1
src/runner/types.ts
··· 36 36 /** Internal runtime state for the runner service. */ 37 37 export interface RunnerStateInternal extends RunnerState { 38 38 toolInFlight: boolean 39 - deferredEvents: UserMessage[] 39 + deferredEvents: Array<{ event: UserMessage; priority: boolean }> 40 40 }
+17 -25
src/server.ts
··· 3 3 import { existsSync } from "node:fs" 4 4 import { dirname, join } from "node:path" 5 5 import { fileURLToPath } from "node:url" 6 - import { wake, isRunning, enqueueEvent } from "./runner/index.js" 6 + import { wake, isRunning, isWaitingForEvent, enqueueEvent } from "./runner/index.js" 7 7 import { buildDiscordBatchDigest } from "./discord/state.js" 8 8 import { handleDiscordIngress } from "./discord/pipeline.js" 9 9 import { fromBsky } from "./triggers/bsky.js" ··· 30 30 let discordBatchInFlight = false 31 31 let discordBatchTimer: ReturnType<typeof setInterval> | null = null 32 32 33 - const emitDiscordBatchEvent = (content: string, raw: Record<string, unknown>): void => { 34 - const event: UserMessage = { 35 - source: "discord", 36 - triggeredAt: new Date().toISOString(), 37 - content, 38 - raw: { 39 - ...raw, 40 - interrupt_wait: true, 41 - }, 42 - } 43 - if (isRunning()) { 44 - console.log("[discord batch] interrupting active runner wait/loop") 45 - enqueueEvent(event) 46 - } else { 47 - void wake(event) 48 - } 49 - } 50 - 51 33 const runDiscordBatch = async (): Promise<void> => { 52 34 if (discordBatchInFlight) return 35 + if (!isRunning()) return 36 + if (!isWaitingForEvent()) return 53 37 discordBatchInFlight = true 54 38 try { 55 39 const digest = buildDiscordBatchDigest({ ··· 58 42 }) 59 43 if (!digest) return 60 44 61 - emitDiscordBatchEvent(digest.content, { 62 - type: "discord_batch", 63 - digest, 64 - source: "gateway_cache", 65 - }) 45 + enqueueEvent( 46 + { 47 + source: "discord", 48 + triggeredAt: new Date().toISOString(), 49 + content: digest.content, 50 + raw: { 51 + type: "discord_batch", 52 + digest, 53 + source: "gateway_cache", 54 + }, 55 + }, 56 + { onlyIfWaiting: true }, 57 + ) 66 58 } finally { 67 59 discordBatchInFlight = false 68 60 } ··· 75 67 }, DISCORD_BATCH_INTERVAL_MS) 76 68 if (typeof discordBatchTimer.unref === "function") discordBatchTimer.unref() 77 69 78 - // Kick one shortly after startup so a sleeping niri can receive recent Discord context quickly. 70 + // Kick one shortly after startup; it only emits if niri is already waiting. 79 71 setTimeout(() => { 80 72 void runDiscordBatch() 81 73 }, 5_000).unref?.()