personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add CLI subprocess runner infrastructure for provider tool agents

Phase 1 of replacing SDK-based run_tools with CLI subprocess wrappers
(claude, codex, gemini). Adds shared infrastructure:

- think/providers/cli.py: CLIRunner (async subprocess with JSONL
translation), ThinkingAggregator (buffers text between tool calls),
assemble_prompt (combines config into stdin-pipeable prompt),
lookup_cli_session_id (enables resume via provider session IDs)
- Extends all Event TypedDicts with optional `raw` field to preserve
original provider JSON events
- Adds cli_session_id and usage fields to FinishEvent

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+570 -15
+191
tests/test_cli_provider.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for think.providers.cli — CLI subprocess runner infrastructure.""" 5 + 6 + import json 7 + 8 + import pytest 9 + 10 + from think.providers.cli import ThinkingAggregator, assemble_prompt, lookup_cli_session_id 11 + from think.providers.shared import JSONEventCallback 12 + 13 + 14 + # --------------------------------------------------------------------------- 15 + # assemble_prompt 16 + # --------------------------------------------------------------------------- 17 + 18 + 19 + class TestAssemblePrompt: 20 + def test_all_fields(self): 21 + config = { 22 + "transcript": "Speaker A: hello", 23 + "extra_context": "Today is Monday", 24 + "user_instruction": "Summarize the transcript", 25 + "prompt": "What happened?", 26 + "system_instruction": "You are a helpful assistant", 27 + } 28 + body, system = assemble_prompt(config) 29 + assert "Speaker A: hello" in body 30 + assert "Today is Monday" in body 31 + assert "Summarize the transcript" in body 32 + assert "What happened?" in body 33 + assert system == "You are a helpful assistant" 34 + # Parts joined with double newlines 35 + assert body.count("\n\n") == 3 36 + 37 + def test_prompt_only(self): 38 + config = {"prompt": "hello"} 39 + body, system = assemble_prompt(config) 40 + assert body == "hello" 41 + assert system is None 42 + 43 + def test_empty_config(self): 44 + body, system = assemble_prompt({}) 45 + assert body == "" 46 + assert system is None 47 + 48 + def test_skips_empty_values(self): 49 + config = { 50 + "transcript": "", 51 + "extra_context": None, 52 + "user_instruction": "Do something", 53 + "prompt": "Go", 54 + } 55 + body, system = assemble_prompt(config) 56 + assert body == "Do something\n\nGo" 57 + assert system is None 58 + 59 + def test_system_instruction_empty_string(self): 60 + config = {"prompt": "test", "system_instruction": ""} 61 + _, system = assemble_prompt(config) 62 + assert system is None 63 + 64 + 65 + # --------------------------------------------------------------------------- 66 + # ThinkingAggregator 67 + # --------------------------------------------------------------------------- 68 + 69 + 70 + class TestThinkingAggregator: 71 + def _make_aggregator(self): 72 + """Create aggregator with event capture.""" 73 + events = [] 74 + cb = JSONEventCallback(events.append) 75 + agg = ThinkingAggregator(cb, model="test-model") 76 + return agg, events 77 + 78 + def test_accumulate_and_flush_as_thinking(self): 79 + agg, events = self._make_aggregator() 80 + agg.accumulate("hello ") 81 + agg.accumulate("world") 82 + agg.flush_as_thinking(raw_events=[{"type": "message"}]) 83 + 84 + assert len(events) == 1 85 + assert events[0]["event"] == "thinking" 86 + assert events[0]["summary"] == "hello world" 87 + assert events[0]["model"] == "test-model" 88 + assert events[0]["raw"] == [{"type": "message"}] 89 + 90 + def test_flush_thinking_empty_buffer_is_noop(self): 91 + agg, events = self._make_aggregator() 92 + agg.flush_as_thinking() 93 + assert len(events) == 0 94 + 95 + def test_flush_thinking_whitespace_only_is_noop(self): 96 + agg, events = self._make_aggregator() 97 + agg.accumulate(" ") 98 + agg.flush_as_thinking() 99 + assert len(events) == 0 100 + 101 + def test_flush_as_result(self): 102 + agg, events = self._make_aggregator() 103 + agg.accumulate("final answer") 104 + result = agg.flush_as_result() 105 + assert result == "final answer" 106 + # No events emitted for result flush 107 + assert len(events) == 0 108 + # Buffer is cleared 109 + assert agg.flush_as_result() == "" 110 + 111 + def test_multiple_thinking_flushes(self): 112 + """Simulate text -> tool -> text -> tool -> text pattern.""" 113 + agg, events = self._make_aggregator() 114 + 115 + # First text chunk (before first tool call) 116 + agg.accumulate("Let me check...") 117 + agg.flush_as_thinking() 118 + 119 + # Second text chunk (between tool calls) 120 + agg.accumulate("Now let me verify...") 121 + agg.flush_as_thinking() 122 + 123 + # Final text (the result) 124 + agg.accumulate("The answer is 42") 125 + result = agg.flush_as_result() 126 + 127 + assert len(events) == 2 128 + assert events[0]["summary"] == "Let me check..." 129 + assert events[1]["summary"] == "Now let me verify..." 130 + assert result == "The answer is 42" 131 + 132 + def test_has_content(self): 133 + agg, _ = self._make_aggregator() 134 + assert not agg.has_content 135 + agg.accumulate("x") 136 + assert agg.has_content 137 + agg.flush_as_result() 138 + assert not agg.has_content 139 + 140 + def test_no_raw_events(self): 141 + agg, events = self._make_aggregator() 142 + agg.accumulate("thinking") 143 + agg.flush_as_thinking() 144 + assert "raw" not in events[0] 145 + 146 + def test_strips_whitespace(self): 147 + agg, events = self._make_aggregator() 148 + agg.accumulate(" padded ") 149 + agg.flush_as_thinking() 150 + assert events[0]["summary"] == "padded" 151 + 152 + 153 + # --------------------------------------------------------------------------- 154 + # lookup_cli_session_id 155 + # --------------------------------------------------------------------------- 156 + 157 + 158 + class TestLookupCliSessionId: 159 + def test_finds_session_id(self, tmp_path, monkeypatch): 160 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 161 + agents_dir = tmp_path / "agents" 162 + agents_dir.mkdir() 163 + 164 + events = [ 165 + {"event": "start", "ts": 1000, "prompt": "hi", "name": "test", "model": "m", "provider": "p"}, 166 + {"event": "finish", "ts": 2000, "result": "done", "cli_session_id": "abc-123"}, 167 + ] 168 + log_file = agents_dir / "agent42.jsonl" 169 + log_file.write_text("\n".join(json.dumps(e) for e in events) + "\n") 170 + 171 + result = lookup_cli_session_id("agent42") 172 + assert result == "abc-123" 173 + 174 + def test_returns_none_when_no_session_id(self, tmp_path, monkeypatch): 175 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 176 + agents_dir = tmp_path / "agents" 177 + agents_dir.mkdir() 178 + 179 + events = [ 180 + {"event": "start", "ts": 1000, "prompt": "hi", "name": "test", "model": "m", "provider": "p"}, 181 + {"event": "finish", "ts": 2000, "result": "done"}, 182 + ] 183 + log_file = agents_dir / "agent42.jsonl" 184 + log_file.write_text("\n".join(json.dumps(e) for e in events) + "\n") 185 + 186 + result = lookup_cli_session_id("agent42") 187 + assert result is None 188 + 189 + def test_returns_none_when_not_found(self): 190 + result = lookup_cli_session_id("nonexistent-agent-id") 191 + assert result is None
+355
think/providers/cli.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """CLI subprocess runner for AI provider tool agents. 5 + 6 + Spawns provider CLI tools (claude, codex, gemini) in JSON streaming mode 7 + and translates their JSONL output into our standard Event format. 8 + 9 + Each provider module implements a translate() function that converts 10 + provider-specific JSONL events into our Event TypedDicts. The CLIRunner 11 + handles subprocess lifecycle, stdin piping, and event emission. 12 + """ 13 + 14 + from __future__ import annotations 15 + 16 + import asyncio 17 + import json 18 + import logging 19 + import shutil 20 + from pathlib import Path 21 + from typing import Any, Callable, Optional 22 + 23 + from think.providers.shared import JSONEventCallback 24 + from think.utils import now_ms 25 + 26 + LOG = logging.getLogger("think.providers.cli") 27 + 28 + _PROJECT_ROOT = Path(__file__).parent.parent.parent 29 + 30 + 31 + # --------------------------------------------------------------------------- 32 + # Prompt Assembly 33 + # --------------------------------------------------------------------------- 34 + 35 + 36 + def assemble_prompt(config: dict[str, Any]) -> tuple[str, str | None]: 37 + """Combine config fields into a single prompt string and system instruction. 38 + 39 + Joins transcript, extra_context, user_instruction, and prompt with 40 + double newlines. Returns the system_instruction separately for CLIs 41 + that support --system-prompt (Claude); callers for other CLIs should 42 + prepend it to the prompt body. 43 + 44 + Args: 45 + config: Agent config dict with prompt, transcript, etc. 46 + 47 + Returns: 48 + Tuple of (prompt_body, system_instruction). 49 + system_instruction may be None. 50 + """ 51 + parts = [] 52 + for key in ("transcript", "extra_context", "user_instruction", "prompt"): 53 + value = config.get(key) 54 + if value: 55 + parts.append(value) 56 + 57 + prompt_body = "\n\n".join(parts) if parts else "" 58 + system_instruction = config.get("system_instruction") or None 59 + return prompt_body, system_instruction 60 + 61 + 62 + # --------------------------------------------------------------------------- 63 + # Session ID Lookup 64 + # --------------------------------------------------------------------------- 65 + 66 + 67 + def lookup_cli_session_id(agent_id: str) -> str | None: 68 + """Look up the CLI session ID from a previous agent's event log. 69 + 70 + Scans the agent's JSONL events for a finish event containing 71 + a cli_session_id field. 72 + 73 + Args: 74 + agent_id: Previous agent ID to look up. 75 + 76 + Returns: 77 + The CLI session/thread ID, or None if not found. 78 + """ 79 + try: 80 + from think.cortex_client import read_agent_events 81 + 82 + events = read_agent_events(agent_id) 83 + except (FileNotFoundError, Exception): 84 + LOG.warning("Cannot look up cli_session_id from %s", agent_id) 85 + return None 86 + 87 + for event in events: 88 + if event.get("event") == "finish" and event.get("cli_session_id"): 89 + return event["cli_session_id"] 90 + 91 + return None 92 + 93 + 94 + # --------------------------------------------------------------------------- 95 + # Thinking Aggregator 96 + # --------------------------------------------------------------------------- 97 + 98 + 99 + class ThinkingAggregator: 100 + """Buffers assistant text between tool calls for thinking/result classification. 101 + 102 + All assistant text that arrives between tool calls is treated as "thinking". 103 + Only the final text after all tool activity completes is the "result". 104 + 105 + Usage: 106 + agg = ThinkingAggregator(callback, model) 107 + # As text arrives: 108 + agg.accumulate("some text") 109 + # When a tool_start arrives, flush buffered text as thinking: 110 + agg.flush_as_thinking(raw_events=[...]) 111 + # When done (no more tool calls), get the final result: 112 + result = agg.flush_as_result() 113 + """ 114 + 115 + def __init__( 116 + self, 117 + callback: JSONEventCallback, 118 + model: str | None = None, 119 + ) -> None: 120 + self._buffer: list[str] = [] 121 + self._callback = callback 122 + self._model = model 123 + 124 + def accumulate(self, text: str) -> None: 125 + """Add text to the buffer.""" 126 + if text: 127 + self._buffer.append(text) 128 + 129 + def flush_as_thinking( 130 + self, raw_events: list[dict[str, Any]] | None = None 131 + ) -> None: 132 + """Emit buffered text as a thinking event and clear the buffer. 133 + 134 + Does nothing if the buffer is empty. 135 + """ 136 + text = "".join(self._buffer).strip() 137 + self._buffer.clear() 138 + if not text: 139 + return 140 + 141 + event: dict[str, Any] = { 142 + "event": "thinking", 143 + "summary": text, 144 + "ts": now_ms(), 145 + } 146 + if self._model: 147 + event["model"] = self._model 148 + if raw_events: 149 + event["raw"] = raw_events 150 + self._callback.emit(event) 151 + 152 + def flush_as_result(self) -> str: 153 + """Return buffered text as the final result and clear the buffer.""" 154 + text = "".join(self._buffer).strip() 155 + self._buffer.clear() 156 + return text 157 + 158 + @property 159 + def has_content(self) -> bool: 160 + """Whether the buffer has any content.""" 161 + return bool(self._buffer) 162 + 163 + 164 + # --------------------------------------------------------------------------- 165 + # CLI Runner 166 + # --------------------------------------------------------------------------- 167 + 168 + 169 + class CLIRunner: 170 + """Spawn a CLI subprocess and translate its JSONL output to our events. 171 + 172 + The runner pipes a prompt to stdin, reads JSONL from stdout line by line, 173 + and calls a provider-specific translate function for each line. 174 + 175 + Args: 176 + cmd: Command to run (e.g., ["claude", "-p", "-", ...]). 177 + prompt_text: Text to pipe to stdin. 178 + translate: Function that receives (raw_event_dict, aggregator, callback) 179 + and emits our Event types. Must return the cli_session_id from the 180 + init event (or None for non-init events). 181 + callback: JSONEventCallback for emitting events. 182 + aggregator: ThinkingAggregator for text buffering. 183 + cwd: Working directory for the subprocess. Defaults to project root. 184 + env: Optional environment overrides (merged with os.environ). 185 + timeout: Subprocess timeout in seconds. Default 600. 186 + """ 187 + 188 + def __init__( 189 + self, 190 + cmd: list[str], 191 + prompt_text: str, 192 + translate: Callable[ 193 + [dict[str, Any], ThinkingAggregator, JSONEventCallback], 194 + str | None, 195 + ], 196 + callback: JSONEventCallback, 197 + aggregator: ThinkingAggregator, 198 + cwd: Path | None = None, 199 + env: dict[str, str] | None = None, 200 + timeout: int = 600, 201 + ) -> None: 202 + self.cmd = cmd 203 + self.prompt_text = prompt_text 204 + self.translate = translate 205 + self.callback = callback 206 + self.aggregator = aggregator 207 + self.cwd = cwd or _PROJECT_ROOT 208 + self.env = env 209 + self.timeout = timeout 210 + self.cli_session_id: str | None = None 211 + 212 + async def run(self) -> str: 213 + """Spawn the CLI process, stream events, and return the final result. 214 + 215 + Returns: 216 + The final result text from the agent. 217 + 218 + Raises: 219 + RuntimeError: If the CLI binary is not found or process fails. 220 + """ 221 + binary = self.cmd[0] 222 + if not shutil.which(binary): 223 + raise RuntimeError( 224 + f"CLI tool '{binary}' not found. " 225 + f"Install it and ensure it's on PATH." 226 + ) 227 + 228 + import os 229 + 230 + proc_env = os.environ.copy() 231 + if self.env: 232 + proc_env.update(self.env) 233 + 234 + LOG.info("Spawning CLI: %s (cwd=%s)", " ".join(self.cmd), self.cwd) 235 + 236 + process = await asyncio.create_subprocess_exec( 237 + *self.cmd, 238 + stdin=asyncio.subprocess.PIPE, 239 + stdout=asyncio.subprocess.PIPE, 240 + stderr=asyncio.subprocess.PIPE, 241 + cwd=str(self.cwd), 242 + env=proc_env, 243 + ) 244 + 245 + # Pipe prompt to stdin and close 246 + if process.stdin: 247 + process.stdin.write(self.prompt_text.encode("utf-8")) 248 + process.stdin.close() 249 + 250 + # Read stdout line by line, translate each JSONL event 251 + stderr_lines: list[str] = [] 252 + 253 + async def _read_stderr() -> None: 254 + if not process.stderr: 255 + return 256 + async for raw_line in process.stderr: 257 + line = raw_line.decode("utf-8", errors="replace").rstrip() 258 + if line: 259 + stderr_lines.append(line) 260 + LOG.debug("[%s stderr] %s", binary, line) 261 + 262 + stderr_task = asyncio.create_task(_read_stderr()) 263 + 264 + try: 265 + await asyncio.wait_for( 266 + self._process_stdout(process), 267 + timeout=self.timeout, 268 + ) 269 + except asyncio.TimeoutError: 270 + LOG.error("CLI process timed out after %ds, killing", self.timeout) 271 + process.kill() 272 + self.callback.emit( 273 + { 274 + "event": "error", 275 + "error": f"CLI process timed out after {self.timeout}s", 276 + "ts": now_ms(), 277 + } 278 + ) 279 + raise RuntimeError(f"CLI process timed out after {self.timeout}s") 280 + finally: 281 + # Wait for stderr reader to finish 282 + await stderr_task 283 + 284 + # Wait for process to exit 285 + return_code = await process.wait() 286 + 287 + if return_code != 0: 288 + stderr_text = "\n".join(stderr_lines[-20:]) # Last 20 lines 289 + LOG.error( 290 + "CLI process exited with code %d: %s", return_code, stderr_text 291 + ) 292 + 293 + # Get final result from aggregator 294 + result = self.aggregator.flush_as_result() 295 + return result or "Done." 296 + 297 + async def _process_stdout(self, process: asyncio.subprocess.Process) -> None: 298 + """Read and translate JSONL lines from stdout.""" 299 + if not process.stdout: 300 + return 301 + 302 + async for raw_line in process.stdout: 303 + line = raw_line.decode("utf-8", errors="replace").strip() 304 + if not line: 305 + continue 306 + 307 + try: 308 + event_data = json.loads(line) 309 + except json.JSONDecodeError: 310 + LOG.warning("Non-JSON stdout line: %s", line[:200]) 311 + continue 312 + 313 + try: 314 + session_id = self.translate( 315 + event_data, self.aggregator, self.callback 316 + ) 317 + if session_id: 318 + self.cli_session_id = session_id 319 + except Exception: 320 + LOG.exception("Error translating CLI event: %s", line[:200]) 321 + 322 + 323 + # --------------------------------------------------------------------------- 324 + # CLI Binary Check 325 + # --------------------------------------------------------------------------- 326 + 327 + 328 + def check_cli_binary(name: str) -> str: 329 + """Check that a CLI binary is available on PATH. 330 + 331 + Args: 332 + name: Binary name (e.g., "claude", "codex", "gemini"). 333 + 334 + Returns: 335 + The full path to the binary. 336 + 337 + Raises: 338 + RuntimeError: If the binary is not found. 339 + """ 340 + path = shutil.which(name) 341 + if not path: 342 + raise RuntimeError( 343 + f"CLI tool '{name}' not found on PATH. " 344 + f"Install it and ensure it's accessible." 345 + ) 346 + return path 347 + 348 + 349 + __all__ = [ 350 + "CLIRunner", 351 + "ThinkingAggregator", 352 + "assemble_prompt", 353 + "check_cli_binary", 354 + "lookup_cli_session_id", 355 + ]
+24 -15
think/providers/shared.py
··· 31 31 tool: str 32 32 args: Optional[dict[str, Any]] 33 33 call_id: Optional[str] # Unique ID to pair with tool_end event 34 + raw: Optional[list[dict[str, Any]]] # Original provider JSON event(s) 34 35 35 36 36 37 class ToolEndEvent(TypedDict, total=False): ··· 42 43 args: Optional[dict[str, Any]] 43 44 result: Any 44 45 call_id: Optional[str] # Matches the call_id from tool_start 46 + raw: Optional[list[dict[str, Any]]] # Original provider JSON event(s) 45 47 46 48 47 - class StartEvent(TypedDict): 49 + class StartEvent(TypedDict, total=False): 48 50 """Event emitted when an agent run begins.""" 49 51 50 - event: Literal["start"] 51 - ts: int 52 - prompt: str 53 - name: str 54 - model: str 55 - provider: str 52 + event: Required[Literal["start"]] 53 + ts: Required[int] 54 + prompt: Required[str] 55 + name: Required[str] 56 + model: Required[str] 57 + provider: Required[str] 58 + raw: Optional[list[dict[str, Any]]] # Original provider JSON event(s) 56 59 57 60 58 - class FinishEvent(TypedDict): 61 + class FinishEvent(TypedDict, total=False): 59 62 """Event emitted when an agent run finishes successfully.""" 60 63 61 - event: Literal["finish"] 62 - ts: int 63 - result: str 64 + event: Required[Literal["finish"]] 65 + ts: Required[int] 66 + result: Required[str] 67 + usage: Optional[dict[str, Any]] 68 + cli_session_id: Optional[str] # Provider CLI session/thread ID for resume 69 + raw: Optional[list[dict[str, Any]]] # Original provider JSON event(s) 64 70 65 71 66 72 class ErrorEvent(TypedDict, total=False): ··· 70 76 ts: int 71 77 error: str 72 78 trace: Optional[str] 79 + raw: Optional[list[dict[str, Any]]] # Original provider JSON event(s) 73 80 74 81 75 - class AgentUpdatedEvent(TypedDict): 82 + class AgentUpdatedEvent(TypedDict, total=False): 76 83 """Event emitted when the agent context changes.""" 77 84 78 - event: Literal["agent_updated"] 79 - ts: int 80 - agent: str 85 + event: Required[Literal["agent_updated"]] 86 + ts: Required[int] 87 + agent: Required[str] 88 + raw: Optional[list[dict[str, Any]]] # Original provider JSON event(s) 81 89 82 90 83 91 class ThinkingEvent(TypedDict, total=False): ··· 95 103 model: Optional[str] 96 104 signature: Optional[str] # Anthropic thinking block signature 97 105 redacted_data: Optional[str] # Encrypted data for redacted thinking 106 + raw: Optional[list[dict[str, Any]]] # Original provider JSON event(s) 98 107 99 108 100 109 Event = Union[