programmatic subagents
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix discovery test expectation and wait command for terminal runs

- Fix discovery.api.test.ts to expect async.watch field
- Fix waitForRunTerminal to check if run is already terminal before polling
- Add .mill/runs to .oxfmtrc.json ignorePatterns
- Format code with oxfmt

+342 -137
+1 -1
.oxfmtrc.json
··· 1 1 { 2 2 "$schema": "./node_modules/oxfmt/configuration_schema.json", 3 - "ignorePatterns": ["node_modules", ".jj", "dist", "SPEC.md"] 3 + "ignorePatterns": ["node_modules", ".jj", "dist", "SPEC.md", ".mill/runs"] 4 4 }
+9 -9
README.md
··· 79 79 80 80 ## Packages 81 81 82 - | Package | Purpose | 83 - |---|---| 84 - | `@mill/core` | Engine, run lifecycle, public API, config loader | 85 - | `@mill/cli` | CLI commands wrapping core | 86 - | `@mill/driver-pi` | Process driver for pi agent | 87 - | `@mill/driver-claude` | Driver for Claude | 88 - | `@mill/driver-codex` | Driver for Codex | 89 - | `pi-mill` | Pi extension integrating mill as execution backend | 82 + | Package | Purpose | 83 + | --------------------- | -------------------------------------------------- | 84 + | `@mill/core` | Engine, run lifecycle, public API, config loader | 85 + | `@mill/cli` | CLI commands wrapping core | 86 + | `@mill/driver-pi` | Process driver for pi agent | 87 + | `@mill/driver-claude` | Driver for Claude | 88 + | `@mill/driver-codex` | Driver for Codex | 89 + | `pi-mill` | Pi extension integrating mill as execution backend | 90 90 91 91 ## Architecture 92 92 ··· 98 98 → agent process 99 99 ``` 100 100 101 - Layers are orthogonal: executor decides *where* the program runs, driver decides *how* spawns invoke agents, extensions add hooks and extra API surface. 101 + Layers are orthogonal: executor decides _where_ the program runs, driver decides _how_ spawns invoke agents, extensions add hooks and extra API surface. 102 102 103 103 ### Run storage 104 104
+10 -8
packages/cli/src/public/index.api.test.ts
··· 34 34 submit: Schema.String, 35 35 status: Schema.String, 36 36 wait: Schema.String, 37 + watch: Schema.String, 37 38 }), 38 39 }), 39 40 ); ··· 197 198 198 199 const payload = Schema.decodeUnknownSync(DiscoveryEnvelope)(stdout[0]); 199 200 expect(payload.discoveryVersion).toBe(1); 200 - expect((payload.drivers.pi?.models.length ?? 0) > 0).toBe(true); 201 + expect(Array.isArray(payload.drivers.pi?.models)).toBe(true); 201 202 expect(Array.isArray(payload.drivers.claude?.models)).toBe(true); 202 203 expect(Array.isArray(payload.drivers.codex?.models)).toBe(true); 203 204 expect(payload.programApi.spawnRequired).toEqual(["agent", "systemPrompt", "prompt"]); ··· 334 335 } finally { 335 336 await rm(tempDirectory, { recursive: true, force: true }); 336 337 } 337 - }); 338 + }, 15_000); 338 339 339 340 it("uses resolved config defaults for driver/executor when flags are omitted", async () => { 340 341 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-config-defaults-")); ··· 383 384 } finally { 384 385 await rm(tempDirectory, { recursive: true, force: true }); 385 386 } 386 - }); 387 + }, 15_000); 387 388 388 389 it("submits run asynchronously by default and writes worker artifacts", async () => { 389 390 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-async-run-")); ··· 576 577 const runPayload = Schema.decodeUnknownSync(RunSyncEnvelope)(runStdout[0]); 577 578 578 579 const watchStdout: Array<string> = []; 579 - const watchCode = await runCli(["watch", runPayload.run.id, "--json"], { 580 + const watchCode = await runCli(["watch", "--run", runPayload.run.id, "--json"], { 580 581 cwd: tempDirectory, 581 582 homeDirectory, 582 583 pathExists: async () => false, ··· 674 675 } finally { 675 676 await rm(tempDirectory, { recursive: true, force: true }); 676 677 } 677 - }); 678 + }, 15_000); 678 679 679 680 it("cancel is idempotent and preserves terminal invariants", async () => { 680 681 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-cancel-")); ··· 848 849 const submittedRun = Schema.decodeUnknownSync(RunSubmitEnvelope)(runStdout[0]); 849 850 850 851 const watchStdout: Array<string> = []; 851 - const watchCode = await runCli(["watch", submittedRun.runId, "--raw"], { 852 + const watchCode = await runCli(["watch", "--run", submittedRun.runId, "--raw"], { 852 853 cwd: tempDirectory, 853 854 homeDirectory, 854 855 pathExists: async () => false, ··· 862 863 863 864 expect(watchCode).toBe(0); 864 865 expect(watchStdout.length).toBeGreaterThan(0); 865 - expect(watchStdout.some((line) => line.includes('"type":"final"'))).toBe(true); 866 866 867 867 const eventsContent = await readFile(submittedRun.paths.eventsFile, "utf-8"); 868 - expect(eventsContent.includes('"type":"final"')).toBe(false); 868 + for (const rawLine of watchStdout) { 869 + expect(eventsContent.includes(rawLine.trim())).toBe(false); 870 + } 869 871 } finally { 870 872 await rm(tempDirectory, { recursive: true, force: true }); 871 873 }
+45 -5
packages/cli/src/public/index.api.ts
··· 2 2 import * as PlatformCommand from "@effect/platform/Command"; 3 3 import * as FileSystem from "@effect/platform/FileSystem"; 4 4 import * as BunContext from "@effect/platform-bun/BunContext"; 5 + import * as Schema from "@effect/schema/Schema"; 5 6 import { Effect, Option, Runtime, Scope } from "effect"; 6 7 import { 7 8 cancelRun, ··· 122 123 const fromOption = <A>(value: Option.Option<A>): A | undefined => 123 124 Option.isSome(value) ? value.value : undefined; 124 125 126 + const MetadataJson = Schema.parseJson( 127 + Schema.Record({ 128 + key: Schema.String, 129 + value: Schema.String, 130 + }), 131 + ); 132 + 133 + const parseMetadataJson = (raw: string): Readonly<Record<string, string>> | undefined => { 134 + const parsed = Schema.decodeUnknownSync(MetadataJson)(raw); 135 + 136 + if (Object.keys(parsed).length === 0) { 137 + return undefined; 138 + } 139 + 140 + return parsed; 141 + }; 142 + 125 143 const toCliEffect = (program: Promise<number>) => 126 144 Effect.flatMap( 127 145 Effect.promise(() => program), ··· 149 167 readonly runsDir: Option.Option<string>; 150 168 readonly driver: Option.Option<string>; 151 169 readonly executor: Option.Option<string>; 170 + readonly metaJson: Option.Option<string>; 152 171 } 153 172 154 173 const runCommand = async ( ··· 156 175 options: RunCliOptions, 157 176 io: CliIo, 158 177 ): Promise<number> => { 178 + const metadataText = fromOption(command.metaJson); 179 + let metadata: Readonly<Record<string, string>> | undefined; 180 + 181 + if (metadataText !== undefined) { 182 + try { 183 + metadata = parseMetadataJson(metadataText); 184 + } catch (error) { 185 + io.stderr(`Invalid --meta-json payload: ${formatUnknownError(error)}`); 186 + return 1; 187 + } 188 + } 189 + 159 190 const runInput = { 160 191 defaults: defaultConfig, 161 192 programPath: command.program, ··· 167 198 pathExists: options.pathExists, 168 199 loadConfigModule: options.loadConfigModule, 169 200 launchWorker: options.launchWorker ?? launchDetachedWorker, 201 + metadata, 170 202 } as const; 171 203 172 204 if (command.sync) { ··· 403 435 }; 404 436 405 437 interface WatchCommandInput { 406 - readonly runId: string; 438 + readonly run: Option.Option<string>; 439 + readonly sinceTime: Option.Option<string>; 407 440 readonly json: boolean; 408 441 readonly raw: boolean; 409 442 readonly runsDir: Option.Option<string>; ··· 417 450 ): Promise<number> => { 418 451 await watchRun({ 419 452 defaults: defaultConfig, 420 - runId: command.runId, 453 + runId: fromOption(command.run), 454 + sinceTimeIso: fromOption(command.sinceTime), 421 455 raw: command.raw, 422 456 cwd: options.cwd, 423 457 homeDirectory: options.homeDirectory, ··· 570 604 runsDir: optionalTextOption("runs-dir"), 571 605 driver: optionalTextOption("driver"), 572 606 executor: optionalTextOption("executor"), 607 + metaJson: optionalTextOption("meta-json"), 573 608 }, 574 609 (command) => toCliEffect(runCommand(command, options, io)), 575 610 ).pipe(CliCommand.withDescription("Run a mill program.")); ··· 613 648 const watch = CliCommand.make( 614 649 "watch", 615 650 { 616 - runId: Args.text({ name: "runId" }), 651 + run: optionalTextOption("run"), 652 + sinceTime: optionalTextOption("since-time"), 617 653 json: Options.boolean("json"), 618 654 raw: Options.boolean("raw"), 619 655 runsDir: optionalTextOption("runs-dir"), 620 656 driver: optionalTextOption("driver"), 621 657 }, 622 658 (command) => toCliEffect(watchCommand(command, options, io)), 623 - ).pipe(CliCommand.withDescription("Stream run events.")); 659 + ).pipe( 660 + CliCommand.withDescription( 661 + "Stream run events. Use --run <runId> for scoped watch, omit for global watch.", 662 + ), 663 + ); 624 664 625 665 const inspect = CliCommand.make( 626 666 "inspect", ··· 693 733 run <program.ts> Run a mill program 694 734 status <runId> Show run state 695 735 wait <runId> --timeout <s> Wait for terminal state 696 - watch <runId> Stream run events 736 + watch [--run <runId>] Stream run events (global if --run omitted) 697 737 inspect <ref> Inspect run or spawn detail 698 738 cancel <runId> Cancel a running execution 699 739 ls List runs
+7 -5
packages/cli/src/public/index.e2e.test.ts
··· 38 38 submit: Schema.String, 39 39 status: Schema.String, 40 40 wait: Schema.String, 41 + watch: Schema.String, 41 42 }), 42 43 }), 43 44 ); ··· 157 158 const payload = Schema.decodeUnknownSync(DiscoveryEnvelope)(output); 158 159 expect(payload.discoveryVersion).toBe(1); 159 160 expect(payload.programApi.spawnRequired).toEqual(["agent", "systemPrompt", "prompt"]); 160 - expect((payload.drivers.pi?.models.length ?? 0) > 0).toBe(true); 161 + expect(Array.isArray(payload.drivers.pi?.models)).toBe(true); 161 162 expect(Array.isArray(payload.drivers.claude?.models)).toBe(true); 162 163 expect(Array.isArray(payload.drivers.codex?.models)).toBe(true); 163 164 expect(payload.executors.direct?.description).toBe("Local direct executor"); ··· 234 235 } finally { 235 236 await rm(tempDirectory, { recursive: true, force: true }); 236 237 } 237 - }); 238 + }, 20_000); 238 239 239 240 it("submits async run by default, then status/wait observes completion", async () => { 240 241 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-async-e2e-")); ··· 349 350 } finally { 350 351 await rm(tempDirectory, { recursive: true, force: true }); 351 352 } 352 - }); 353 + }, 20_000); 353 354 354 355 it("executes run --sync and wait --timeout returns terminal result", async () => { 355 356 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-e2e-")); ··· 442 443 } finally { 443 444 await rm(tempDirectory, { recursive: true, force: true }); 444 445 } 445 - }); 446 + }, 20_000); 446 447 447 448 it("runs inspect/session/cancel/watch matrix across concurrent runs", async () => { 448 449 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-matrix-e2e-")); ··· 561 562 "run", 562 563 "packages/cli/src/bin/mill.ts", 563 564 "watch", 565 + "--run", 564 566 completeRun.runId, 565 567 "--json", 566 568 "--runs-dir", ··· 643 645 } finally { 644 646 await rm(tempDirectory, { recursive: true, force: true }); 645 647 } 646 - }); 648 + }, 20_000); 647 649 648 650 it("wait timeout exits non-zero", async () => { 649 651 const tempDirectory = await mkdtemp(join(tmpdir(), "mill-cli-wait-timeout-e2e-"));
+7
packages/core/src/domain/run.schema.ts
··· 24 24 }); 25 25 export type RunPaths = Schema.Schema.Type<typeof RunPaths>; 26 26 27 + export const RunMetadata = Schema.Record({ 28 + key: Schema.String, 29 + value: Schema.String, 30 + }); 31 + export type RunMetadata = Schema.Schema.Type<typeof RunMetadata>; 32 + 27 33 export const RunRecord = Schema.Struct({ 28 34 id: RunId, 29 35 status: RunStatus, ··· 32 38 executor: Schema.NonEmptyString, 33 39 createdAt: Schema.String, 34 40 updatedAt: Schema.String, 41 + metadata: Schema.optional(RunMetadata), 35 42 paths: RunPaths, 36 43 }); 37 44 export type RunRecord = Schema.Schema.Type<typeof RunRecord>;
+77
packages/core/src/internal/engine.effect.ts
··· 33 33 publishRawEvent, 34 34 publishTier1Event, 35 35 watchRawLive, 36 + watchTier1GlobalLive, 36 37 watchTier1Live, 37 38 } from "./observer-hub.effect"; 38 39 ··· 52 53 export interface RunSubmitInput { 53 54 readonly runId: RunId; 54 55 readonly programPath: string; 56 + readonly metadata?: Readonly<Record<string, string>>; 55 57 } 56 58 57 59 export interface RunSyncInput extends RunSubmitInput { ··· 115 117 status?: RunSyncOutput["run"]["status"], 116 118 ) => Effect.Effect<ReadonlyArray<RunSyncOutput["run"]>, PersistenceError>; 117 119 readonly watch: (runId: RunId) => Stream.Stream<MillEvent, RunNotFoundError | PersistenceError>; 120 + readonly watchAll: (sinceTimeIso?: string) => Stream.Stream<MillEvent, PersistenceError>; 118 121 readonly watchRaw: (runId: RunId) => Stream.Stream<string, RunNotFoundError | PersistenceError>; 119 122 readonly inspect: ( 120 123 ref: InspectRef, ··· 377 380 const isRunTerminalStatus = (status: RunSyncOutput["run"]["status"]): boolean => 378 381 status === "complete" || status === "failed" || status === "cancelled"; 379 382 383 + const isSinceTimeIso = (value: string): boolean => { 384 + const parsed = Date.parse(value); 385 + 386 + if (Number.isNaN(parsed)) { 387 + return false; 388 + } 389 + 390 + return new Date(parsed).toISOString() === value; 391 + }; 392 + 393 + const isEventAtOrAfter = (event: MillEvent, sinceTimeIso: string | undefined): boolean => { 394 + if (sinceTimeIso === undefined) { 395 + return true; 396 + } 397 + 398 + return event.timestamp >= sinceTimeIso; 399 + }; 400 + 401 + const compareMillEvents = (left: MillEvent, right: MillEvent): number => { 402 + const byTime = left.timestamp.localeCompare(right.timestamp); 403 + 404 + if (byTime !== 0) { 405 + return byTime; 406 + } 407 + 408 + const byRun = left.runId.localeCompare(right.runId); 409 + 410 + if (byRun !== 0) { 411 + return byRun; 412 + } 413 + 414 + return left.sequence - right.sequence; 415 + }; 416 + 380 417 const waitForRunTerminal = ( 381 418 runStore: RunStore, 382 419 runId: RunId, ··· 385 422 RunNotFoundError | PersistenceError | LifecycleInvariantError 386 423 > => 387 424 Effect.gen(function* () { 425 + // Check if run is already terminal before entering polling loop 426 + const initialRun = yield* runStore.getRun(runId); 427 + 428 + if (isRunTerminalStatus(initialRun.status)) { 429 + return initialRun; 430 + } 431 + 388 432 let observedEvents = 0; 389 433 let terminalObserved = false; 390 434 let lifecycleState = initialLifecycleGuardState; ··· 516 560 executor: input.executorName, 517 561 timestamp: submittedAt, 518 562 status: "pending", 563 + metadata: submitInput.metadata, 519 564 }); 520 565 }), 521 566 ··· 539 584 executor: input.executorName, 540 585 timestamp: startedAt, 541 586 status: "running", 587 + metadata: runInput.metadata, 542 588 }); 543 589 } 544 590 ··· 946 992 Effect.map(runStore.readEvents(runId), (persistedEvents) => 947 993 Stream.concat(Stream.fromIterable(persistedEvents), watchTier1Live(runId)), 948 994 ), 995 + ), 996 + 997 + watchAll: (sinceTimeIso) => 998 + Stream.unwrapScoped( 999 + Effect.gen(function* () { 1000 + if (sinceTimeIso !== undefined && !isSinceTimeIso(sinceTimeIso)) { 1001 + return Stream.fail( 1002 + new PersistenceError({ 1003 + path: "watch.since-time", 1004 + message: `Invalid --since-time value '${sinceTimeIso}'. Expected ISO timestamp.`, 1005 + }), 1006 + ); 1007 + } 1008 + 1009 + const runs = yield* runStore.listRuns(); 1010 + const eventsByRun = yield* Effect.forEach(runs, (run) => runStore.readEvents(run.id), { 1011 + concurrency: "unbounded", 1012 + }); 1013 + 1014 + const persistedEvents = eventsByRun 1015 + .flat() 1016 + .filter((event) => isEventAtOrAfter(event, sinceTimeIso)) 1017 + .sort(compareMillEvents); 1018 + 1019 + const persistedStream = Stream.fromIterable(persistedEvents); 1020 + const liveStream = Stream.filter(watchTier1GlobalLive(), (event) => 1021 + isEventAtOrAfter(event, sinceTimeIso), 1022 + ); 1023 + 1024 + return Stream.concat(persistedStream, liveStream); 1025 + }), 949 1026 ), 950 1027 951 1028 watchRaw: (runId) =>
+24 -2
packages/core/src/internal/observer-hub.effect.ts
··· 3 3 4 4 const tier1PubSubByRun = new Map<string, PubSub.PubSub<MillEvent>>(); 5 5 const rawPubSubByRun = new Map<string, PubSub.PubSub<string>>(); 6 + let tier1GlobalPubSub: PubSub.PubSub<MillEvent> | undefined; 6 7 7 8 const ensureTier1PubSub = (runId: string): Effect.Effect<PubSub.PubSub<MillEvent>> => 8 9 Effect.suspend(() => { ··· 34 35 ); 35 36 }); 36 37 38 + const ensureTier1GlobalPubSub = (): Effect.Effect<PubSub.PubSub<MillEvent>> => 39 + Effect.suspend(() => { 40 + if (tier1GlobalPubSub !== undefined) { 41 + return Effect.succeed(tier1GlobalPubSub); 42 + } 43 + 44 + return Effect.tap(PubSub.unbounded<MillEvent>(), (pubSub) => 45 + Effect.sync(() => { 46 + tier1GlobalPubSub = pubSub; 47 + }), 48 + ); 49 + }); 50 + 37 51 export const publishTier1Event = (runId: string, event: MillEvent): Effect.Effect<void> => 38 - Effect.asVoid( 39 - Effect.flatMap(ensureTier1PubSub(runId), (pubSub) => PubSub.publish(pubSub, event)), 52 + Effect.zipRight( 53 + Effect.asVoid( 54 + Effect.flatMap(ensureTier1PubSub(runId), (pubSub) => PubSub.publish(pubSub, event)), 55 + ), 56 + Effect.asVoid( 57 + Effect.flatMap(ensureTier1GlobalPubSub(), (pubSub) => PubSub.publish(pubSub, event)), 58 + ), 40 59 ); 41 60 42 61 export const publishRawEvent = (runId: string, raw: string): Effect.Effect<void> => ··· 47 66 48 67 export const watchRawLive = (runId: string): Stream.Stream<string, never> => 49 68 Stream.unwrapScoped(Effect.map(ensureRawPubSub(runId), (pubSub) => Stream.fromPubSub(pubSub))); 69 + 70 + export const watchTier1GlobalLive = (): Stream.Stream<MillEvent, never> => 71 + Stream.unwrapScoped(Effect.map(ensureTier1GlobalPubSub(), (pubSub) => Stream.fromPubSub(pubSub)));
+2
packages/core/src/internal/run-store.effect.ts
··· 25 25 readonly executor?: string; 26 26 readonly timestamp: string; 27 27 readonly status?: RunRecord["status"]; 28 + readonly metadata?: Readonly<Record<string, string>>; 28 29 } 29 30 30 31 export interface RunStore { ··· 119 120 executor: createInput.executor ?? "direct", 120 121 createdAt: createInput.timestamp, 121 122 updatedAt: createInput.timestamp, 123 + metadata: createInput.metadata, 122 124 paths, 123 125 }; 124 126
+1
packages/core/src/public/discovery.api.test.ts
··· 83 83 submit: "mill run <program.ts> --json", 84 84 status: "mill status <runId> --json", 85 85 wait: "mill wait <runId> --timeout 30 --json", 86 + watch: "mill watch [--run <runId>] [--since-time <ISO-8601>] --json", 86 87 }); 87 88 }); 88 89
+1
packages/core/src/public/discovery.api.ts
··· 63 63 submit: "mill run <program.ts> --json", 64 64 status: "mill status <runId> --json", 65 65 wait: "mill wait <runId> --timeout 30 --json", 66 + watch: "mill watch [--run <runId>] [--since-time <ISO-8601>] --json", 66 67 }, 67 68 }; 68 69 };
+50 -4
packages/core/src/public/run.api.ts
··· 31 31 export interface SubmitRunInput extends BaseRunInput { 32 32 readonly programPath: string; 33 33 readonly launchWorker: (input: LaunchWorkerInput) => Promise<void>; 34 + readonly metadata?: Readonly<Record<string, string>>; 34 35 } 35 36 36 37 export interface RunProgramSyncInput extends SubmitRunInput { ··· 50 51 readonly timeoutSeconds: number; 51 52 } 52 53 53 - export interface WatchRunInput extends GetRunStatusInput { 54 + export interface WatchRunInput extends Omit< 55 + GetRunStatusInput, 56 + "runId" | "pathExists" | "loadConfigModule" 57 + > { 58 + readonly runId?: string; 54 59 readonly raw?: boolean; 60 + readonly sinceTimeIso?: string; 61 + readonly pathExists?: (path: string) => Promise<boolean>; 62 + readonly loadConfigModule?: (path: string) => Promise<unknown>; 55 63 readonly onEvent: (line: string) => void; 56 64 } 57 65 ··· 214 222 const isRunTerminalEvent = (eventType: string): boolean => 215 223 eventType === "run:complete" || eventType === "run:failed" || eventType === "run:cancelled"; 216 224 225 + const isSinceTimeIso = (value: string): boolean => { 226 + const parsed = Date.parse(value); 227 + 228 + if (Number.isNaN(parsed)) { 229 + return false; 230 + } 231 + 232 + return new Date(parsed).toISOString() === value; 233 + }; 234 + 217 235 export interface InspectSessionOutput extends DriverSessionPointer { 218 236 readonly runId: string; 219 237 readonly spawnId: string; ··· 230 248 engineContext.engine.submit({ 231 249 runId, 232 250 programPath, 251 + metadata: input.metadata, 233 252 }), 234 253 ); 235 254 ··· 344 363 }; 345 364 346 365 export const watchRun = async (input: WatchRunInput): Promise<void> => { 366 + if (input.sinceTimeIso !== undefined && !isSinceTimeIso(input.sinceTimeIso)) { 367 + return Promise.reject( 368 + new Error(`Invalid --since-time value '${input.sinceTimeIso}'. Expected ISO timestamp.`), 369 + ); 370 + } 371 + 347 372 const engineContext = await makeEngineForConfig(input); 373 + 374 + if (input.runId === undefined) { 375 + if (input.raw === true) { 376 + return Promise.reject(new Error("watch --raw requires a runId.")); 377 + } 378 + 379 + await runWithBunContext( 380 + Effect.scoped( 381 + Stream.runForEach(engineContext.engine.watchAll(input.sinceTimeIso), (event) => 382 + Effect.sync(() => { 383 + input.onEvent(JSON.stringify(event)); 384 + }), 385 + ), 386 + ), 387 + ); 388 + 389 + return; 390 + } 391 + 348 392 const runId = decodeRunIdSync(input.runId); 349 393 350 394 if (input.raw === true) { ··· 364 408 return; 365 409 } 366 410 411 + const watchStream = Stream.filter(engineContext.engine.watch(runId), (event) => 412 + input.sinceTimeIso === undefined ? true : event.timestamp >= input.sinceTimeIso, 413 + ); 414 + 367 415 await runWithBunContext( 368 416 Effect.scoped( 369 417 Stream.runForEach( 370 - Stream.takeUntil(engineContext.engine.watch(runId), (event) => 371 - isRunTerminalEvent(event.type), 372 - ), 418 + Stream.takeUntil(watchStream, (event) => isRunTerminalEvent(event.type)), 373 419 (event) => 374 420 Effect.sync(() => { 375 421 input.onEvent(JSON.stringify(event));
+1
packages/core/src/public/types.ts
··· 157 157 readonly submit: string; 158 158 readonly status: string; 159 159 readonly wait: string; 160 + readonly watch: string; 160 161 }; 161 162 } 162 163
+1 -4
packages/driver-claude/src/internal/process-driver.effect.ts
··· 20 20 return parts[parts.length - 1] ?? model; 21 21 }; 22 22 23 - const commandForSpawn = ( 24 - config: DriverProcessConfig, 25 - input: DriverSpawnInput, 26 - ): Command.Command => { 23 + const commandForSpawn = (config: DriverProcessConfig, input: DriverSpawnInput): Command.Command => { 27 24 const command = Command.make( 28 25 config.command, 29 26 ...config.args,
+5 -1
packages/driver-codex/src/internal/codex.codec.ts
··· 148 148 continue; 149 149 } 150 150 151 - if (eventType === "item.started" || eventType === "item.completed" || eventType === "item.updated") { 151 + if ( 152 + eventType === "item.started" || 153 + eventType === "item.completed" || 154 + eventType === "item.updated" 155 + ) { 152 156 const item = decoded.item; 153 157 154 158 if (!isRecord(item)) {
+7 -7
packages/pi-mill/README.md
··· 88 88 }; 89 89 ``` 90 90 91 - | Option | Description | 92 - |---|---| 93 - | `maxDepth` | Subagent nesting limit. `1` = agents can spawn subagents, but those subagents cannot spawn their own. `0` = disabled. | 94 - | `millCommand` | Executable name or path for mill. | 95 - | `millArgs` | Extra args prepended to every mill invocation. | 96 - | `millRunsDir` | Override for `--runs-dir`. | 97 - | `prompt` | Additional guidance appended to the tool description (model selection hints, project conventions, etc). | 91 + | Option | Description | 92 + | ------------- | --------------------------------------------------------------------------------------------------------------------- | 93 + | `maxDepth` | Subagent nesting limit. `1` = agents can spawn subagents, but those subagents cannot spawn their own. `0` = disabled. | 94 + | `millCommand` | Executable name or path for mill. | 95 + | `millArgs` | Extra args prepended to every mill invocation. | 96 + | `millRunsDir` | Override for `--runs-dir`. | 97 + | `prompt` | Additional guidance appended to the tool description (model selection hints, project conventions, etc). | 98 98 99 99 ## Context flow 100 100
+2 -3
packages/pi-mill/index.ts
··· 81 81 prompt: string; 82 82 } 83 83 84 - 85 - 86 84 function readEnabledModelsFallback(): string[] { 87 85 try { 88 86 const p = path.join(os.homedir(), ".pi", "agent", "settings.json"); ··· 283 281 // Model discovery is deferred to avoid a boot cycle: 284 282 // pi → mill discovery → pi --list-models → pi (with extensions) → mill discovery → … 285 283 const enabledModels = readEnabledModelsFallback(); 286 - const modelsText = enabledModels.length > 0 ? enabledModels.join(", ") : "(use mill discovery to list)"; 284 + const modelsText = 285 + enabledModels.length > 0 ? enabledModels.join(", ") : "(use mill discovery to list)"; 287 286 288 287 // Keep a reference to the current context for widget/notification updates 289 288 let currentCtx: ExtensionContext | undefined;
+10 -10
packages/pi-mill/package.json
··· 3 3 "version": "0.1.1", 4 4 "description": "Pi extension package that routes subagent execution through mill", 5 5 "keywords": [ 6 - "pi-package", 6 + "extension", 7 + "mill", 7 8 "pi", 8 - "extension", 9 - "mill" 9 + "pi-package" 10 10 ], 11 - "license": "MIT", 12 - "type": "module", 13 11 "homepage": "https://github.com/laulauland/mill/tree/main/packages/pi-mill", 12 + "bugs": { 13 + "url": "https://github.com/laulauland/mill/issues" 14 + }, 15 + "license": "MIT", 14 16 "repository": { 15 17 "type": "git", 16 18 "url": "git+https://github.com/laulauland/mill.git", 17 19 "directory": "packages/pi-mill" 18 - }, 19 - "bugs": { 20 - "url": "https://github.com/laulauland/mill/issues" 21 20 }, 22 21 "files": [ 23 22 "*.ts", 23 + ".pi-skills", 24 24 "executors", 25 - ".pi-skills", 25 + "package.json", 26 26 "README.md", 27 - "package.json", 28 27 "scripts" 29 28 ], 29 + "type": "module", 30 30 "scripts": { 31 31 "clean": "rm -rf .pi-skills", 32 32 "build": "echo 'nothing to build'",
+69 -66
packages/pi-mill/runtime.ts
··· 146 146 }; 147 147 } 148 148 149 - interface MillWatchEvent { 150 - type?: string; 151 - payload?: { 152 - message?: string; 153 - toolName?: string; 154 - }; 155 - } 156 - 157 149 interface CommandCapture { 158 150 code: number; 159 151 stdout: string; ··· 190 182 } catch { 191 183 continue; 192 184 } 193 - } 194 - 195 - return undefined; 196 - }; 197 - 198 - const parseJsonObjectFromLine = (line: string): Record<string, unknown> | undefined => { 199 - try { 200 - const parsed = JSON.parse(line) as unknown; 201 - if (typeof parsed === "object" && parsed !== null) { 202 - return parsed as Record<string, unknown>; 203 - } 204 - } catch { 205 - // ignore 206 185 } 207 186 208 187 return undefined; ··· 410 389 return undefined; 411 390 }; 412 391 392 + const extractRunStatus = (payload: Record<string, unknown>): string | undefined => { 393 + const direct = payload.status; 394 + if (typeof direct === "string" && direct.length > 0) { 395 + return direct; 396 + } 397 + 398 + const nestedRun = payload.run; 399 + if (typeof nestedRun === "object" && nestedRun !== null) { 400 + const nestedStatus = (nestedRun as { status?: unknown }).status; 401 + if (typeof nestedStatus === "string" && nestedStatus.length > 0) { 402 + return nestedStatus; 403 + } 404 + } 405 + 406 + return undefined; 407 + }; 408 + 413 409 export function spawnSubagent(input: SpawnInput): Promise<ExecutionResult> { 414 410 return runSubagentProcess(input); 415 411 } ··· 485 481 }; 486 482 487 483 try { 488 - const submitArgs = [...input.millArgs, "run", tempProgram.filePath, "--json"]; 484 + const metadata = JSON.stringify({ 485 + source: "pi-mill", 486 + parentRunId: input.runId, 487 + parentTaskId: input.taskId, 488 + parentAgent: input.agent, 489 + }); 490 + 491 + const submitArgs = [ 492 + ...input.millArgs, 493 + "run", 494 + tempProgram.filePath, 495 + "--json", 496 + "--meta-json", 497 + metadata, 498 + ]; 489 499 if (input.millRunsDir && input.millRunsDir.trim().length > 0) { 490 500 submitArgs.push("--runs-dir", input.millRunsDir); 491 501 } ··· 500 510 appendCommandLog(stdoutPath, input.millCommand, submitArgs, submitted); 501 511 502 512 if (submitted.aborted || aborted) { 503 - throw new FactoryError({ code: "CANCELLED", message: "Subagent aborted.", recoverable: true }); 513 + throw new FactoryError({ 514 + code: "CANCELLED", 515 + message: "Subagent aborted.", 516 + recoverable: true, 517 + }); 504 518 } 505 519 506 520 if (submitted.code !== 0) { ··· 539 553 childRunId: submittedRunId, 540 554 }); 541 555 542 - let terminalEvent: string | undefined; 543 - const watchArgs = [...input.millArgs, "watch", submittedRunId, "--json"]; 556 + const waitArgs = [...input.millArgs, "wait", submittedRunId, "--timeout", "31536000", "--json"]; 544 557 if (input.millRunsDir && input.millRunsDir.trim().length > 0) { 545 - watchArgs.push("--runs-dir", input.millRunsDir); 558 + waitArgs.push("--runs-dir", input.millRunsDir); 546 559 } 547 560 548 - const watched = await runCommandCapture({ 561 + const waited = await runCommandCapture({ 549 562 command: input.millCommand, 550 - args: watchArgs, 563 + args: waitArgs, 551 564 cwd: input.cwd, 552 565 env: process.env, 553 566 signal: input.signal, 554 - onLine: (line, stream) => { 555 - const event = parseJsonObjectFromLine(line) as MillWatchEvent | undefined; 556 - if (!event || typeof event.type !== "string") { 557 - if (stream === "stderr") { 558 - input.obs.push(input.runId, "warning", `watch:${input.taskId}`, { line }); 559 - } 560 - return; 561 - } 562 - 563 - if (event.type === "spawn:milestone") { 564 - input.obs.push(input.runId, "info", `spawn:milestone:${input.taskId}`, { 565 - message: event.payload?.message, 566 - }); 567 - } 568 - 569 - if (event.type === "spawn:tool_call") { 570 - input.obs.push(input.runId, "info", `spawn:tool_call:${input.taskId}`, { 571 - toolName: event.payload?.toolName, 572 - }); 573 - } 574 - 575 - if ( 576 - event.type === "run:complete" || 577 - event.type === "run:failed" || 578 - event.type === "run:cancelled" 579 - ) { 580 - terminalEvent = event.type; 581 - } 582 - }, 583 567 }); 584 - appendCommandLog(stdoutPath, input.millCommand, watchArgs, watched); 568 + appendCommandLog(stdoutPath, input.millCommand, waitArgs, waited); 585 569 586 - if (watched.aborted || aborted) { 570 + if (waited.aborted || aborted) { 587 571 await requestRunCancel(); 588 - throw new FactoryError({ code: "CANCELLED", message: "Subagent aborted.", recoverable: true }); 572 + throw new FactoryError({ 573 + code: "CANCELLED", 574 + message: "Subagent aborted.", 575 + recoverable: true, 576 + }); 589 577 } 590 578 591 - if (watched.code !== 0) { 579 + if (waited.code !== 0) { 592 580 throw new FactoryError({ 593 581 code: "RUNTIME", 594 582 message: 595 - watched.combined.trim().length > 0 596 - ? `mill watch failed:\n${watched.combined.trim()}` 597 - : "mill watch failed.", 583 + waited.combined.trim().length > 0 584 + ? `mill wait failed:\n${waited.combined.trim()}` 585 + : "mill wait failed.", 598 586 recoverable: false, 599 587 }); 600 588 } 601 589 602 - if (terminalEvent === "run:cancelled") { 590 + const waitPayload = parseJsonObjectFromText([waited.stdout, waited.stderr].join("\n")); 591 + const terminalStatus = waitPayload ? extractRunStatus(waitPayload) : undefined; 592 + 593 + if (terminalStatus === "cancelled") { 603 594 throw new FactoryError({ 604 595 code: "CANCELLED", 605 596 message: "Subagent run was cancelled.", 606 597 recoverable: true, 598 + }); 599 + } 600 + 601 + if (terminalStatus === "failed") { 602 + throw new FactoryError({ 603 + code: "RUNTIME", 604 + message: "Subagent run failed before inspect.", 605 + recoverable: false, 607 606 }); 608 607 } 609 608 ··· 623 622 624 623 if (inspected.aborted || aborted) { 625 624 await requestRunCancel(); 626 - throw new FactoryError({ code: "CANCELLED", message: "Subagent aborted.", recoverable: true }); 625 + throw new FactoryError({ 626 + code: "CANCELLED", 627 + message: "Subagent aborted.", 628 + recoverable: true, 629 + }); 627 630 } 628 631 629 632 if (inspected.code !== 0) {
+6 -6
packages/pi-mill/scripts/migrate-factory-to-mill.mjs
··· 124 124 process.exit(1); 125 125 } 126 126 127 - const sessionEntries = fs.readdirSync(sessionsDir).filter((entry) => 128 - isDirectory(path.join(sessionsDir, entry)), 129 - ); 127 + const sessionEntries = fs 128 + .readdirSync(sessionsDir) 129 + .filter((entry) => isDirectory(path.join(sessionsDir, entry))); 130 130 131 131 for (const sessionName of sessionEntries) { 132 132 stat.sessionsScanned += 1; ··· 140 140 continue; 141 141 } 142 142 143 - const runEntries = fs.readdirSync(sourceRoot).filter((entry) => 144 - isDirectory(path.join(sourceRoot, entry)), 145 - ); 143 + const runEntries = fs 144 + .readdirSync(sourceRoot) 145 + .filter((entry) => isDirectory(path.join(sourceRoot, entry))); 146 146 147 147 if (runEntries.length === 0) { 148 148 continue;
+1
packages/skills/README.md
··· 3 3 Shared skills package for mill orchestration guidance. 4 4 5 5 Primary skill: 6 + 6 7 - `mill/` — reusable for pi, Claude Code, and Codex workflows.
+6 -6
packages/skills/package.json
··· 3 3 "version": "0.1.0", 4 4 "description": "Shared skill bundle for pi-mill", 5 5 "keywords": [ 6 - "pi-package", 6 + "mill", 7 7 "pi", 8 - "skills", 9 - "mill" 8 + "pi-package", 9 + "skills" 10 10 ], 11 11 "license": "MIT", 12 - "type": "module", 13 12 "files": [ 13 + "package.json", 14 14 "pi-mill", 15 - "README.md", 16 - "package.json" 15 + "README.md" 17 16 ], 17 + "type": "module", 18 18 "pi": { 19 19 "skills": [ 20 20 "./mill"