my harness for niri
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 197 lines 5.6 kB view raw
1import { buildBootstrap } from "../bootstrap.js" 2import { endConversation, logMessage, startConversation } from "../db.js" 3import { emit } from "../stream.js" 4import { runLoop } from "./loop.js" 5import type { RunnerStateInternal } from "./types.js" 6import { clearSession, loadSession, saveSession } from "./util.js" 7import type { UserMessage } from "../types.js" 8 9let eventResolvers: Array<(event: UserMessage) => void> = [] 10let shutdownResolve: (() => void) | null = null 11 12const state: RunnerStateInternal = { 13 contextSize: 0, 14 running: false, 15 conversation: [], 16 pendingInputs: [], 17 tokenCount: 0, 18 toolInFlight: false, 19 deferredEvents: [], 20} 21 22/** 23 * Returns whether a runner session is currently active. 24 * 25 * @returns `true` when the wake loop is running, otherwise `false`. 26 */ 27export function isRunning(): boolean { 28 return state.running 29} 30 31/** Push an event into the live session, resolving the loop's wait if it's idle. */ 32function deliverEvent(event: UserMessage): void { 33 console.log("[runner] queued event from", event.source) 34 if (state.toolInFlight) { 35 // Defer external steering until the currently running tool settles. 36 state.deferredEvents.push(event) 37 return 38 } 39 40 // Never push directly into conversation as the loop adds the user message at 41 // waitForEvent() time, which is only reached when no tool calls are in flight. 42 const resolver = eventResolvers.shift() 43 if (resolver) { 44 resolver(event) 45 } else { 46 state.pendingInputs.push(event) 47 } 48} 49 50function flushDeferredEvents(): void { 51 if (state.toolInFlight || state.deferredEvents.length === 0) return 52 const events = state.deferredEvents 53 state.deferredEvents = [] 54 for (const event of events) { 55 deliverEvent(event) 56 } 57} 58 59/** 60 * Enqueues an event for the active session, or stores it for the next wake cycle. 61 * 62 * @param event - Trigger event to deliver to the runner. 63 */ 64export function enqueueEvent(event: UserMessage): void { 65 if (state.running) { 66 deliverEvent(event) 67 } else { 68 state.pendingInputs.push(event) 69 } 70} 71 72/** 73 * Requests graceful shutdown by injecting a final chat event and waiting for rest. 74 * 75 * Resolves immediately if no session is currently running. 76 * 77 * @returns A promise that resolves when shutdown handling has completed. 78 */ 79export function shutdown(): Promise<void> { 80 if (!state.running) return Promise.resolve() 81 82 return new Promise<void>((resolve) => { 83 if (!state.running) { 84 resolve() 85 return 86 } 87 shutdownResolve = resolve 88 deliverEvent({ 89 source: "chat", 90 content: "hey, the harness is shutting down. please journal this session and rest 💙", 91 triggeredAt: new Date().toISOString(), 92 raw: {}, 93 }) 94 }) 95} 96 97/** Wait until the next event arrives (or return immediately if one is pending). */ 98function waitForEvent(): Promise<UserMessage> { 99 if (state.pendingInputs.length > 0) { 100 return Promise.resolve(state.pendingInputs.shift()!) 101 } 102 return new Promise<UserMessage>((resolve) => { 103 eventResolvers.push(resolve) 104 }) 105} 106 107function formatIncomingEvent(event: UserMessage): string { 108 return `[incoming — ${event.source}]\n\n${event.content}` 109} 110 111function emitUserEvent(event: UserMessage): void { 112 emit({ 113 type: "user", 114 source: event.source, 115 text: event.content, 116 triggeredAt: event.triggeredAt, 117 clientId: event.clientId, 118 }) 119} 120 121function injectIncomingEvent(convId: number, event: UserMessage): void { 122 const incomingMessage = formatIncomingEvent(event) 123 state.conversation.push({ 124 role: "user", 125 content: incomingMessage, 126 }) 127 logMessage(convId, "user", incomingMessage) 128 emitUserEvent(event) 129 console.log("[runner] injected event from", event.source) 130} 131 132/** 133 * Starts the runner loop for an incoming trigger event. 134 * 135 * If a session is already active, the event is queued instead of starting a new loop. 136 * 137 * @param event - Event that should wake or steer the runner. 138 * @returns A promise that resolves once this wake cycle fully completes. 139 */ 140export async function wake(event: UserMessage): Promise<void> { 141 if (state.running) { 142 enqueueEvent(event) 143 return 144 } 145 146 state.running = true 147 state.tokenCount = 0 148 state.contextSize = 0 149 150 const saved = await loadSession() 151 if (saved) { 152 state.conversation = saved 153 state.conversation.push({ 154 role: "user", 155 content: `[harness restarted — ${event.source} @ ${event.triggeredAt}]\n\n${event.content}`, 156 }) 157 } else { 158 state.conversation = await buildBootstrap(event) 159 } 160 161 const convId = startConversation(event.source, event.triggeredAt) 162 163 const wakeMsg = state.conversation[state.conversation.length - 1] 164 if (wakeMsg && wakeMsg.role === "user") { 165 logMessage(convId, "user", typeof wakeMsg.content === "string" ? wakeMsg.content : JSON.stringify(wakeMsg.content)) 166 emitUserEvent(event) 167 } 168 169 console.log("[runner] niri is awake") 170 171 try { 172 await runLoop(convId, state, { 173 waitForEvent, 174 injectIncomingEvent, 175 flushDeferredEvents, 176 clearSession, 177 saveSession: async () => saveSession(state.conversation), 178 }) 179 } finally { 180 endConversation(convId, state.tokenCount) 181 state.running = false 182 state.conversation = [] 183 state.toolInFlight = false 184 flushDeferredEvents() 185 state.deferredEvents = [] 186 eventResolvers = [] 187 console.log("[runner] niri is resting") 188 shutdownResolve?.() 189 shutdownResolve = null 190 191 if (state.pendingInputs.length > 0) { 192 const next = state.pendingInputs.shift()! 193 console.log("[runner] pending event queued, waking again") 194 await wake(next) 195 } 196 } 197}