my harness for niri
1import { buildBootstrap } from "../bootstrap.js"
2import { endConversation, logMessage, startConversation } from "../db.js"
3import { emit } from "../stream.js"
4import { runLoop } from "./loop.js"
5import type { RunnerStateInternal } from "./types.js"
6import { clearSession, loadSession, saveSession } from "./util.js"
7import type { UserMessage } from "../types.js"
8
9let eventResolvers: Array<(event: UserMessage) => void> = []
10let shutdownResolve: (() => void) | null = null
11
12const state: RunnerStateInternal = {
13 contextSize: 0,
14 running: false,
15 conversation: [],
16 pendingInputs: [],
17 tokenCount: 0,
18 toolInFlight: false,
19 deferredEvents: [],
20}
21
22/**
23 * Returns whether a runner session is currently active.
24 *
25 * @returns `true` when the wake loop is running, otherwise `false`.
26 */
27export function isRunning(): boolean {
28 return state.running
29}
30
31/** Push an event into the live session, resolving the loop's wait if it's idle. */
32function deliverEvent(event: UserMessage): void {
33 console.log("[runner] queued event from", event.source)
34 if (state.toolInFlight) {
35 // Defer external steering until the currently running tool settles.
36 state.deferredEvents.push(event)
37 return
38 }
39
40 // Never push directly into conversation as the loop adds the user message at
41 // waitForEvent() time, which is only reached when no tool calls are in flight.
42 const resolver = eventResolvers.shift()
43 if (resolver) {
44 resolver(event)
45 } else {
46 state.pendingInputs.push(event)
47 }
48}
49
50function flushDeferredEvents(): void {
51 if (state.toolInFlight || state.deferredEvents.length === 0) return
52 const events = state.deferredEvents
53 state.deferredEvents = []
54 for (const event of events) {
55 deliverEvent(event)
56 }
57}
58
59/**
60 * Enqueues an event for the active session, or stores it for the next wake cycle.
61 *
62 * @param event - Trigger event to deliver to the runner.
63 */
64export function enqueueEvent(event: UserMessage): void {
65 if (state.running) {
66 deliverEvent(event)
67 } else {
68 state.pendingInputs.push(event)
69 }
70}
71
72/**
73 * Requests graceful shutdown by injecting a final chat event and waiting for rest.
74 *
75 * Resolves immediately if no session is currently running.
76 *
77 * @returns A promise that resolves when shutdown handling has completed.
78 */
79export function shutdown(): Promise<void> {
80 if (!state.running) return Promise.resolve()
81
82 return new Promise<void>((resolve) => {
83 if (!state.running) {
84 resolve()
85 return
86 }
87 shutdownResolve = resolve
88 deliverEvent({
89 source: "chat",
90 content: "hey, the harness is shutting down. please journal this session and rest 💙",
91 triggeredAt: new Date().toISOString(),
92 raw: {},
93 })
94 })
95}
96
97/** Wait until the next event arrives (or return immediately if one is pending). */
98function waitForEvent(): Promise<UserMessage> {
99 if (state.pendingInputs.length > 0) {
100 return Promise.resolve(state.pendingInputs.shift()!)
101 }
102 return new Promise<UserMessage>((resolve) => {
103 eventResolvers.push(resolve)
104 })
105}
106
107function formatIncomingEvent(event: UserMessage): string {
108 return `[incoming — ${event.source}]\n\n${event.content}`
109}
110
111function emitUserEvent(event: UserMessage): void {
112 emit({
113 type: "user",
114 source: event.source,
115 text: event.content,
116 triggeredAt: event.triggeredAt,
117 clientId: event.clientId,
118 })
119}
120
121function injectIncomingEvent(convId: number, event: UserMessage): void {
122 const incomingMessage = formatIncomingEvent(event)
123 state.conversation.push({
124 role: "user",
125 content: incomingMessage,
126 })
127 logMessage(convId, "user", incomingMessage)
128 emitUserEvent(event)
129 console.log("[runner] injected event from", event.source)
130}
131
132/**
133 * Starts the runner loop for an incoming trigger event.
134 *
135 * If a session is already active, the event is queued instead of starting a new loop.
136 *
137 * @param event - Event that should wake or steer the runner.
138 * @returns A promise that resolves once this wake cycle fully completes.
139 */
140export async function wake(event: UserMessage): Promise<void> {
141 if (state.running) {
142 enqueueEvent(event)
143 return
144 }
145
146 state.running = true
147 state.tokenCount = 0
148 state.contextSize = 0
149
150 const saved = await loadSession()
151 if (saved) {
152 state.conversation = saved
153 state.conversation.push({
154 role: "user",
155 content: `[harness restarted — ${event.source} @ ${event.triggeredAt}]\n\n${event.content}`,
156 })
157 } else {
158 state.conversation = await buildBootstrap(event)
159 }
160
161 const convId = startConversation(event.source, event.triggeredAt)
162
163 const wakeMsg = state.conversation[state.conversation.length - 1]
164 if (wakeMsg && wakeMsg.role === "user") {
165 logMessage(convId, "user", typeof wakeMsg.content === "string" ? wakeMsg.content : JSON.stringify(wakeMsg.content))
166 emitUserEvent(event)
167 }
168
169 console.log("[runner] niri is awake")
170
171 try {
172 await runLoop(convId, state, {
173 waitForEvent,
174 injectIncomingEvent,
175 flushDeferredEvents,
176 clearSession,
177 saveSession: async () => saveSession(state.conversation),
178 })
179 } finally {
180 endConversation(convId, state.tokenCount)
181 state.running = false
182 state.conversation = []
183 state.toolInFlight = false
184 flushDeferredEvents()
185 state.deferredEvents = []
186 eventResolvers = []
187 console.log("[runner] niri is resting")
188 shutdownResolve?.()
189 shutdownResolve = null
190
191 if (state.pendingInputs.length > 0) {
192 const next = state.pendingInputs.shift()!
193 console.log("[runner] pending event queued, waking again")
194 await wake(next)
195 }
196 }
197}