programmatic subagents

fix(pi-mill): persist spawn sessions, hard-cancel workers, and add ops runbook

+724 -89
+11 -5
README.md
···
 mill watch <runId>               stream tier-1 events (NDJSON with --json)
 mill inspect <id>[.<spawnId>]    inspect run or spawn detail
 mill inspect <id> --session      resolve full agent session via driver
-mill cancel <runId>              interrupt run and all live spawns
+mill cancel <runId>              mark cancelled + kill worker process tree
 mill ls [--status <filter>]      list runs
 mill init                        generate starter mill.config.ts
 ```
···

 ```
 ~/.mill/runs/<runId>/
-  run.json          metadata
-  events.ndjson     append-only structured event log
-  result.json       final output
-  program.ts        copied source
+  run.json                  metadata (status is canonical)
+  events.ndjson             append-only structured event log
+  result.json               final output
+  program.ts                copied source
+  worker.pid                detached worker pid (best effort)
+  logs/worker.log           worker lifecycle breadcrumbs
+  logs/cancel.log           cancel/kill lifecycle breadcrumbs
+  sessions/<spawn>.jsonl    per-spawn pi session transcripts (pi driver)
 ```
+
+For operations/debugging conventions, see `docs/references/mill-v0-operations-and-troubleshooting.md`.

 ### Internals
+1
docs/indexes/docs.index.md
···
 - Product spec: `docs/product-specs/mill-v0-product-spec.md`
 - Architecture & boundaries: `docs/design-docs/mill-v0-architecture-and-boundaries.md`
 - Toolchain, invariants, non-goals, order: `docs/references/mill-v0-toolchain-and-invariants.md`
+- Operations runbook: `docs/references/mill-v0-operations-and-troubleshooting.md`

 ## Execution plans
+1
docs/references/README.md
···
 # References Index

 - `mill-v0-toolchain-and-invariants.md` — Sections 19–23 from SPEC (toolchain, scripts, guardrails, invariants, non-goals, implementation order).
+- `mill-v0-operations-and-troubleshooting.md` — Runbook for cancellation semantics, run-state authority, logs, session transcripts, and stale-state reconciliation.
+65
docs/references/mill-v0-operations-and-troubleshooting.md
···
+# mill v0 Operations & Troubleshooting
+
+Operational conventions for diagnosing stuck runs, cancellations, and stale UI state.
+
+## 1) Source of truth for run state
+
+- **Canonical:** `mill status <runId> --json` and `run.json` under the run directory.
+- **Advisory only:** extension-local mirrors (widget/monitor caches, historical `run.json` snapshots in pi session folders).
+
+When in doubt, always trust canonical mill state.
+
+## 2) Cancellation semantics
+
+`mill cancel <runId>` performs two steps:
+
+1. **Logical cancel**
+   - Appends `run:cancelled` (if needed)
+   - Sets run status to `cancelled`
+2. **Physical cancel**
+   - Reads `worker.pid`
+   - Validates it belongs to `_worker --run-id <runId>`
+   - Sends `SIGTERM` to worker + descendants
+   - After a short grace period, sends `SIGKILL` to survivors
+
+Cancel behavior is idempotent at run-state level.
+
+## 3) On-disk artifacts to inspect
+
+For run `<runId>` in runs dir `<runsDir>`:
+
+- `<runsDir>/<runId>/run.json`
+- `<runsDir>/<runId>/events.ndjson`
+- `<runsDir>/<runId>/result.json`
+- `<runsDir>/<runId>/worker.pid` (best effort)
+- `<runsDir>/<runId>/logs/worker.log`
+- `<runsDir>/<runId>/logs/cancel.log`
+- `<runsDir>/<runId>/sessions/<spawnId>.jsonl` (pi driver transcripts)
+
+## 4) Session behavior (pi driver)
+
+pi driver uses explicit per-spawn session files:
+
+- `--session <runDir>/sessions/<spawnId>.jsonl`
+- `sessionRef` in spawn result points to that file path
+
+This keeps transcripts available for post-hoc debugging and parent-orchestrator context recovery.
+
+## 5) Fast triage checklist for "run stuck in running"
+
+1. `mill inspect <runId> --json`
+   - if you only see `spawn:start` and no `spawn:complete`, the child driver call is still in-flight.
+2. Check process liveness using `worker.pid` + OS process list.
+3. `mill cancel <runId> --json`
+4. Read `logs/cancel.log`
+   - verify TERM/KILL steps and survivor count.
+5. Re-check `mill status <runId> --json`
+
+## 6) Stale historical entries in pi monitor
+
+Convention:
+
+- Historical `status: running` entries are reconciled against canonical `mill status` on scan.
+- If canonical status is terminal, scanner rewrites the historical record with reconciled terminal status.
+
+This avoids long-lived "running" ghosts from previous failures.
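Step 2 of the triage checklist (liveness via `worker.pid`) can be sketched as follows. This is a minimal illustration, not code from the commit: `checkWorkerLiveness` and the sample run directory are hypothetical names, and the `EPERM` handling is an assumption that differs from the commit's own `isProcessAlive`, which treats any thrown error as "not alive".

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Read <runDir>/worker.pid; returns undefined when the file is missing or malformed.
const readWorkerPid = (runDirectory: string): number | undefined => {
  try {
    const raw = fs.readFileSync(path.join(runDirectory, "worker.pid"), "utf-8").trim();
    const parsed = Number.parseInt(raw, 10);
    return Number.isInteger(parsed) && parsed > 0 ? parsed : undefined;
  } catch {
    return undefined;
  }
};

// Signal 0 runs the existence/permission check without delivering a signal.
const isProcessAlive = (pid: number): boolean => {
  try {
    process.kill(pid, 0);
    return true;
  } catch (error) {
    // Assumption: EPERM means the process exists but is owned by another user.
    return (error as { code?: string }).code === "EPERM";
  }
};

type Liveness = { pid: number | undefined; alive: boolean };

// Hypothetical helper combining both checks for one run directory.
const checkWorkerLiveness = (runDirectory: string): Liveness => {
  const pid = readWorkerPid(runDirectory);
  return { pid, alive: pid !== undefined && isProcessAlive(pid) };
};

// Hypothetical run directory; substitute <runsDir>/<runId>.
console.log(checkWorkerLiveness("/tmp/mill-runs/run_example"));
```

A live pid alone does not prove the process is the mill worker, since pids can be reused; the commit guards against this by also checking the command line for `_worker --run-id <runId>` before signalling.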
+23 -1
packages/cli/src/public/index.api.ts
···

 const millBinPath = decodeURIComponent(new URL("../bin/mill.ts", import.meta.url).pathname);

+const normalizePath = (path: string): string => {
+  if (path.length <= 1) {
+    return path;
+  }
+
+  return path.endsWith("/") ? path.slice(0, -1) : path;
+};
+
+const joinPath = (base: string, child: string): string =>
+  normalizePath(base) === "/" ? `/${child}` : `${normalizePath(base)}/${child}`;
+
+const workerPidPath = (runsDirectory: string, runId: string): string =>
+  joinPath(joinPath(runsDirectory, runId), "worker.pid");
+
 const launchDetachedWorker = async (input: LaunchWorkerInput): Promise<void> => {
   const workerCommand = PlatformCommand.make(
     process.execPath,
···
   await runWithBunContext(
     Effect.gen(function* () {
       const detachedScope = yield* Scope.make();
+      const processHandle = yield* Scope.extend(
+        PlatformCommand.start(workerCommand),
+        detachedScope,
+      );
+      const fileSystem = yield* FileSystem.FileSystem;
+      const pidPath = workerPidPath(input.runsDirectory, input.runId);
+      const runDirectory = pidPath.slice(0, pidPath.lastIndexOf("/"));

-      yield* Scope.extend(PlatformCommand.start(workerCommand), detachedScope);
+      yield* fileSystem.makeDirectory(runDirectory, { recursive: true });
+      yield* fileSystem.writeFileString(pidPath, `${Number(processHandle.pid)}\n`);
     }),
   );
 };
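The path helpers added in this file are small enough to check by hand. The sketch below copies `normalizePath`, `joinPath`, and `workerPidPath` as they appear in the diff and runs them on a few edge cases; the directory and run names are hypothetical inputs chosen only for illustration.

```typescript
// Copied from the diff: strip one trailing slash, leaving "/" and "" untouched.
const normalizePath = (path: string): string => {
  if (path.length <= 1) {
    return path;
  }

  return path.endsWith("/") ? path.slice(0, -1) : path;
};

const joinPath = (base: string, child: string): string =>
  normalizePath(base) === "/" ? `/${child}` : `${normalizePath(base)}/${child}`;

const workerPidPath = (runsDirectory: string, runId: string): string =>
  joinPath(joinPath(runsDirectory, runId), "worker.pid");

// Hypothetical inputs, chosen to show the edge cases.
console.log(joinPath("/", "runs")); // "/runs"
console.log(joinPath("/tmp/runs/", "run_1")); // "/tmp/runs/run_1"
console.log(workerPidPath("/tmp/runs", "run_1")); // "/tmp/runs/run_1/worker.pid"
console.log(joinPath("", "runs")); // "/runs"
```

Two behaviours are worth keeping in mind: an empty base produces an absolute path, and only a single trailing slash is stripped, so repeated slashes and non-POSIX separators pass through unchanged.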
+60
packages/core/src/internal/engine.effect.ts
···
   return String(error);
 };

+const normalizePath = (path: string): string => {
+  if (path.length <= 1) {
+    return path;
+  }
+
+  return path.endsWith("/") ? path.slice(0, -1) : path;
+};
+
+const joinPath = (base: string, child: string): string =>
+  normalizePath(base) === "/" ? `/${child}` : `${normalizePath(base)}/${child}`;
+
 const nextSequence = (sequenceRef: Ref.Ref<number>): Effect.Effect<number> =>
   Ref.updateAndGet(sequenceRef, (current) => current + 1);

···
         }),
       );

+      yield* Effect.logDebug("mill.engine:spawn-driver-start", {
+        runId: runInput.runId,
+        spawnId,
+        driver: input.driver.name,
+        agent: spawnInput.agent,
+        model: spawnInput.model ?? input.defaultModel,
+      });
+
       const driverOutputExit = yield* Effect.exit(
         Effect.mapError(
           input.driver.spawn({
             runId: runInput.runId,
+            runDirectory: joinPath(input.runsDirectory, runInput.runId),
             spawnId,
             agent: spawnInput.agent,
             systemPrompt: spawnInput.systemPrompt,
···
       if (Exit.isFailure(driverOutputExit)) {
         const failureMessage = Cause.pretty(driverOutputExit.cause);

+        yield* Effect.logDebug("mill.engine:spawn-driver-failed", {
+          runId: runInput.runId,
+          spawnId,
+          driver: input.driver.name,
+          message: failureMessage,
+        });
+
         yield* appendSpawnErrorEvent(
           input.extensions,
           extensionContext,
···
           }),
         );
       }
+
+      yield* Effect.logDebug("mill.engine:spawn-driver-complete", {
+        runId: runInput.runId,
+        spawnId,
+        driver: input.driver.name,
+        rawLines: driverOutputExit.value.raw?.length ?? 0,
+        events: driverOutputExit.value.events.length,
+      });

       for (const rawLine of driverOutputExit.value.raw ?? []) {
         yield* publishRawEvent(runInput.runId, rawLine);
···

       yield* Ref.update(spawnResultsRef, (items) => [...items, spawnResult]);

+      yield* Effect.logDebug("mill.engine:spawn-complete", {
+        runId: runInput.runId,
+        spawnId,
+        agent: spawnResult.agent,
+        model: spawnResult.model,
+        sessionRef: spawnResult.sessionRef,
+        exitCode: spawnResult.exitCode,
+      });
+
       return spawnResult;
     });

···
     Effect.gen(function* () {
       const run = yield* runStore.getRun(runId);

+      yield* Effect.logDebug("mill.engine:cancel-requested", {
+        runId,
+        status: run.status,
+        reason,
+      });
+
       if (isRunTerminalStatus(run.status)) {
+        yield* Effect.logDebug("mill.engine:cancel-noop-terminal", {
+          runId,
+          status: run.status,
+        });
+
         return {
           run,
           alreadyTerminal: true,
···
         "LifecycleInvariantError",
         () => runStore.getRun(runId),
       );
+
+      yield* Effect.logDebug("mill.engine:cancelled", {
+        runId,
+        status: cancelledRun.status,
+      });

       return {
         run: cancelledRun,
+85 -1
packages/core/src/public/run.api.test.ts
···
+import { spawn } from "node:child_process";
 import { describe, expect, it } from "bun:test";
 import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
 import { tmpdir } from "node:os";
···
 import * as Schema from "@effect/schema/Schema";
 import { Effect } from "effect";
 import { decodeMillEventJsonSync } from "../domain/event.schema";
-import { runProgramSync, runWorker } from "./run.api";
+import { decodeRunIdSync } from "../domain/run.schema";
+import { makeRunStore } from "../internal/run-store.effect";
+import { runWithBunContext } from "./test-runtime.api";
+import { cancelRun, runProgramSync, runWorker } from "./run.api";
 import type { MillConfig } from "./types";

 const ProgramResultEnvelope = Schema.parseJson(
···
     executor: Schema.optional(Schema.String),
   }),
 );
+
+const sleep = (millis: number): Promise<void> =>
+  new Promise((resolve) => {
+    setTimeout(resolve, millis);
+  });
+
+const waitForProcessExit = async (pid: number, timeoutMillis: number): Promise<void> => {
+  const deadline = Date.now() + timeoutMillis;
+
+  while (Date.now() < deadline) {
+    try {
+      process.kill(pid, 0);
+    } catch {
+      return;
+    }
+
+    await sleep(25);
+  }
+
+  throw new Error(`child process ${pid} did not exit in time`);
+};

 const makeConfig = (): MillConfig => ({
   defaultDriver: "default",
···
       expect(hostMarker).toContain("process-host:bun");
       expect(hostMarker).toContain(`executor=${output.run.executor}`);
     } finally {
+      await rm(tempDirectory, { recursive: true, force: true });
+    }
+  });
+
+  it("cancelRun kills detached worker processes using persisted worker.pid", async () => {
+    const tempDirectory = await mkdtemp(join(tmpdir(), "mill-run-cancel-"));
+    const runsDirectory = join(tempDirectory, "runs");
+    const runId = decodeRunIdSync(`run_${crypto.randomUUID()}`);
+    const defaults = makeConfig();
+
+    const store = makeRunStore({ runsDirectory });
+
+    const worker = spawn(
+      process.execPath,
+      ["-e", "setInterval(() => {}, 1000)", "_worker", "--run-id", runId],
+      {
+        stdio: "ignore",
+      },
+    );
+
+    if (worker.pid === undefined) {
+      throw new Error("failed to start synthetic worker process");
+    }
+
+    try {
+      await runWithBunContext(
+        store.create({
+          runId,
+          programPath: "/tmp/program.ts",
+          driver: "default",
+          executor: "direct",
+          status: "running",
+          timestamp: "2026-02-25T10:00:00.000Z",
+        }),
+      );
+
+      const runDirectory = join(runsDirectory, runId);
+      await writeFile(join(runDirectory, "worker.pid"), `${worker.pid}\n`, "utf-8");
+
+      const cancelled = await cancelRun({
+        defaults,
+        runId,
+        runsDirectory,
+        cwd: tempDirectory,
+        pathExists: async () => false,
+      });
+
+      expect(cancelled.status).toBe("cancelled");
+
+      await waitForProcessExit(worker.pid, 2000);
+
+      const cancelLog = await readFile(join(runDirectory, "logs", "cancel.log"), "utf-8");
+      expect(cancelLog).toContain("cancel:kill term-sent");
+    } finally {
+      try {
+        worker.kill("SIGKILL");
+      } catch {
+        // already exited
+      }
       await rm(tempDirectory, { recursive: true, force: true });
     }
   });
+249 -27
packages/core/src/public/run.api.ts
···
+import { spawnSync } from "node:child_process";
+import * as fs from "node:fs";
 import * as FileSystem from "@effect/platform/FileSystem";
 import * as BunContext from "@effect/platform-bun/BunContext";
 import { Effect, Runtime, Stream } from "effect";
···
 }

 const DEFAULT_SYNC_WAIT_TIMEOUT_SECONDS = 60 * 60 * 24 * 365;
+const WORKER_PID_FILENAME = "worker.pid";
+const CANCEL_LOG_PATH = "logs/cancel.log";
+const PROCESS_EXIT_GRACE_MILLIS = 400;

 const normalizePath = (path: string): string => {
   if (path.length <= 1) {
···
 const joinPath = (base: string, child: string): string =>
   normalizePath(base) === "/" ? `/${child}` : `${normalizePath(base)}/${child}`;

+const sleep = (millis: number): Promise<void> =>
+  new Promise((resolve) => {
+    setTimeout(resolve, millis);
+  });
+
+const runDirectoryFor = (runsDirectory: string, runId: string): string =>
+  joinPath(runsDirectory, runId);
+
+const workerPidPathFor = (runDirectory: string): string =>
+  joinPath(runDirectory, WORKER_PID_FILENAME);
+
+const appendCancelLog = (runDirectory: string, message: string): void => {
+  const logPath = joinPath(runDirectory, CANCEL_LOG_PATH);
+  const logDirectory = logPath.slice(0, logPath.lastIndexOf("/"));
+  const timestamp = new Date().toISOString();
+
+  try {
+    fs.mkdirSync(logDirectory, { recursive: true });
+    fs.appendFileSync(logPath, `${timestamp} ${message}\n`, "utf-8");
+  } catch {
+    // best effort logging only
+  }
+};
+
+const readWorkerPid = (runDirectory: string): number | undefined => {
+  const pidPath = workerPidPathFor(runDirectory);
+
+  try {
+    const raw = fs.readFileSync(pidPath, "utf-8").trim();
+    const parsed = Number.parseInt(raw, 10);
+
+    if (!Number.isInteger(parsed) || parsed <= 0) {
+      return undefined;
+    }
+
+    return parsed;
+  } catch {
+    return undefined;
+  }
+};
+
+const removeWorkerPidFile = (runDirectory: string): void => {
+  try {
+    fs.rmSync(workerPidPathFor(runDirectory), { force: true });
+  } catch {
+    // best effort cleanup only
+  }
+};
+
+const readProcessCommand = (pid: number): string | undefined => {
+  const output = spawnSync("ps", ["-o", "command=", "-p", String(pid)], {
+    encoding: "utf-8",
+    stdio: ["ignore", "pipe", "ignore"],
+  });
+
+  if (output.status !== 0) {
+    return undefined;
+  }
+
+  const commandLine = output.stdout.trim();
+  return commandLine.length > 0 ? commandLine : undefined;
+};
+
+const readProcessTable = (): ReadonlyArray<{ pid: number; ppid: number }> => {
+  const output = spawnSync("ps", ["-ax", "-o", "pid=,ppid="], {
+    encoding: "utf-8",
+    stdio: ["ignore", "pipe", "ignore"],
+  });
+
+  if (output.status !== 0) {
+    return [];
+  }
+
+  return output.stdout
+    .split("\n")
+    .map((line) => line.trim())
+    .filter((line) => line.length > 0)
+    .map((line) => line.split(/\s+/))
+    .map(([pidText, ppidText]) => ({
+      pid: Number.parseInt(pidText ?? "", 10),
+      ppid: Number.parseInt(ppidText ?? "", 10),
+    }))
+    .filter((entry) => Number.isInteger(entry.pid) && Number.isInteger(entry.ppid));
+};
+
+const descendantsFor = (
+  rootPid: number,
+  table: ReadonlyArray<{ pid: number; ppid: number }>,
+): number[] => {
+  const byParent = new Map<number, Array<number>>();
+
+  for (const entry of table) {
+    const children = byParent.get(entry.ppid);
+    if (children === undefined) {
+      byParent.set(entry.ppid, [entry.pid]);
+    } else {
+      children.push(entry.pid);
+    }
+  }
+
+  const descendants: Array<number> = [];
+  const stack: Array<number> = [...(byParent.get(rootPid) ?? [])];
+
+  while (stack.length > 0) {
+    const current = stack.pop();
+    if (current === undefined) {
+      continue;
+    }
+
+    descendants.push(current);
+
+    const nested = byParent.get(current);
+    if (nested !== undefined) {
+      stack.push(...nested);
+    }
+  }
+
+  return descendants;
+};
+
+const isProcessAlive = (pid: number): boolean => {
+  try {
+    process.kill(pid, 0);
+    return true;
+  } catch {
+    return false;
+  }
+};
+
+const sendSignal = (pid: number, signal: NodeJS.Signals): boolean => {
+  try {
+    process.kill(pid, signal);
+    return true;
+  } catch {
+    return false;
+  }
+};
+
+const looksLikeMillWorkerCommand = (commandLine: string, runId: string): boolean => {
+  if (!commandLine.includes("_worker")) {
+    return false;
+  }
+
+  return commandLine.includes(`--run-id ${runId}`);
+};
+
+const terminateWorkerProcessTree = async (runsDirectory: string, runId: string): Promise<void> => {
+  const runDirectory = runDirectoryFor(runsDirectory, runId);
+  const workerPid = readWorkerPid(runDirectory);
+
+  if (workerPid === undefined) {
+    appendCancelLog(runDirectory, `cancel:kill skipped run=${runId} reason=no-worker-pid`);
+    return;
+  }
+
+  const commandLine = readProcessCommand(workerPid);
+
+  if (commandLine === undefined) {
+    appendCancelLog(
+      runDirectory,
+      `cancel:kill stale-pid run=${runId} pid=${workerPid} reason=command-missing`,
+    );
+    removeWorkerPidFile(runDirectory);
+    return;
+  }
+
+  if (!looksLikeMillWorkerCommand(commandLine, runId)) {
+    appendCancelLog(
+      runDirectory,
+      `cancel:kill skipped run=${runId} pid=${workerPid} reason=pid-mismatch command=${commandLine}`,
+    );
+    return;
+  }
+
+  const table = readProcessTable();
+  const descendants = descendantsFor(workerPid, table);
+  const targets = [...new Set([...descendants, workerPid])];
+
+  const termCount = targets.reduce(
+    (count, pid) => (sendSignal(pid, "SIGTERM") ? count + 1 : count),
+    0,
+  );
+
+  appendCancelLog(
+    runDirectory,
+    `cancel:kill term-sent run=${runId} pid=${workerPid} targets=${targets.length} signaled=${termCount}`,
+  );
+
+  await sleep(PROCESS_EXIT_GRACE_MILLIS);
+
+  const survivors = targets.filter((pid) => isProcessAlive(pid));
+  const killCount = survivors.reduce(
+    (count, pid) => (sendSignal(pid, "SIGKILL") ? count + 1 : count),
+    0,
+  );
+
+  appendCancelLog(
+    runDirectory,
+    `cancel:kill kill-sent run=${runId} pid=${workerPid} survivors=${survivors.length} signaled=${killCount}`,
+  );
+
+  if (!isProcessAlive(workerPid)) {
+    removeWorkerPidFile(runDirectory);
+  }
+};
+
 const resolveProgramPath = (cwd: string, programPath: string): string =>
   programPath.startsWith("/") ? normalizePath(programPath) : joinPath(cwd, programPath);

···
   const programPath = resolveProgramPath(cwd, input.programPath);
   const programSource = await runWithBunContext(readProgramSource(programPath));
   const engineContext = await makeEngineForConfig(input);
+  const runDirectory = runDirectoryFor(engineContext.runsDirectory, input.runId);
+  const workerPidPath = workerPidPathFor(runDirectory);

-  return runWithBunContext(
-    runDetachedWorker({
-      engine: engineContext.engine,
-      runId: decodeRunIdSync(input.runId),
-      programPath,
-      runsDirectory: engineContext.runsDirectory,
-      executeProgram: (spawn) =>
-        Effect.mapError(
-          engineContext.selectedExecutorRuntime.runProgram({
-            runId: input.runId,
-            programPath,
-            execute: executeProgramInProcessHost({
+  fs.mkdirSync(runDirectory, { recursive: true });
+  fs.writeFileSync(workerPidPath, `${process.pid}\n`, "utf-8");
+
+  try {
+    return await runWithBunContext(
+      runDetachedWorker({
+        engine: engineContext.engine,
+        runId: decodeRunIdSync(input.runId),
+        programPath,
+        runsDirectory: engineContext.runsDirectory,
+        executeProgram: (spawn) =>
+          Effect.mapError(
+            engineContext.selectedExecutorRuntime.runProgram({
               runId: input.runId,
-              runDirectory: joinPath(engineContext.runsDirectory, input.runId),
-              workingDirectory: cwd,
               programPath,
-              programSource,
-              executorName: engineContext.selectedExecutorName,
-              extensions: engineContext.selectedExtensions,
-              spawn,
-            }),
-          }),
-          (error) =>
-            new ProgramExecutionError({
-              runId: input.runId,
-              message: String(error),
+              execute: executeProgramInProcessHost({
+                runId: input.runId,
+                runDirectory: joinPath(engineContext.runsDirectory, input.runId),
+                workingDirectory: cwd,
+                programPath,
+                programSource,
+                executorName: engineContext.selectedExecutorName,
+                extensions: engineContext.selectedExtensions,
+                spawn,
+              }),
             }),
-        ),
-    }),
-  );
+            (error) =>
+              new ProgramExecutionError({
+                runId: input.runId,
+                message: String(error),
+              }),
+          ),
+      }),
+    );
+  } finally {
+    removeWorkerPidFile(runDirectory);
+  }
 };

 export const getRunStatus = async (input: GetRunStatusInput): Promise<RunRecord> => {
···
   const cancelled = await runWithBunContext(
     engineContext.engine.cancel(decodeRunIdSync(input.runId), input.reason),
   );
+
+  await terminateWorkerProcessTree(engineContext.runsDirectory, input.runId);

   return {
     runId: cancelled.run.id,
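The process-tree walk in this file can be exercised without touching real processes. The sketch below is an illustration under assumptions: `parseProcessTable` is a hypothetical pure variant of the commit's `readProcessTable` that takes the `ps` text as an argument, `descendantsFor` is a condensed rewrite of the diff's function with the same traversal, and the pids in `sample` are synthetic.

```typescript
type ProcessEntry = { pid: number; ppid: number };

// Parse `ps -ax -o pid=,ppid=` style output: one "pid ppid" pair per line.
const parseProcessTable = (psOutput: string): ProcessEntry[] =>
  psOutput
    .split("\n")
    .map((line) => line.trim())
    .filter((line) => line.length > 0)
    .map((line) => line.split(/\s+/))
    .map(([pidText, ppidText]) => ({
      pid: Number.parseInt(pidText ?? "", 10),
      ppid: Number.parseInt(ppidText ?? "", 10),
    }))
    .filter((entry) => Number.isInteger(entry.pid) && Number.isInteger(entry.ppid));

// Depth-first walk from rootPid over a parent -> children index.
const descendantsFor = (rootPid: number, table: ReadonlyArray<ProcessEntry>): number[] => {
  const byParent = new Map<number, number[]>();
  for (const entry of table) {
    const children = byParent.get(entry.ppid) ?? [];
    children.push(entry.pid);
    byParent.set(entry.ppid, children);
  }

  const descendants: number[] = [];
  const stack = [...(byParent.get(rootPid) ?? [])];
  while (stack.length > 0) {
    const current = stack.pop() as number;
    descendants.push(current);
    stack.push(...(byParent.get(current) ?? []));
  }
  return descendants;
};

// Synthetic table: 100 is the worker, 101/102 its children, 103 a grandchild,
// 200/201 an unrelated tree that must not be targeted.
const sample = "100 1\n101 100\n102 100\n103 101\n200 1\n201 200\n";
const targets = [...descendantsFor(100, parseProcessTable(sample)), 100];
console.log(targets.sort((a, b) => a - b)); // [100, 101, 102, 103]
```

Because the table is a point-in-time snapshot, any child forked after `ps` runs is not in the target list, which is one reason to re-check `mill status` and the process list after a cancel.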
+1
packages/core/src/public/types.ts
···

 export interface DriverSpawnInput {
   readonly runId: string;
+  readonly runDirectory: string;
   readonly spawnId: string;
   readonly agent: string;
   readonly systemPrompt: string;
+21
packages/core/src/runtime/program-host.effect.ts
···
       Command.stderr("pipe"),
     );

+    yield* Effect.logDebug("mill.program-host:start", {
+      runId: input.runId,
+      hostProgramPath,
+      workingDirectory: input.workingDirectory,
+    });
+
     const processHandle = yield* Effect.mapError(
       Command.start(command),
       (error) =>
···
         }),
     );

+    yield* Effect.logDebug("mill.program-host:started", {
+      runId: input.runId,
+      pid: Number(processHandle.pid),
+    });
+
     const responseQueue = yield* Queue.unbounded<Uint8Array>();

     const stdinFiber = yield* Effect.forkScoped(
···
               kind: "result",
               ok: false,
               message: `Malformed program host payload: ${toMessage(decoded.left)}`,
+            });
+            yield* Effect.logDebug("mill.program-host:malformed-payload", {
+              runId: input.runId,
+              message: toMessage(decoded.left),
             });
             yield* Effect.ignore(processHandle.kill("SIGTERM"));
             return;
···
           message: `Program host process failed before completion: ${toMessage(error)}`,
         }),
     );
+
+    yield* Effect.logDebug("mill.program-host:exit", {
+      runId: input.runId,
+      pid: Number(processHandle.pid),
+      exitCode,
+    });

     yield* Queue.shutdown(responseQueue);
     yield* Effect.ignore(Fiber.join(stdinFiber));
+12 -3
packages/core/src/runtime/worker.effect.ts
···
     const workerLogPath = joinPath(runDirectory, "logs/worker.log");

     yield* appendWorkerLog(workerLogPath, `worker:start runId=${input.runId}`);
+    yield* Effect.logDebug("mill.worker:start", { runId: input.runId, runDirectory });

     if (isTerminalStatus(submittedRun.status)) {
       const existingResult = yield* input.engine.result(input.runId);
···
         workerLogPath,
         `worker:terminal-noop runId=${input.runId} status=${submittedRun.status}`,
       );
+      yield* Effect.logDebug("mill.worker:terminal-noop", {
+        runId: input.runId,
+        status: submittedRun.status,
+      });

       return {
         run: submittedRun,
···
         executeProgram: input.executeProgram,
       }),
       (error) =>
-        appendWorkerLog(
-          workerLogPath,
-          `worker:failed runId=${input.runId} message=${String(error)}`,
+        Effect.zipRight(
+          appendWorkerLog(
+            workerLogPath,
+            `worker:failed runId=${input.runId} message=${String(error)}`,
+          ),
+          Effect.logDebug("mill.worker:failed", { runId: input.runId, message: String(error) }),
         ),
     );

     yield* appendWorkerLog(workerLogPath, `worker:complete runId=${input.runId}`);
+    yield* Effect.logDebug("mill.worker:complete", { runId: input.runId });

     return runOutput;
   });
+1
packages/driver-claude/src/public/index.api.test.ts
···
       Effect.provide(
         driver.runtime.spawn({
           runId: "run_claude_test",
+          runDirectory: "/tmp/run_claude_test",
           spawnId: "spawn_claude_test",
           agent: "scout",
           systemPrompt: "You are concise.",
+1
packages/driver-codex/src/public/index.api.test.ts
···
       Effect.provide(
         driver.runtime.spawn({
           runId: "run_codex_test",
+          runDirectory: "/tmp/run_codex_test",
           spawnId: "spawn_codex_test",
           agent: "scout",
           systemPrompt: "You are concise.",
+58 -2
packages/driver-pi/src/internal/process-driver.effect.ts
···
 import * as Command from "@effect/platform/Command";
+import * as FileSystem from "@effect/platform/FileSystem";
 import { Data, Effect } from "effect";
 import type { DriverProcessConfig, DriverRuntime, DriverSpawnInput } from "@mill/core";
 import { decodePiProcessOutput } from "./pi.codec";
···
   return String(error);
 };

-const commandForSpawn = (config: DriverProcessConfig, input: DriverSpawnInput): Command.Command => {
+const normalizePath = (path: string): string => {
+  if (path.length <= 1) {
+    return path;
+  }
+
+  return path.endsWith("/") ? path.slice(0, -1) : path;
+};
+
+const joinPath = (base: string, child: string): string =>
+  normalizePath(base) === "/" ? `/${child}` : `${normalizePath(base)}/${child}`;
+
+const sessionPathForSpawn = (input: DriverSpawnInput): string =>
+  joinPath(joinPath(input.runDirectory, "sessions"), `${input.spawnId}.jsonl`);
+
+const commandForSpawn = (
+  config: DriverProcessConfig,
+  input: DriverSpawnInput,
+  sessionPath: string,
+): Command.Command => {
   const command = Command.make(
     config.command,
     ...config.args,
+    "--session",
+    sessionPath,
     "--model",
     input.model,
     "--system-prompt",
···
   name: "pi",
   spawn: (input) =>
     Effect.gen(function* () {
-      const command = commandForSpawn(config, input);
+      const sessionPath = sessionPathForSpawn(input);
+      const sessionsDirectory = sessionPath.slice(0, sessionPath.lastIndexOf("/"));
+      const fileSystem = yield* FileSystem.FileSystem;
+
+      yield* Effect.mapError(
+        fileSystem.makeDirectory(sessionsDirectory, { recursive: true }),
+        (error) =>
+          new PiProcessDriverError({
+            message: `Unable to create session directory '${sessionsDirectory}': ${toMessage(error)}`,
+          }),
+      );
+
+      const command = commandForSpawn(config, input, sessionPath);
+
+      yield* Effect.logDebug("mill.driver-pi:spawn:start", {
+        runId: input.runId,
+        spawnId: input.spawnId,
+        agent: input.agent,
+        model: input.model,
+        command: config.command,
+        sessionPath,
+      });
+
       const stdout = yield* Effect.mapError(
         Command.string(command),
         (error) =>
···
         .map((line) => line.trim())
         .filter((line) => line.length > 0);

+      const result = {
+        ...decoded.result,
+        sessionRef: sessionPath,
+      };
+
+      yield* Effect.logDebug("mill.driver-pi:spawn:complete", {
+        runId: input.runId,
+        spawnId: input.spawnId,
+        rawLines: raw.length,
+        sessionRef: result.sessionRef,
+      });
+
       return {
         ...decoded,
+        result,
         raw,
       };
     }),
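With this change `sessionRef` is a path to a `.jsonl` transcript, so post-hoc debugging can start from a plain line-by-line parse. The sketch below is an assumption-laden illustration: `parseJsonl` is a hypothetical helper, and the entry fields in `sample` are invented, since the transcript schema is defined by pi and not shown in this diff.

```typescript
// Parse JSONL text into objects, skipping blank and malformed lines.
const parseJsonl = (text: string): Array<Record<string, unknown>> => {
  const entries: Array<Record<string, unknown>> = [];

  for (const line of text.split("\n")) {
    const trimmed = line.trim();
    if (trimmed.length === 0) {
      continue;
    }

    try {
      const parsed: unknown = JSON.parse(trimmed);
      if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
        entries.push(parsed as Record<string, unknown>);
      }
    } catch {
      // A truncated final line is plausible if the writer was killed mid-write.
    }
  }

  return entries;
};

// Hypothetical transcript text: two complete entries, a blank line, one truncated entry.
const sample = '{"type":"message","text":"hi"}\n\n{"type":"tool_call"}\n{"type":"mess';
console.log(parseJsonl(sample).length); // 2
```

In practice the text would come from reading the file that `sessionRef` points to, for example with `fs.readFileSync(sessionRef, "utf-8")`. Tolerating a broken last line matters here because a hard-cancelled worker can leave a transcript that ends mid-entry.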
+3 -1
packages/driver-pi/src/public/index.api.test.ts
···
       Effect.provide(
         driver.runtime.spawn({
           runId: "run_driver_test",
+          runDirectory: "/tmp/run_driver_test",
           spawnId: "spawn_driver_test",
           agent: "scout",
           systemPrompt: "You are concise.",
···
     );

     expect(output.events.some((event) => event.type === "tool_call")).toBe(true);
-    expect(output.result.sessionRef).toBe("session-test");
+    expect(output.result.sessionRef).toBe("/tmp/run_driver_test/sessions/spawn_driver_test.jsonl");
     expect(output.result.agent).toBe("scout");
     expect(output.result.model).toBe("openai/gpt-5.3-codex");
     expect(output.result.text).toBe("fixture:Say hello");
···
       Effect.provide(
         driver.runtime.spawn({
           runId: "run_driver_duplicate",
+          runDirectory: "/tmp/run_driver_duplicate",
           spawnId: "spawn_driver_duplicate",
           agent: "scout",
           systemPrompt: "You are concise.",
-1
packages/driver-pi/src/public/index.api.ts
···
   "--mode",
   "json",
   "--print",
-  "--no-session",
   "--no-extensions",
   "--no-skills",
   "--no-prompt-templates",
+2 -2
packages/pi-mill/README.md
···
 ]);
 ```

-Each `mill.spawn()` submits an async mill run (`mill run --json`) and then follows completion via mill APIs (`watch` + `inspect`). Model selection, driver routing, and execution behavior all come from your mill configuration.
+Each `mill.spawn()` submits an async mill run (`mill run --json`) and then follows completion via mill APIs (`wait` + `inspect`). Model selection, driver routing, and execution behavior all come from your mill configuration.

 Runs are **async by default** — the tool returns a `runId` immediately and delivers results via notification when complete.

···

 ## Context flow

-Each subagent receives the parent session path and can use `search_thread` to explore the orchestrator's conversation for context. Results (including each subagent's `sessionPath`) flow back to the orchestrator via `result.text`.
+Each subagent receives the parent session path and can use `search_thread` to explore the orchestrator's conversation for context. Results include each subagent's `sessionPath` (session reference, typically a `.jsonl` path for pi driver) for later inspection and context recovery.
+6 -40
packages/pi-mill/index.ts
···
 import { confirmExecution, executeProgram } from "./executors/program-executor.js";
 import { FactoryWidget } from "./widget.js";
 import { FactoryMonitor } from "./monitor.js";
+import { cwdToSessionDir, getSessionsBase, scanRuns } from "./scanner.js";
 import { registerMessageRenderer, notifyCompletion } from "./notify.js";
 import type { RunSummary } from "./types.js";

···
 function loadHistoricalRuns(ctx: ExtensionContext, registry: RunRegistry): void {
   const sessionDir = ctx.sessionManager.getSessionDir();
   if (!sessionDir) return;
-  const millDir = path.join(sessionDir, ".mill");
-  if (!fs.existsSync(millDir)) return;
-  try {
-    for (const entry of fs.readdirSync(millDir)) {
-      const runJsonPath = path.join(millDir, entry, "run.json");
-      if (!fs.existsSync(runJsonPath)) continue;
-      try {
-        const data = JSON.parse(fs.readFileSync(runJsonPath, "utf-8"));
-        const artifactsDir = path.join(millDir, entry);
-        registry.loadHistorical({
-          runId: data.runId,
-          status: data.status ?? "done",
-          summary: {
-            runId: data.runId,
-            status: data.status ?? "done",
-            results: data.results ?? [],
-            error: data.error,
-            metadata: {
-              task: data.task,
-              millCommand: data.mill?.command,
-              millArgs: data.mill?.args,
-              millRunsDir: data.mill?.runsDir,
-            },
-            observability: {
-              status: data.status ?? "done",
-              events: [],
-              artifacts: [],
-              artifactsDir,
-              startedAt: data.startedAt ?? Date.now(),
-              endedAt: data.completedAt,
-            },
-          },
-          startedAt: data.startedAt ?? Date.now(),
-          completedAt: data.completedAt,
-          acknowledged: true,
-          task: data.task,
-        });
-      } catch {}
-    }
-  } catch {}
+
+  const records = scanRuns(getSessionsBase(), cwdToSessionDir(sessionDir));
+  for (const record of records) {
+    registry.loadHistorical(record);
+  }
 }

 // ── Extension entry point ──────────────────────────────────────────────
+1 -1
packages/pi-mill/package.json
···
     {
       "name": "pi-mill",
    -  "version": "0.1.1",
    +  "version": "0.1.2",
       "description": "Pi extension package that routes subagent execution through mill",
       "keywords": [
         "extension",
+1
packages/pi-mill/program-env.d.ts
···
       errorMessage?: string;
       step?: number;
       text: string;
    +  /** Subagent session reference (session id or .jsonl path). */
       sessionPath?: string;
     }

+121 -4
packages/pi-mill/scanner.ts
···
       return "--" + cwd.slice(1).replace(/\//g, "-") + "--";
     }

    -/** Shape of run.json on disk (written by writeRunJson in index.ts). */
    +/**
    + * Shape of run.json on disk (written by writeRunJson in index.ts).
    + * Convention: status="running" is advisory; scanner reconciles it against canonical mill status.
    + */
     interface RunJsonData {
       runId: string;
    -  status?: RunStatus;
    +  status?: string;
       task?: string;
       startedAt?: number;
       completedAt?: number;
    +  reconciledAt?: number;
       mill?: {
         command?: string;
         args?: string[];
···
       error?: { code: string; message: string; recoverable: boolean };
     }

    +const parseJsonObjectFromText = (text: string): Record<string, unknown> | undefined => {
    +  const lines = text
    +    .split("\n")
    +    .map((line) => line.trim())
    +    .filter((line) => line.length > 0)
    +    .reverse();
    +
    +  for (const line of lines) {
    +    try {
    +      const parsed = JSON.parse(line) as unknown;
    +      if (typeof parsed === "object" && parsed !== null) {
    +        return parsed as Record<string, unknown>;
    +      }
    +    } catch {
    +      continue;
    +    }
    +  }
    +
    +  return undefined;
    +};
    +
    +const normalizeRunStatus = (status: string | undefined): RunStatus => {
    +  if (status === undefined) {
    +    return "done";
    +  }
    +
    +  switch (status) {
    +    case "done":
    +    case "complete":
    +      return "done";
    +    case "failed":
    +      return "failed";
    +    case "cancelled":
    +      return "cancelled";
    +    case "running":
    +    case "pending":
    +      return "running";
    +    default:
    +      return "running";
    +  }
    +};
    +
    +const extractStatusFromMillPayload = (payload: Record<string, unknown>): string | undefined => {
    +  const direct = payload.status;
    +  if (typeof direct === "string" && direct.length > 0) {
    +    return direct;
    +  }
    +
    +  const nestedRun = payload.run;
    +  if (typeof nestedRun === "object" && nestedRun !== null) {
    +    const nestedStatus = (nestedRun as { status?: unknown }).status;
    +    if (typeof nestedStatus === "string" && nestedStatus.length > 0) {
    +      return nestedStatus;
    +    }
    +  }
    +
    +  return undefined;
    +};
    +
    +const reconcileRunningStatus = (runJsonPath: string, data: RunJsonData): RunJsonData => {
    +  if (normalizeRunStatus(data.status) !== "running") {
    +    return data;
    +  }
    +
    +  if (typeof data.runId !== "string" || data.runId.length === 0) {
    +    return data;
    +  }
    +
    +  const command = data.mill?.command?.trim() || "mill";
    +  const args = [...(data.mill?.args ?? []), "status", data.runId, "--json"];
    +
    +  if (data.mill?.runsDir && data.mill.runsDir.trim().length > 0) {
    +    args.push("--runs-dir", data.mill.runsDir);
    +  }
    +
    +  const result = spawnSync(command, args, {
    +    stdio: ["ignore", "pipe", "pipe"],
    +    shell: false,
    +    encoding: "utf-8",
    +  });
    +
    +  if (result.status !== 0) {
    +    return data;
    +  }
    +
    +  const payload = parseJsonObjectFromText(`${result.stdout}\n${result.stderr}`);
    +  if (!payload) {
    +    return data;
    +  }
    +
    +  const canonicalStatus = normalizeRunStatus(extractStatusFromMillPayload(payload));
    +
    +  if (canonicalStatus === "running") {
    +    return data;
    +  }
    +
    +  const reconciled: RunJsonData = {
    +    ...data,
    +    status: canonicalStatus,
    +    completedAt: data.completedAt ?? Date.now(),
    +    reconciledAt: Date.now(),
    +  };
    +
    +  try {
    +    fs.writeFileSync(runJsonPath, `${JSON.stringify(reconciled, null, 2)}\n`, "utf-8");
    +  } catch {
    +    // best effort persistence; return reconciled in-memory snapshot regardless
    +  }
    +
    +  return reconciled;
    +};
    +
     /** Parse a single run.json into a RunRecord (without promise/abort). */
     function parseRunJson(
       data: RunJsonData,
       artifactsDir: string,
     ): Omit<RunRecord, "promise" | "abort"> {
    -  const status: RunStatus = data.status ?? "done";
    +  const status: RunStatus = normalizeRunStatus(data.status);
       const results: ExecutionResult[] = (data.results ?? []).map((r) => ({
         taskId: "",
         agent: r.agent ?? "unknown",
···
           try {
             const raw = fs.readFileSync(runJsonPath, "utf-8");
             const data: RunJsonData = JSON.parse(raw);
    -        records.push(parseRunJson(data, path.join(millDir, entry)));
    +        const reconciled = reconcileRunningStatus(runJsonPath, data);
    +        records.push(parseRunJson(reconciled, path.join(millDir, entry)));
           } catch {
             // Skip malformed run.json files
           }
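To make the reconciliation path in `scanner.ts` easier to follow, here is a small standalone sketch that exercises the three pure helpers from the diff on made-up CLI output. The helper bodies are condensed copies of the ones in the diff. The `RunStatus` union is an assumption (the real type lives in `types.ts` and is not shown here), and `statusFromOutput` and the sample strings are hypothetical.

```typescript
// Assumption: the real RunStatus union is defined in types.ts (not shown in the diff).
type RunStatus = "running" | "done" | "failed" | "cancelled";

// From the diff: scan lines bottom-up and return the first one that parses to a JSON object.
const parseJsonObjectFromText = (text: string): Record<string, unknown> | undefined => {
  const lines = text
    .split("\n")
    .map((line) => line.trim())
    .filter((line) => line.length > 0)
    .reverse();
  for (const line of lines) {
    try {
      const parsed = JSON.parse(line) as unknown;
      if (typeof parsed === "object" && parsed !== null) {
        return parsed as Record<string, unknown>;
      }
    } catch {
      continue; // not JSON, try the next line up
    }
  }
  return undefined;
};

// From the diff (condensed): a missing status maps to "done", unknown strings to "running".
const normalizeRunStatus = (status: string | undefined): RunStatus => {
  if (status === undefined) return "done";
  switch (status) {
    case "done":
    case "complete":
      return "done";
    case "failed":
      return "failed";
    case "cancelled":
      return "cancelled";
    default:
      return "running"; // covers "running", "pending", and anything unrecognized
  }
};

// From the diff: prefer payload.status, then fall back to payload.run.status.
const extractStatusFromMillPayload = (payload: Record<string, unknown>): string | undefined => {
  const direct = payload.status;
  if (typeof direct === "string" && direct.length > 0) return direct;
  const nestedRun = payload.run;
  if (typeof nestedRun === "object" && nestedRun !== null) {
    const nestedStatus = (nestedRun as { status?: unknown }).status;
    if (typeof nestedStatus === "string" && nestedStatus.length > 0) return nestedStatus;
  }
  return undefined;
};

// Hypothetical helper: composes the three steps the way reconcileRunningStatus does.
function statusFromOutput(stdout: string, stderr: string): RunStatus | undefined {
  const payload = parseJsonObjectFromText(`${stdout}\n${stderr}`);
  return payload ? normalizeRunStatus(extractStatusFromMillPayload(payload)) : undefined;
}

// Made-up sample output; the real `mill status --json` payload shape is not shown in the diff.
console.log(statusFromOutput('resolving run\n{"run":{"status":"complete"}}\n', "")); // "done"
console.log(statusFromOutput("{}", ""));       // "done": a payload without a status normalizes to done
console.log(statusFromOutput("not json", "")); // undefined: no JSON object line found
console.log(normalizeRunStatus("paused"));     // "running": unknown strings stay running
```

Two properties of the code as written in the diff are worth noting. First, stdout and stderr are concatenated before the bottom-up scan, so a JSON object on the last stderr line would take precedence over stdout. Second, a JSON payload with no recognizable status normalizes to `"done"`, which would reconcile a `"running"` record to done.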
+1 -1
packages/pi-mill/types.ts
···
       step?: number;
       /** Final assistant text output, auto-populated on completion. */
       text: string;
    -  /** Path to the subagent's session .jsonl file. Use search_thread to explore. */
    +  /** Subagent session reference (session id or .jsonl path). Use search_thread to explore. */
       sessionPath?: string;
     }
