personal memory agent

Unify generators into cortex spawning system

Refactor generate.py to use the same NDJSON protocol as agents, so that it is
spawned by Cortex rather than run as a standalone CLI. Key changes:

- generate.py: Remove argparse CLI, add NDJSON stdin mode with event
emission (start/finish/error). Remove dead count_tokens function.
- cortex.py: Add routing logic (tools→agents, output→generators) and
consolidate _spawn_agent/_spawn_generator into shared _spawn_subprocess
- dream.py: Run generators via cortex_request sequentially using
wait_for_agents utility
- importer.py: Use cortex_request for import summaries
- models.py: Add generate_with_result to __all__ exports
- docs: Fix stale -c flag reference and CallosumConnection API example
- tests: Update for NDJSON protocol, add test_spawn_generator

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
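
The routing rule in the cortex.py bullet (tools→agents, output→generators, neither is an error) can be sketched as below. This is an illustration only, not the code in cortex.py: `route_request` is a hypothetical name, "present" is assumed to mean the key exists in the merged config, and raising `ValueError` stands in for whatever error Cortex actually returns.

```python
from typing import Any


def route_request(config: dict[str, Any]) -> list[str]:
    """Pick the subprocess command for a merged request config.

    Hypothetical helper; the name and return shape are illustrative.
    """
    if "tools" in config:
        # A `tools` field selects the agent path, even if `output` is also set.
        return ["sol", "agents"]
    if "output" in config:
        # `output` without `tools` selects the generator path.
        return ["sol", "generate"]
    # Assumption: the real service reports an error instead of raising.
    raise ValueError("request has neither 'tools' nor 'output'")
```

Checking `tools` first matches the documented precedence: a request carrying both fields is treated as a tool-using agent.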

+1104 -587
+37 -2
docs/CORTEX.md
··· 17 17 ### Key Components 18 18 - **Message Bus Integration**: Cortex connects to Callosum to receive requests and broadcast events 19 19 - **Configuration Loading**: Cortex loads and merges agent configuration with request parameters 20 - - **Process Management**: Spawns agent subprocesses via the `sol agents` command with merged configuration 20 + - **Request Routing**: Routes requests based on config fields: 21 + - `tools` field present → spawns `sol agents` (tool-using agent) 22 + - `output` field present (no `tools`) → spawns `sol generate` (generator) 23 + - Neither field → returns error 24 + - **Process Management**: Spawns agent/generator subprocesses with merged configuration 21 25 - **Event Capture**: Monitors agent stdout/stderr and appends to JSONL files 22 26 - **Dual Event Distribution**: Events go to both persistent files and real-time message bus 23 - - **NDJSON Input Mode**: Agent processes accept newline-delimited JSON via stdin containing the full merged configuration 27 + - **NDJSON Input Mode**: Both agent and generator processes accept newline-delimited JSON via stdin containing the full merged configuration 24 28 25 29 ### File States 26 30 - `<timestamp>_active.jsonl`: Agent currently executing (Cortex is appending events) ··· 62 66 The model is automatically resolved based on the agent context (`agent.{app}.{name}`) 63 67 and the configured tier in `journal.json`. Provider can optionally be overridden at 64 68 request time, which will resolve the appropriate model for that provider at the same tier. 69 + 70 + ## Generator Request Format 71 + 72 + Generators are spawned via Cortex when a request has an `output` field but no `tools` field. They produce analysis output (markdown or JSON) from clustered transcripts. 
73 + 74 + ```json 75 + { 76 + "event": "request", 77 + "ts": 1234567890123, // Required: millisecond timestamp 78 + "name": "activity", // Required: generator name from muse/*.md 79 + "day": "20250109", // Required: day in YYYYMMDD format 80 + "output": "md", // Required: output format ("md" or "json") 81 + "segment": "120000_300", // Optional: single segment key (HHMMSS_duration) 82 + "segments": ["120000_300", "120500_300"], // Optional: multiple segment keys 83 + "output_path": "/path/to/file.md", // Optional: override output location 84 + "force": false, // Optional: regenerate even if output exists 85 + "provider": "google", // Optional: AI provider override 86 + "model": "gemini-2.0-flash" // Optional: model override 87 + } 88 + ``` 89 + 90 + ### Generator Events 91 + 92 + Generators emit the same event types as agents: 93 + - `start` - When generation begins 94 + - `finish` - On completion, with `result` containing generated content 95 + - `error` - On failure 96 + 97 + The `finish` event may include a `skipped` field when generation is skipped: 98 + - `"no_input"` - Insufficient transcript content to analyze 99 + - `"disabled"` - Generator is marked as disabled in frontmatter 65 100 66 101 ### Conversation Continuations 67 102
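
A minimal sketch of how a client might build the generator request described above and reduce the resulting event stream to a final status. Both helper names are hypothetical and not part of `think.cortex_client`. The field names (`event`, `ts`, `name`, `day`, `output`, `result`, `skipped`, `error`) come from the documentation and tests in this commit. Note that the `//` comments in the documented example are annotations only; standard JSON does not allow comments, so a real request line must omit them.

```python
import json
import time
from typing import Any


def build_generator_request(name: str, day: str, output: str = "md", **optional: Any) -> str:
    """Serialize one generator request as a single NDJSON line (hypothetical helper)."""
    request = {
        "event": "request",
        "ts": int(time.time() * 1000),  # millisecond timestamp
        "name": name,
        "day": day,
        "output": output,
    }
    request.update(optional)  # e.g. segment, segments, output_path, force
    return json.dumps(request) + "\n"


def summarize_events(ndjson_text: str) -> dict[str, Any]:
    """Reduce a generator's NDJSON event stream to a final status (hypothetical helper)."""
    status: dict[str, Any] = {"state": "pending", "result": None, "skipped": None, "error": None}
    for line in ndjson_text.splitlines():
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        kind = event.get("event")
        if kind == "finish":
            # `skipped` is only present when generation was skipped.
            status.update(state="finish", result=event.get("result"), skipped=event.get("skipped"))
        elif kind == "error":
            status.update(state="error", error=event.get("error"))
    return status
```

A caller would treat a `finish` state with `skipped` set to `"no_input"` or `"disabled"` as a successful no-op rather than a failure.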
+38 -17
docs/THINK.md
··· 14 14 15 15 The package exposes several commands: 16 16 17 - - `sol generate` builds a Markdown summary of a day's recordings using a Gemini prompt. 17 + - `sol generate` runs generator pipelines spawned by Cortex (NDJSON protocol, not for direct CLI use). 18 18 - `sol cluster` groups audio and screen JSON files into report sections. Use `--start` and 19 19 `--length` to limit the report to a specific time range. 20 - - `sol dream` runs the above tools for a single day. 20 + - `sol dream` runs generators and agents for a single day via Cortex. 21 21 - `sol supervisor` monitors observation heartbeats. Use `--no-observers` to disable local capture (sense still runs for remote uploads and imports). 22 22 - `sol mcp` starts an MCP server exposing search capabilities for both summary text and raw transcripts. 23 - - `sol cortex` starts a WebSocket API server for managing AI agent instances. 23 + - `sol cortex` starts a WebSocket API server for managing AI agent instances and generators. 24 24 25 25 ```bash 26 - sol generate YYYYMMDD -f PROMPT [--segment HHMMSS_LEN] [--segments SEG1,SEG2 -o OUT] [--force] [-v] 27 26 sol cluster YYYYMMDD [--start HHMMSS --length MINUTES] 28 27 sol dream [--day YYYYMMDD] [--segment HHMMSS_LEN] [--force] [--skip-generators] [--skip-agents] 29 28 sol supervisor [--no-observers] ··· 31 30 sol cortex [--host HOST] [--port PORT] [--path PATH] 32 31 ``` 33 32 34 - Use `--segment` to process a single segment, or `--segments` with `-o` to process 35 - multiple specific segments (comma-separated). Use `-o` to override the output path 36 - for any mode. 37 - 38 - Use `-c` to count tokens only, `--force` to overwrite existing files, and `-v` for 39 - verbose logs. 33 + Use `--force` to overwrite existing files, and `-v` for verbose logs. 40 34 41 35 Set `GOOGLE_API_KEY` before running any command that contacts Gemini. 
42 36 `JOURNAL_PATH` and `GOOGLE_API_KEY` can also be provided in a `.env` file which ··· 84 78 85 79 ### Cortex: Central Agent Manager 86 80 87 - The Cortex service (`sol cortex`) is the central system for managing AI agent instances. It monitors the journal's `agents/` directory for new requests and manages agent execution. All agent spawning should go through Cortex for proper event tracking and management. 81 + The Cortex service (`sol cortex`) is the central system for managing AI agent instances and generators. It monitors the journal's `agents/` directory for new requests and manages execution. All agent spawning should go through Cortex for proper event tracking and management. 82 + 83 + Cortex routes requests based on configuration: 84 + - Requests with `tools` field → tool-using agents (`sol agents`) 85 + - Requests with `output` field (no `tools`) → generators (`sol generate`) 88 86 89 87 To spawn agents programmatically, use the cortex_client functions: 90 88 ··· 109 107 if message.get('event') == 'finish': 110 108 print(f"Result: {message.get('result')}") 111 109 112 - watcher = CallosumConnection(callback=on_event) 113 - watcher.connect() 110 + watcher = CallosumConnection() 111 + watcher.start(callback=on_event) 114 112 # ... 
later, when done: 115 - watcher.close() 113 + watcher.stop() 114 + ``` 115 + 116 + ### Spawning Generators via Cortex 117 + 118 + Generators can also be spawned via `cortex_request` by including an `output` field: 119 + 120 + ```python 121 + from think.cortex_client import cortex_request, wait_for_agents 122 + 123 + # Spawn a generator 124 + agent_id = cortex_request( 125 + prompt="", # Generators don't use prompts 126 + name="activity", 127 + config={ 128 + "day": "20250109", 129 + "output": "md", 130 + "force": True, # Regenerate even if output exists 131 + } 132 + ) 133 + 134 + # Wait for completion 135 + completed, timed_out = wait_for_agents([agent_id], timeout=300) 116 136 ``` 117 137 118 138 ### Direct CLI Usage (Testing Only) ··· 208 228 209 229 ## Key Components 210 230 211 - - **cortex.py** - Central agent manager, file watcher, event distribution 212 - - **cortex_client.py** - Client functions: `cortex_request()`, `cortex_agents()` 231 + - **cortex.py** - Central agent manager, file watcher, event distribution, routes to agents.py or generate.py 232 + - **cortex_client.py** - Client functions: `cortex_request()`, `cortex_agents()`, `wait_for_agents()` 213 233 - **mcp.py** - FastMCP server with journal search tools 214 - - **agents.py** - CLI entry point and shared event types 234 + - **agents.py** - CLI entry point for tool-using agents (NDJSON protocol) 235 + - **generate.py** - CLI entry point for generators (NDJSON protocol), spawned by Cortex 215 236 - **models.py** - Unified `generate()`/`agenerate()` API, provider routing, token logging 216 237 - **batch.py** - `Batch` class for concurrent LLM requests with dynamic queuing 217 238
+69
tests/test_cortex.py
··· 140 140 141 141 142 142 @patch("think.cortex.subprocess.Popen") 143 + @patch("think.cortex.threading.Thread") 144 + @patch("think.cortex.threading.Timer") 145 + def test_spawn_generator(mock_timer, mock_thread, mock_popen, cortex_service, mock_journal): 146 + """Test spawning a generator subprocess.""" 147 + mock_process = MagicMock() 148 + mock_process.pid = 54321 149 + mock_process.poll.return_value = None 150 + mock_process.stdin = MagicMock() 151 + mock_process.stdout = MagicMock() 152 + mock_process.stderr = MagicMock() 153 + mock_popen.return_value = mock_process 154 + 155 + # Setup mock timer 156 + mock_timer_instance = MagicMock() 157 + mock_timer.return_value = mock_timer_instance 158 + 159 + agent_id = "987654321" 160 + file_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 161 + 162 + # Generator config has "output" instead of "tools" 163 + config = { 164 + "event": "request", 165 + "ts": 987654321, 166 + "name": "activity", 167 + "day": "20240101", 168 + "output": "md", 169 + } 170 + 171 + cortex_service._spawn_generator( 172 + agent_id, 173 + file_path, 174 + config, 175 + ) 176 + 177 + # Check subprocess was called with generate command 178 + mock_popen.assert_called_once() 179 + call_args = mock_popen.call_args 180 + assert call_args[0][0] == ["sol", "generate"] 181 + assert call_args[1]["stdin"] is not None 182 + assert call_args[1]["stdout"] is not None 183 + assert call_args[1]["stderr"] is not None 184 + 185 + # Check NDJSON was written to stdin 186 + mock_process.stdin.write.assert_called_once() 187 + written_data = mock_process.stdin.write.call_args[0][0] 188 + ndjson = json.loads(written_data.strip()) 189 + assert ndjson["event"] == "request" 190 + assert ndjson["name"] == "activity" 191 + assert ndjson["day"] == "20240101" 192 + assert ndjson["output"] == "md" 193 + 194 + # Check stdin was closed 195 + mock_process.stdin.close.assert_called_once() 196 + 197 + # Check generator was tracked 198 + assert agent_id in 
cortex_service.running_agents 199 + agent = cortex_service.running_agents[agent_id] 200 + assert agent.agent_id == agent_id 201 + assert agent.log_path == file_path 202 + 203 + # Check monitoring threads were started 204 + assert mock_thread.call_count == 2 # stdout and stderr 205 + 206 + # Check timer was created and started 207 + mock_timer.assert_called_once() 208 + mock_timer_instance.start.assert_called_once() 209 + 210 + 211 + @patch("think.cortex.subprocess.Popen") 143 212 def test_spawn_agent_with_handoff_from(mock_popen, cortex_service, mock_journal): 144 213 """Test spawning an agent with handoff_from parameter.""" 145 214 mock_process = MagicMock()
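
The stdin handoff that `test_spawn_generator` checks (one NDJSON line written to the child's stdin, then stdin closed) can be sketched in isolation as follows. This is not Cortex's `_spawn_subprocess`: per the test, the real service starts stdout/stderr monitoring threads and a timer, whereas this sketch uses `Popen.communicate` for brevity. `spawn_with_ndjson` and the stand-in `CHILD` script are hypothetical.

```python
import json
import subprocess
import sys

# Stand-in child process: reads one NDJSON request from stdin and
# emits start/finish events on stdout, one JSON object per line.
CHILD = r"""
import json, sys
req = json.loads(sys.stdin.readline())
print(json.dumps({"event": "start", "name": req["name"]}))
print(json.dumps({"event": "finish", "result": "ok"}))
"""


def spawn_with_ndjson(cmd: list[str], config: dict) -> list[dict]:
    """Spawn cmd, send config as one NDJSON line, and collect emitted events."""
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    # communicate() writes the line, closes stdin, and drains both pipes,
    # which avoids deadlocks when the child writes to stderr.
    out, _err = proc.communicate(json.dumps(config) + "\n")
    return [json.loads(line) for line in out.splitlines() if line.strip()]


if __name__ == "__main__":
    request = {"event": "request", "name": "activity", "day": "20240101", "output": "md"}
    for event in spawn_with_ndjson([sys.executable, "-c", CHILD], request):
        print(event)
```

Closing stdin after the single line is what lets the child treat end-of-input as the end of the request, which is why the test asserts `stdin.close` was called exactly once.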
+198 -49
tests/test_generate_full.py
··· 4 4 """Tests for the generator output pipeline. 5 5 6 6 Tests cover: 7 - - Basic output generation and saving 7 + - Basic output generation via NDJSON protocol 8 8 - Hook invocation with correct context 9 - - Named hook resolution 9 + - Generators without hooks 10 10 """ 11 11 12 12 import importlib 13 + import io 13 14 import json 14 15 import os 15 16 import shutil ··· 34 35 35 36 36 37 # Mock result must be >= MIN_INPUT_CHARS (50) to generate output 37 - MOCK_RESULT = "## Meeting Summary\n\nTeam standup at 9am with Alice and Bob discussing project status." 38 + MOCK_RESULT = { 39 + "text": "## Meeting Summary\n\nTeam standup at 9am with Alice and Bob discussing project status.", 40 + "usage": {"input_tokens": 100, "output_tokens": 50}, 41 + } 42 + 43 + 44 + def run_generator_with_config(mod, config: dict, monkeypatch) -> list[dict]: 45 + """Run generator with NDJSON config and capture output events.""" 46 + # Mock stdin with config 47 + stdin_data = json.dumps(config) + "\n" 48 + monkeypatch.setattr("sys.stdin", io.StringIO(stdin_data)) 38 49 50 + # Capture stdout 51 + captured_output = io.StringIO() 52 + monkeypatch.setattr("sys.stdout", captured_output) 39 53 40 - def test_generate_output(tmp_path, monkeypatch): 41 - """Test basic output generation saves markdown output.""" 54 + # Run main 55 + mod.main() 56 + 57 + # Parse output events 58 + events = [] 59 + captured_output.seek(0) 60 + for line in captured_output: 61 + line = line.strip() 62 + if line: 63 + events.append(json.loads(line)) 64 + 65 + return events 66 + 67 + 68 + def test_generate_output_ndjson(tmp_path, monkeypatch): 69 + """Test basic output generation via NDJSON protocol.""" 42 70 mod = importlib.import_module("think.generate") 43 - day_dir = copy_day(tmp_path) 44 - prompt = tmp_path / "prompt.md" 45 - prompt.write_text('{\n "schedule": "daily"\n}\n\nprompt') 71 + copy_day(tmp_path) 46 72 47 - monkeypatch.setattr( 48 - mod, 49 - "generate_agent_output", 50 - lambda *a, **k: MOCK_RESULT, 
73 + # Create a test generator in muse directory 74 + muse_dir = Path(mod.__file__).resolve().parent.parent / "muse" 75 + test_generator = muse_dir / "test_gen.md" 76 + test_generator.write_text( 77 + '{\n "schedule": "daily",\n "output": "md"\n}\n\nTest prompt' 51 78 ) 52 - monkeypatch.setenv("GOOGLE_API_KEY", "x") 53 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 54 - monkeypatch.setattr("sys.argv", ["sol generate", "20240101", "-f", str(prompt)]) 55 - mod.main() 56 79 57 - md = day_dir / "agents" / "prompt.md" 58 - assert md.read_text() == MOCK_RESULT 80 + try: 81 + monkeypatch.setattr( 82 + mod, 83 + "generate_agent_output", 84 + lambda *a, **k: ( 85 + MOCK_RESULT if k.get("return_result") else MOCK_RESULT["text"] 86 + ), 87 + ) 88 + monkeypatch.setenv("GOOGLE_API_KEY", "x") 89 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 90 + 91 + config = { 92 + "name": "test_gen", 93 + "day": "20240101", 94 + "output": "md", 95 + "provider": "google", 96 + "model": "gemini-2.0-flash", 97 + } 98 + 99 + events = run_generator_with_config(mod, config, monkeypatch) 100 + 101 + # Should have start and finish events 102 + assert len(events) >= 2 103 + assert events[0]["event"] == "start" 104 + assert events[0]["name"] == "test_gen" 105 + 106 + # Find finish event 107 + finish_events = [e for e in events if e["event"] == "finish"] 108 + assert len(finish_events) == 1 109 + assert finish_events[0]["result"] == MOCK_RESULT["text"] 110 + 111 + finally: 112 + if test_generator.exists(): 113 + test_generator.unlink() 59 114 60 115 61 116 def test_generate_hook_invoked_with_context(tmp_path, monkeypatch): ··· 63 118 mod = importlib.import_module("think.generate") 64 119 copy_day(tmp_path) 65 120 66 - # Create generator with hook 67 - generators_dir = tmp_path / "generators" 68 - generators_dir.mkdir() 69 - 70 - prompt_file = generators_dir / "hooked.md" 71 - prompt_file.write_text( 72 - '{\n "title": "Hooked",\n "schedule": "daily",\n "hook": "test_hook"\n}\n\nTest prompt' 73 
- ) 74 - 75 121 # Create the hook file in muse/ directory 76 - hooks_dir = Path(mod.__file__).resolve().parent.parent / "muse" 77 - hook_file = hooks_dir / "test_hook.py" 122 + muse_dir = Path(mod.__file__).resolve().parent.parent / "muse" 123 + hook_file = muse_dir / "test_hook.py" 78 124 hook_file.write_text(""" 79 125 def process(result, context): 80 126 import json 81 127 from pathlib import Path 82 128 # Write context to file for test verification 83 129 out_path = Path(context["output_path"]).parent / "context_captured.json" 130 + out_path.parent.mkdir(parents=True, exist_ok=True) 84 131 ctx_copy = { 85 132 "day": context.get("day"), 86 133 "segment": context.get("segment"), ··· 94 141 return None 95 142 """) 96 143 144 + # Create generator with hook 145 + test_generator = muse_dir / "hooked_gen.md" 146 + test_generator.write_text( 147 + '{\n "title": "Hooked",\n "schedule": "daily",\n "output": "md",\n "hook": "test_hook"\n}\n\nTest prompt' 148 + ) 149 + 97 150 try: 98 151 monkeypatch.setattr( 99 152 mod, 100 153 "generate_agent_output", 101 - lambda *a, **k: MOCK_RESULT, 154 + lambda *a, **k: ( 155 + MOCK_RESULT if k.get("return_result") else MOCK_RESULT["text"] 156 + ), 102 157 ) 103 158 monkeypatch.setenv("GOOGLE_API_KEY", "x") 104 159 monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 105 - monkeypatch.setattr( 106 - "sys.argv", ["sol generate", "20240101", "-f", str(prompt_file)] 107 - ) 108 - mod.main() 160 + 161 + config = { 162 + "name": "hooked_gen", 163 + "day": "20240101", 164 + "output": "md", 165 + "provider": "google", 166 + "model": "gemini-2.0-flash", 167 + } 168 + 169 + events = run_generator_with_config(mod, config, monkeypatch) 170 + 171 + # Should have start and finish events 172 + finish_events = [e for e in events if e["event"] == "finish"] 173 + assert len(finish_events) == 1 109 174 110 175 # Read captured context 111 176 captured_path = tmp_path / "20240101" / "agents" / "context_captured.json" ··· 114 179 assert captured["day"] == 
"20240101" 115 180 assert captured["segment"] is None 116 181 assert captured["multi_segment"] is False 117 - assert captured["name"] == "hooked" 182 + assert captured["name"] == "hooked_gen" 118 183 assert captured["has_transcript"] is True 119 184 assert captured["has_meta"] is True 120 185 121 186 finally: 122 - # Clean up test hook 187 + # Clean up test files 123 188 if hook_file.exists(): 124 189 hook_file.unlink() 190 + if test_generator.exists(): 191 + test_generator.unlink() 125 192 126 193 127 194 def test_generate_without_hook_succeeds(tmp_path, monkeypatch): 128 195 """Test that generators without hooks still work correctly.""" 129 196 mod = importlib.import_module("think.generate") 130 - day_dir = copy_day(tmp_path) 197 + copy_day(tmp_path) 131 198 132 199 # Create generator without hook 133 - prompt = tmp_path / "nohook.md" 134 - prompt.write_text('{\n "schedule": "daily"\n}\n\nNo hook prompt') 200 + muse_dir = Path(mod.__file__).resolve().parent.parent / "muse" 201 + test_generator = muse_dir / "nohook_gen.md" 202 + test_generator.write_text( 203 + '{\n "schedule": "daily",\n "output": "md"\n}\n\nNo hook prompt' 204 + ) 205 + 206 + try: 207 + monkeypatch.setattr( 208 + mod, 209 + "generate_agent_output", 210 + lambda *a, **k: ( 211 + MOCK_RESULT if k.get("return_result") else MOCK_RESULT["text"] 212 + ), 213 + ) 214 + monkeypatch.setenv("GOOGLE_API_KEY", "x") 215 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 216 + 217 + config = { 218 + "name": "nohook_gen", 219 + "day": "20240101", 220 + "output": "md", 221 + "provider": "google", 222 + "model": "gemini-2.0-flash", 223 + } 224 + 225 + events = run_generator_with_config(mod, config, monkeypatch) 226 + 227 + # Should have start and finish events 228 + assert len(events) >= 2 229 + finish_events = [e for e in events if e["event"] == "finish"] 230 + assert len(finish_events) == 1 231 + assert finish_events[0]["result"] == MOCK_RESULT["text"] 232 + 233 + finally: 234 + if test_generator.exists(): 
235 + test_generator.unlink() 236 + 237 + 238 + def test_generate_error_event_on_missing_generator(tmp_path, monkeypatch): 239 + """Test that missing generator name emits error event.""" 240 + mod = importlib.import_module("think.generate") 241 + copy_day(tmp_path) 242 + 243 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 244 + 245 + config = { 246 + "name": "nonexistent_generator", 247 + "day": "20240101", 248 + "output": "md", 249 + } 250 + 251 + events = run_generator_with_config(mod, config, monkeypatch) 252 + 253 + # Should have an error event 254 + error_events = [e for e in events if e["event"] == "error"] 255 + assert len(error_events) == 1 256 + assert "not found" in error_events[0]["error"].lower() 257 + 258 + 259 + def test_generate_skipped_on_no_input(tmp_path, monkeypatch): 260 + """Test that generator emits skipped finish when no input.""" 261 + mod = importlib.import_module("think.generate") 262 + 263 + # Create empty day directory (no transcripts) 264 + os.environ["JOURNAL_PATH"] = str(tmp_path) 265 + day_dir = day_path("20240101") 266 + day_dir.mkdir(parents=True, exist_ok=True) 135 267 136 - monkeypatch.setattr( 137 - mod, 138 - "generate_agent_output", 139 - lambda *a, **k: MOCK_RESULT, 268 + # Create a test generator 269 + muse_dir = Path(mod.__file__).resolve().parent.parent / "muse" 270 + test_generator = muse_dir / "empty_gen.md" 271 + test_generator.write_text( 272 + '{\n "schedule": "daily",\n "output": "md"\n}\n\nTest prompt' 140 273 ) 141 - monkeypatch.setenv("GOOGLE_API_KEY", "x") 142 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 143 - monkeypatch.setattr("sys.argv", ["sol generate", "20240101", "-f", str(prompt)]) 144 - mod.main() 274 + 275 + try: 276 + monkeypatch.setenv("GOOGLE_API_KEY", "x") 277 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 145 278 146 - md = day_dir / "agents" / "nohook.md" 147 - assert md.read_text() == MOCK_RESULT 279 + config = { 280 + "name": "empty_gen", 281 + "day": "20240101", 282 + "output": 
"md", 283 + "provider": "google", 284 + "model": "gemini-2.0-flash", 285 + } 286 + 287 + events = run_generator_with_config(mod, config, monkeypatch) 288 + 289 + # Should have start and finish with skipped 290 + finish_events = [e for e in events if e["event"] == "finish"] 291 + assert len(finish_events) == 1 292 + assert finish_events[0].get("skipped") == "no_input" 293 + 294 + finally: 295 + if test_generator.exists(): 296 + test_generator.unlink() 148 297 149 298 150 299 def test_named_hook_resolution(tmp_path, monkeypatch):
+33 -15
tests/test_importer.py
··· 246 246 247 247 248 248 def test_run_import_summary(tmp_path, monkeypatch): 249 - """Test _run_import_summary calls sol generate correctly.""" 249 + """Test _run_import_summary calls cortex_request correctly.""" 250 250 mod = importlib.import_module("think.importer") 251 251 252 252 import_dir = tmp_path / "imports" / "20240101_120000" 253 253 import_dir.mkdir(parents=True) 254 254 255 - # Mock subprocess.run to simulate successful sol generate 256 - def mock_run(cmd, *args, **kwargs): 257 - # Create the summary file like sol generate would 255 + captured_request = {} 256 + 257 + def mock_cortex_request(prompt, name, config): 258 + captured_request.update({"prompt": prompt, "name": name, "config": config}) 259 + # Create the summary file like the generator would 258 260 summary_path = import_dir / "summary.md" 259 261 summary_path.write_text("# Test Summary\n\nContent here.") 260 - mock_result = MagicMock() 261 - mock_result.returncode = 0 262 - return mock_result 262 + return "mock_agent_id" 263 263 264 - with patch("subprocess.run", side_effect=mock_run) as mock_subprocess: 264 + def mock_wait_for_agents(agent_ids, timeout): 265 + return (agent_ids, []) # All completed, none timed out 266 + 267 + def mock_get_agent_end_state(agent_id): 268 + return "finish" 269 + 270 + with ( 271 + patch( 272 + "think.cortex_client.cortex_request", side_effect=mock_cortex_request 273 + ), 274 + patch( 275 + "think.cortex_client.wait_for_agents", side_effect=mock_wait_for_agents 276 + ), 277 + patch( 278 + "think.cortex_client.get_agent_end_state", 279 + side_effect=mock_get_agent_end_state, 280 + ), 281 + ): 265 282 result = mod._run_import_summary( 266 283 import_dir, 267 284 "20240101", ··· 271 288 assert result is True 272 289 assert (import_dir / "summary.md").exists() 273 290 274 - # Verify correct command was called 275 - call_args = mock_subprocess.call_args[0][0] 276 - assert "sol" in call_args 277 - assert "insight" in call_args 278 - assert "importer" in call_args 
279 - assert "--segments" in call_args 280 - assert "120000_300,120500_300" in call_args 291 + # Verify cortex_request was called with correct config 292 + assert captured_request["name"] == "importer" 293 + assert captured_request["config"]["day"] == "20240101" 294 + assert captured_request["config"]["segments"] == ["120000_300", "120500_300"] 295 + assert captured_request["config"]["output"] == "md" 296 + assert ( 297 + str(import_dir / "summary.md") in captured_request["config"]["output_path"] 298 + ) 281 299 282 300 283 301 def test_run_import_summary_no_segments(tmp_path):
+137 -123
tests/test_output_hooks.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 + """Tests for the generator output hooks system. 5 + 6 + Tests cover: 7 + - Hook loading and validation 8 + - Hook invocation via NDJSON protocol 9 + - Hook error handling 10 + """ 11 + 4 12 import importlib 13 + import io 5 14 import json 6 15 import os 7 16 import shutil ··· 24 33 return dest 25 34 26 35 27 - MOCK_RESULT = "## Original Result\n\nThis is the original output content." 36 + MOCK_RESULT = { 37 + "text": "## Original Result\n\nThis is the original output content.", 38 + "usage": {"input_tokens": 100, "output_tokens": 50}, 39 + } 40 + 41 + 42 + def run_generator_with_config(mod, config: dict, monkeypatch) -> list[dict]: 43 + """Run generator with NDJSON config and capture output events.""" 44 + stdin_data = json.dumps(config) + "\n" 45 + monkeypatch.setattr("sys.stdin", io.StringIO(stdin_data)) 46 + 47 + captured_output = io.StringIO() 48 + monkeypatch.setattr("sys.stdout", captured_output) 49 + 50 + mod.main() 51 + 52 + events = [] 53 + captured_output.seek(0) 54 + for line in captured_output: 55 + line = line.strip() 56 + if line: 57 + events.append(json.loads(line)) 58 + 59 + return events 28 60 29 61 30 62 def test_load_output_hook_success(tmp_path): ··· 112 144 def test_output_hook_invocation(tmp_path, monkeypatch): 113 145 """Test that generate.py invokes hook and uses transformed result.""" 114 146 mod = importlib.import_module("think.generate") 115 - day_dir = copy_day(tmp_path) 147 + copy_day(tmp_path) 116 148 117 - # Create generator with hook 118 - generators_dir = tmp_path / "generators" 119 - generators_dir.mkdir() 149 + # Create generator with hook in muse directory 150 + muse_dir = Path(mod.__file__).resolve().parent.parent / "muse" 120 151 121 - prompt_file = generators_dir / "hooked.md" 152 + prompt_file = muse_dir / "hooked_test.md" 122 153 prompt_file.write_text( 123 - '{\n "title": "Hooked",\n "occurrences": false,\n "schedule": 
"daily"\n}\n\nTest prompt' 154 + '{\n "title": "Hooked",\n "schedule": "daily",\n "output": "md"\n}\n\nTest prompt' 124 155 ) 125 156 126 - hook_file = generators_dir / "hooked.py" 157 + hook_file = muse_dir / "hooked_test.py" 127 158 hook_file.write_text(""" 128 159 def process(result, context): 129 160 # Verify context has expected fields ··· 133 164 return result + "\\n\\n## Hook was here" 134 165 """) 135 166 136 - monkeypatch.setattr( 137 - mod, 138 - "generate_agent_output", 139 - lambda *a, **k: MOCK_RESULT, 140 - ) 141 - monkeypatch.setenv("GOOGLE_API_KEY", "x") 142 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 143 - monkeypatch.setattr( 144 - "sys.argv", ["sol generate", "20240101", "-f", str(prompt_file)] 145 - ) 167 + try: 168 + monkeypatch.setattr( 169 + mod, 170 + "generate_agent_output", 171 + lambda *a, **k: ( 172 + MOCK_RESULT if k.get("return_result") else MOCK_RESULT["text"] 173 + ), 174 + ) 175 + monkeypatch.setenv("GOOGLE_API_KEY", "x") 176 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 146 177 147 - mod.main() 178 + config = { 179 + "name": "hooked_test", 180 + "day": "20240101", 181 + "output": "md", 182 + "provider": "google", 183 + "model": "gemini-2.0-flash", 184 + } 185 + 186 + events = run_generator_with_config(mod, config, monkeypatch) 187 + 188 + # Find finish event 189 + finish_events = [e for e in events if e["event"] == "finish"] 190 + assert len(finish_events) == 1 191 + 192 + content = finish_events[0]["result"] 193 + assert "## Original Result" in content 194 + assert "## Hook was here" in content 148 195 149 - md = day_dir / "agents" / "hooked.md" 150 - content = md.read_text() 151 - assert "## Original Result" in content 152 - assert "## Hook was here" in content 196 + finally: 197 + if hook_file.exists(): 198 + hook_file.unlink() 199 + if prompt_file.exists(): 200 + prompt_file.unlink() 153 201 154 202 155 203 def test_output_hook_returns_none(tmp_path, monkeypatch): 156 204 """Test that hook returning None uses 
original result.""" 157 205 mod = importlib.import_module("think.generate") 158 - day_dir = copy_day(tmp_path) 206 + copy_day(tmp_path) 159 207 160 - generators_dir = tmp_path / "generators" 161 - generators_dir.mkdir() 208 + muse_dir = Path(mod.__file__).resolve().parent.parent / "muse" 162 209 163 - prompt_file = generators_dir / "noop.md" 210 + prompt_file = muse_dir / "noop_test.md" 164 211 prompt_file.write_text( 165 - '{\n "title": "Noop",\n "occurrences": false,\n "schedule": "daily"\n}\n\nTest prompt' 212 + '{\n "title": "Noop",\n "schedule": "daily",\n "output": "md"\n}\n\nTest prompt' 166 213 ) 167 214 168 - hook_file = generators_dir / "noop.py" 215 + hook_file = muse_dir / "noop_test.py" 169 216 hook_file.write_text(""" 170 217 def process(result, context): 171 218 return None # Signal to use original 172 219 """) 173 220 174 - monkeypatch.setattr( 175 - mod, 176 - "generate_agent_output", 177 - lambda *a, **k: MOCK_RESULT, 178 - ) 179 - monkeypatch.setenv("GOOGLE_API_KEY", "x") 180 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 181 - monkeypatch.setattr( 182 - "sys.argv", ["sol generate", "20240101", "-f", str(prompt_file)] 183 - ) 221 + try: 222 + monkeypatch.setattr( 223 + mod, 224 + "generate_agent_output", 225 + lambda *a, **k: ( 226 + MOCK_RESULT if k.get("return_result") else MOCK_RESULT["text"] 227 + ), 228 + ) 229 + monkeypatch.setenv("GOOGLE_API_KEY", "x") 230 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 184 231 185 - mod.main() 232 + config = { 233 + "name": "noop_test", 234 + "day": "20240101", 235 + "output": "md", 236 + "provider": "google", 237 + "model": "gemini-2.0-flash", 238 + } 239 + 240 + events = run_generator_with_config(mod, config, monkeypatch) 241 + 242 + finish_events = [e for e in events if e["event"] == "finish"] 243 + assert len(finish_events) == 1 244 + assert finish_events[0]["result"] == MOCK_RESULT["text"] 186 245 187 - md = day_dir / "agents" / "noop.md" 188 - content = md.read_text() 189 - assert content 
== MOCK_RESULT # Original, not modified 246 + finally: 247 + if hook_file.exists(): 248 + hook_file.unlink() 249 + if prompt_file.exists(): 250 + prompt_file.unlink() 190 251 191 252 192 253 def test_output_hook_error_fallback(tmp_path, monkeypatch): 193 254 """Test that hook errors fall back to original result.""" 194 255 mod = importlib.import_module("think.generate") 195 - day_dir = copy_day(tmp_path) 256 + copy_day(tmp_path) 196 257 197 - generators_dir = tmp_path / "generators" 198 - generators_dir.mkdir() 258 + muse_dir = Path(mod.__file__).resolve().parent.parent / "muse" 199 259 200 - prompt_file = generators_dir / "broken.md" 260 + prompt_file = muse_dir / "broken_test.md" 201 261 prompt_file.write_text( 202 - '{\n "title": "Broken",\n "occurrences": false,\n "schedule": "daily"\n}\n\nTest prompt' 262 + '{\n "title": "Broken",\n "schedule": "daily",\n "output": "md"\n}\n\nTest prompt' 203 263 ) 204 264 205 - hook_file = generators_dir / "broken.py" 265 + hook_file = muse_dir / "broken_test.py" 206 266 hook_file.write_text(""" 207 267 def process(result, context): 208 268 raise RuntimeError("Hook exploded!") 209 269 """) 210 270 211 - monkeypatch.setattr( 212 - mod, 213 - "generate_agent_output", 214 - lambda *a, **k: MOCK_RESULT, 215 - ) 216 - monkeypatch.setenv("GOOGLE_API_KEY", "x") 217 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 218 - monkeypatch.setattr( 219 - "sys.argv", ["sol generate", "20240101", "-f", str(prompt_file)] 220 - ) 221 - 222 - # Should not raise, should fall back gracefully 223 - mod.main() 224 - 225 - md = day_dir / "agents" / "broken.md" 226 - content = md.read_text() 227 - assert content == MOCK_RESULT # Original result preserved 228 - 229 - 230 - def test_output_hook_context_fields(tmp_path, monkeypatch): 231 - """Test that hook receives complete context dict.""" 232 - mod = importlib.import_module("think.generate") 233 - copy_day(tmp_path) 234 - 235 - generators_dir = tmp_path / "generators" 236 - generators_dir.mkdir() 
237 - 238 - prompt_file = generators_dir / "context_check.md" 239 - prompt_file.write_text( 240 - '{\n "title": "Context Check",\n "occurrences": false,\n "schedule": "daily"\n}\n\nTest prompt' 241 - ) 242 - 243 - # Write captured context to a file for verification 244 - hook_file = generators_dir / "context_check.py" 245 - hook_file.write_text(""" 246 - import json 247 - from pathlib import Path 271 + try: 272 + monkeypatch.setattr( 273 + mod, 274 + "generate_agent_output", 275 + lambda *a, **k: ( 276 + MOCK_RESULT if k.get("return_result") else MOCK_RESULT["text"] 277 + ), 278 + ) 279 + monkeypatch.setenv("GOOGLE_API_KEY", "x") 280 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 248 281 249 - def process(result, context): 250 - # Write context to file for test verification 251 - out_path = Path(context["output_path"]).parent / "context_captured.json" 252 - with open(out_path, "w") as f: 253 - # Remove transcript for brevity, just check it exists 254 - ctx_copy = dict(context) 255 - ctx_copy["has_transcript"] = bool(ctx_copy.get("transcript")) 256 - ctx_copy["has_meta"] = bool(ctx_copy.get("meta")) 257 - del ctx_copy["transcript"] 258 - del ctx_copy["meta"] 259 - json.dump(ctx_copy, f) 260 - return result 261 - """) 282 + config = { 283 + "name": "broken_test", 284 + "day": "20240101", 285 + "output": "md", 286 + "provider": "google", 287 + "model": "gemini-2.0-flash", 288 + } 262 289 263 - monkeypatch.setattr( 264 - mod, 265 - "generate_agent_output", 266 - lambda *a, **k: MOCK_RESULT, 267 - ) 268 - monkeypatch.setenv("GOOGLE_API_KEY", "x") 269 - monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 270 - monkeypatch.setattr( 271 - "sys.argv", ["sol generate", "20240101", "-f", str(prompt_file)] 272 - ) 290 + # Should not raise, should fall back gracefully 291 + events = run_generator_with_config(mod, config, monkeypatch) 273 292 274 - mod.main() 275 - 276 - # Read captured context 277 - captured_path = tmp_path / "20240101" / "agents" / "context_captured.json" 
278 - captured = json.loads(captured_path.read_text()) 293 + finish_events = [e for e in events if e["event"] == "finish"] 294 + assert len(finish_events) == 1 295 + assert finish_events[0]["result"] == MOCK_RESULT["text"] 279 296 280 - assert captured["day"] == "20240101" 281 - assert captured["segment"] is None 282 - assert captured["name"] == "context_check" # stem of the prompt file 283 - assert captured["has_transcript"] is True 284 - assert captured["has_meta"] is True 285 - assert "output_path" in captured 297 + finally: 298 + if hook_file.exists(): 299 + hook_file.unlink() 300 + if prompt_file.exists(): 301 + prompt_file.unlink() 286 302 287 303 288 304 def test_named_hook_resolution_takes_precedence(tmp_path): ··· 323 339 324 340 meta = utils._load_prompt_metadata(md_file) 325 341 326 - # Named hook doesn't exist, so no hook_path should be set (co-located not checked when named specified) 327 - # Actually the current implementation checks co-located only if hook field is not set 328 - # So with a nonexistent named hook, no hook_path should be set 342 + # Named hook doesn't exist, so no hook_path should be set 329 343 assert "hook_path" not in meta
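The updated tests above no longer patch `sys.argv` and call `mod.main()` directly; they pass a config dict to `run_generator_with_config` and assert on the emitted `finish` event. That helper's body is not part of this hunk, so the following is only a minimal sketch of how such a helper could work, assuming the entry point reads one NDJSON line from stdin and prints one JSON event per line to stdout. The names `run_with_ndjson_config` and `fake_generator_main` are hypothetical and are not taken from the repository.

```python
import contextlib
import io
import json
import sys


def run_with_ndjson_config(main, config):
    """Feed one NDJSON config line to main() via stdin and return parsed stdout events.

    Hypothetical helper: the real run_generator_with_config is not shown in the diff.
    """
    captured = io.StringIO()
    original_stdin = sys.stdin
    # Replace stdin with a single newline-terminated JSON document.
    sys.stdin = io.StringIO(json.dumps(config) + "\n")
    try:
        with contextlib.redirect_stdout(captured):
            main()
    finally:
        # Always restore stdin, even if main() raises.
        sys.stdin = original_stdin

    events = []
    for line in captured.getvalue().splitlines():
        line = line.strip()
        if line:
            events.append(json.loads(line))
    return events


def fake_generator_main():
    """Stand-in entry point (assumption): read the config, emit start and finish events."""
    config = json.loads(sys.stdin.readline())
    print(json.dumps({"event": "start", "name": config["name"]}))
    print(json.dumps({"event": "finish", "result": "ok"}))


events = run_with_ndjson_config(
    fake_generator_main, {"name": "noop_test", "day": "20240101", "output": "md"}
)
finish_events = [e for e in events if e["event"] == "finish"]
```

Filtering on `e["event"] == "finish"` mirrors the assertions in the tests; the stand-in entry point only exists so the sketch runs without the `think` package.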
+114 -50
think/cortex.py
··· 362 362 else: 363 363 self.agent_handoffs.pop(agent_id, None) 364 364 365 - # Expand tools if it's a string (tool pack name) 366 - tools_config = config.get("tools") 367 - if isinstance(tools_config, str): 368 - pack_names = [p.strip() for p in tools_config.split(",") if p.strip()] 369 - if not pack_names: 370 - pack_names = ["default"] 365 + # Route based on config type: 366 + # - tools present -> agent (sol agents) 367 + # - output present (no tools) -> generator (sol generate) 368 + # - neither -> error 369 + has_tools = bool(config.get("tools")) 370 + has_output = bool(config.get("output")) 371 371 372 - expanded: list[str] = [] 373 - for pack in pack_names: 374 - try: 375 - for tool in get_tools(pack): 376 - if tool not in expanded: 377 - expanded.append(tool) 378 - except KeyError as e: 379 - self.logger.warning( 380 - f"Invalid tool pack '{pack}': {e}, using default" 381 - ) 382 - for tool in get_tools("default"): 383 - if tool not in expanded: 384 - expanded.append(tool) 372 + if has_tools: 373 + # Expand tools if it's a string (tool pack name) 374 + tools_config = config.get("tools") 375 + if isinstance(tools_config, str): 376 + pack_names = [ 377 + p.strip() for p in tools_config.split(",") if p.strip() 378 + ] 379 + if not pack_names: 380 + pack_names = ["default"] 381 + 382 + expanded: list[str] = [] 383 + for pack in pack_names: 384 + try: 385 + for tool in get_tools(pack): 386 + if tool not in expanded: 387 + expanded.append(tool) 388 + except KeyError as e: 389 + self.logger.warning( 390 + f"Invalid tool pack '{pack}': {e}, using default" 391 + ) 392 + for tool in get_tools("default"): 393 + if tool not in expanded: 394 + expanded.append(tool) 395 + 396 + config["tools"] = expanded 385 397 386 - config["tools"] = expanded 398 + # Spawn the agent process with the merged config 399 + self._spawn_agent(agent_id, file_path, config) 387 400 388 - # Spawn the agent process with the merged config 389 - self._spawn_agent(agent_id, file_path, config) 401 
+ elif has_output: 402 + # Generator: has output format but no tools 403 + self._spawn_generator(agent_id, file_path, config) 404 + 405 + else: 406 + # Neither tools nor output - invalid config 407 + self.logger.error( 408 + f"Invalid agent config for {agent_id}: " 409 + "must have 'tools' or 'output' field" 410 + ) 411 + self._write_error_and_complete( 412 + file_path, 413 + "Invalid agent config: must have 'tools' or 'output' field", 414 + ) 390 415 391 416 except json.JSONDecodeError as e: 392 417 self.logger.error(f"Invalid JSON in request file {file_path}: {e}") ··· 402 427 config: Dict[str, Any], 403 428 ) -> None: 404 429 """Spawn an agent subprocess and monitor its output using the merged config.""" 405 - try: 406 - if self.mcp_server_url and not config.get("disable_mcp", False): 407 - config.setdefault("mcp_server_url", self.mcp_server_url) 430 + if self.mcp_server_url and not config.get("disable_mcp", False): 431 + config.setdefault("mcp_server_url", self.mcp_server_url) 432 + self._spawn_subprocess(agent_id, file_path, config, ["sol", "agents"], "agent") 433 + 434 + def _spawn_generator( 435 + self, 436 + agent_id: str, 437 + file_path: Path, 438 + config: Dict[str, Any], 439 + ) -> None: 440 + """Spawn a generator subprocess and monitor its output. 441 + 442 + Generators are like agents but process transcripts instead of using tools. 443 + They have 'output' field (format) but no 'tools' field. 444 + """ 445 + self._spawn_subprocess( 446 + agent_id, file_path, config, ["sol", "generate"], "generator" 447 + ) 408 448 409 - # Store the config for later use (e.g., for output field) - thread safe 449 + def _spawn_subprocess( 450 + self, 451 + agent_id: str, 452 + file_path: Path, 453 + config: Dict[str, Any], 454 + cmd: list[str], 455 + process_type: str, 456 + ) -> None: 457 + """Spawn a subprocess (agent or generator) and monitor its output. 
458 + 459 + Args: 460 + agent_id: Unique identifier for this process 461 + file_path: Path to the JSONL log file 462 + config: Configuration dict to pass via NDJSON stdin 463 + cmd: Command to run (e.g., ["sol", "agents"] or ["sol", "generate"]) 464 + process_type: Label for logging ("agent" or "generator") 465 + """ 466 + try: 467 + # Store the config for later use - thread safe 410 468 with self.lock: 411 469 self.agent_requests[agent_id] = config 412 470 413 - # Pass the full config through to the agent as NDJSON 471 + # Pass the full config through as NDJSON 414 472 ndjson_input = json.dumps(config) 415 473 416 474 # Prepare environment - apply config overrides first, then force JOURNAL_PATH ··· 420 478 env.update({k: str(v) for k, v in env_overrides.items()}) 421 479 env["JOURNAL_PATH"] = str(self.journal_path) 422 480 423 - # Spawn the agent process 424 - cmd = ["sol", "agents"] 425 - self.logger.info(f"Spawning agent {agent_id}: {cmd}") 481 + # Spawn the subprocess 482 + self.logger.info(f"Spawning {process_type} {agent_id}: {cmd}") 426 483 self.logger.debug(f"NDJSON input: {ndjson_input}") 427 484 428 485 process = subprocess.Popen( ··· 439 496 process.stdin.write(ndjson_input + "\n") 440 497 process.stdin.close() 441 498 442 - # Track the running agent 499 + # Track the running process 443 500 agent = AgentProcess(agent_id, process, file_path) 444 501 with self.lock: 445 502 self.running_agents[agent_id] = agent 446 503 447 504 # Set up timeout (default to 10 minutes if not specified) 448 - timeout_seconds = config.get( 449 - "timeout_seconds", 600 450 - ) # 600 seconds = 10 minutes 505 + timeout_seconds = config.get("timeout_seconds", 600) 451 506 agent.timeout_timer = threading.Timer( 452 507 timeout_seconds, 453 508 lambda: self._timeout_agent(agent_id, agent, timeout_seconds), ··· 464 519 ).start() 465 520 466 521 self.logger.info( 467 - f"Agent {agent_id} spawned successfully (PID: {process.pid})" 522 + f"{process_type.capitalize()} {agent_id} spawned 
successfully " 523 + f"(PID: {process.pid})" 468 524 ) 469 525 470 526 except Exception as e: 471 - self.logger.exception(f"Failed to spawn agent {agent_id}: {e}") 472 - self._write_error_and_complete(file_path, f"Failed to spawn agent: {e}") 527 + self.logger.exception(f"Failed to spawn {process_type} {agent_id}: {e}") 528 + self._write_error_and_complete( 529 + file_path, f"Failed to spawn {process_type}: {e}" 530 + ) 473 531 474 532 def _timeout_agent( 475 533 self, agent_id: str, agent: AgentProcess, timeout_seconds: int ··· 728 786 def _write_output(self, agent_id: str, result: str, config: Dict[str, Any]) -> None: 729 787 """Write agent output to the appropriate location. 730 788 731 - Output path is derived from name + output format + schedule: 732 - - Daily agents: YYYYMMDD/agents/{name}.{ext} 733 - - Segment agents: YYYYMMDD/{segment}/{name}.{ext} 789 + Output path is either: 790 + - Explicit: config["output_path"] (for multi-segment and custom paths) 791 + - Derived: from name + output format + schedule: 792 + - Daily agents: YYYYMMDD/agents/{name}.{ext} 793 + - Segment agents: YYYYMMDD/{segment}/{name}.{ext} 734 794 """ 735 795 try: 736 796 from think.utils import day_path, get_output_path 737 797 738 - output_format = config.get("output", "md") 739 - name = config.get("name", "default") 740 - segment = config.get("segment") # Set by dream.py for segment agents 741 - day = config.get("day") 798 + # Check for explicit output_path override first 799 + if config.get("output_path"): 800 + output_path = Path(config["output_path"]) 801 + else: 802 + output_format = config.get("output", "md") 803 + name = config.get("name", "default") 804 + segment = config.get("segment") # Set for segment agents 805 + day = config.get("day") 742 806 743 - # Get day directory 744 - day_dir = day_path(day) 807 + # Get day directory 808 + day_dir = day_path(day) 745 809 746 - # Derive output path using shared utility 747 - output_path = get_output_path( 748 - day_dir, name, 
segment=segment, output_format=output_format 749 - ) 810 + # Derive output path using shared utility 811 + output_path = get_output_path( 812 + day_dir, name, segment=segment, output_format=output_format 813 + ) 750 814 751 815 # Ensure parent directory exists 752 816 output_path.parent.mkdir(parents=True, exist_ok=True)
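The routing rule introduced in this file (tools present means agent, output without tools means generator, neither means error) can be isolated as a small pure function. This is a sketch for illustration only: `classify_request` is a hypothetical name, and the real code dispatches to `_spawn_agent` / `_spawn_generator` or writes an error to the log file rather than returning a label or raising.

```python
from typing import Any, Dict


def classify_request(config: Dict[str, Any]) -> str:
    """Return "agent" or "generator" for a merged config, mirroring the diff's checks.

    Hypothetical helper; raises ValueError where the real code logs and
    calls _write_error_and_complete.
    """
    # Truthiness is used, as in the diff: an empty "tools" value counts as absent.
    has_tools = bool(config.get("tools"))
    has_output = bool(config.get("output"))

    if has_tools:
        # "tools" wins even when "output" is also set.
        return "agent"
    if has_output:
        return "generator"
    raise ValueError("Invalid agent config: must have 'tools' or 'output' field")


agent_kind = classify_request({"tools": "default", "output": "md"})
generator_kind = classify_request({"output": "json", "day": "20240101"})
empty_tools_kind = classify_request({"tools": "", "output": "md"})
```

Because the check is `bool(config.get(...))`, a config with `"tools": ""` and an `output` field is routed as a generator, which may or may not be the intended behaviour for a blank tool pack string.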
+128 -41
think/dream.py
··· 125 125 listener.stop() 126 126 127 127 128 - def build_commands( 129 - day: str, force: bool, verbose: bool = False, segment: str | None = None 128 + def build_pre_generator_commands( 129 + day: str, verbose: bool = False, segment: str | None = None 130 130 ) -> list[list[str]]: 131 - """Build processing commands for a day or specific segment. 131 + """Build pre-generator commands (sense repair for daily mode). 132 132 133 133 Args: 134 134 day: YYYYMMDD format 135 - segment: Optional HHMMSS_LEN format (e.g., "163045_300") 136 - force: Overwrite existing files 135 + segment: Optional HHMMSS_LEN format (if set, skip sense) 137 136 verbose: Verbose logging 138 137 """ 139 138 commands: list[list[str]] = [] 140 139 141 - # Determine target schedule and what to run 142 - if segment: 143 - logging.info("Running segment processing for %s/%s", day, segment) 144 - target_schedule = "segment" 145 - # No sense repair for segments (already processed during observation) 146 - 147 - else: 148 - logging.info("Running daily processing for %s", day) 149 - target_schedule = "daily" 150 - # Daily-only: repair routines 140 + if not segment: 141 + # Daily-only: repair routines run before generators 151 142 cmd = ["sol", "sense", "--day", day] 152 143 if verbose: 153 144 cmd.append("-v") 154 145 commands.append(cmd) 155 146 156 - # Run generators filtered by schedule (skips disabled and invalid) 157 - generators = get_generator_agents_by_schedule(target_schedule) 158 - for generator_name, generator_data in generators.items(): 159 - cmd = ["sol", "generate", day, "-f", generator_data["path"]] 160 - if segment: 161 - cmd.extend(["--segment", segment]) 162 - if verbose: 163 - cmd.append("--verbose") 164 - if force: 165 - cmd.append("--force") 166 - commands.append(cmd) 147 + return commands 148 + 149 + 150 + def build_post_generator_commands( 151 + day: str, verbose: bool = False, segment: str | None = None 152 + ) -> list[list[str]]: 153 + """Build post-generator commands (indexer, 
journal-stats). 154 + 155 + Args: 156 + day: YYYYMMDD format 157 + segment: Optional HHMMSS_LEN format 158 + verbose: Verbose logging 159 + """ 160 + commands: list[list[str]] = [] 167 161 168 162 # Re-index (light mode: excludes historical days, mtime-cached) 169 163 indexer_cmd = ["sol", "indexer", "--rescan"] ··· 179 173 commands.append(stats_cmd) 180 174 181 175 return commands 176 + 177 + 178 + def run_generators_via_cortex( 179 + day: str, force: bool, segment: str | None = None 180 + ) -> tuple[int, int]: 181 + """Run generators via cortex requests sequentially. 182 + 183 + Args: 184 + day: YYYYMMDD format 185 + segment: Optional HHMMSS_LEN format 186 + 187 + Returns: 188 + Tuple of (success_count, fail_count) 189 + """ 190 + from think.cortex_client import get_agent_end_state 191 + 192 + target_schedule = "segment" if segment else "daily" 193 + generators = get_generator_agents_by_schedule(target_schedule) 194 + 195 + if not generators: 196 + logging.info("No generators found for schedule: %s", target_schedule) 197 + return (0, 0) 198 + 199 + logging.info( 200 + "Running %d generators for %s via cortex: %s", 201 + len(generators), 202 + day, 203 + list(generators.keys()), 204 + ) 205 + 206 + success_count = 0 207 + fail_count = 0 208 + 209 + # Run generators sequentially 210 + for generator_name, generator_data in generators.items(): 211 + logging.info("Starting generator: %s", generator_name) 212 + 213 + # Build config for cortex request 214 + config = { 215 + "day": day, 216 + "output": generator_data.get("output", "md"), 217 + } 218 + if segment: 219 + config["segment"] = segment 220 + if force: 221 + config["force"] = True 222 + 223 + try: 224 + # Spawn via cortex 225 + agent_id = cortex_request( 226 + prompt="", # Generators don't use prompt 227 + name=generator_name, 228 + config=config, 229 + ) 230 + logging.info("Spawned generator %s (ID: %s)", generator_name, agent_id) 231 + 232 + # Wait for completion 233 + completed, timed_out = 
wait_for_agents([agent_id], timeout=600) 234 + 235 + if timed_out: 236 + logging.error( 237 + "Generator %s timed out (ID: %s)", generator_name, agent_id 238 + ) 239 + fail_count += 1 240 + elif completed: 241 + # Check if it finished successfully or with error 242 + end_state = get_agent_end_state(agent_id) 243 + if end_state == "finish": 244 + logging.info("Generator %s completed successfully", generator_name) 245 + success_count += 1 246 + else: 247 + logging.error( 248 + "Generator %s ended with state: %s", generator_name, end_state 249 + ) 250 + fail_count += 1 251 + else: 252 + logging.error("Generator %s did not complete", generator_name) 253 + fail_count += 1 254 + 255 + except Exception as e: 256 + logging.error("Failed to run generator %s: %s", generator_name, e) 257 + fail_count += 1 258 + 259 + return (success_count, fail_count) 182 260 183 261 184 262 def parse_args() -> argparse.ArgumentParser: ··· 453 531 # Emit started event 454 532 emit("started", **event_fields()) 455 533 456 - # Phase 1: Generators 534 + # Phase 1: Generators (pre-commands, generators via cortex, post-commands) 457 535 if not args.skip_generators: 458 - commands = build_commands( 459 - day, args.force, verbose=args.verbose, segment=args.segment 536 + # Run pre-generator commands (e.g., sense repair) 537 + pre_commands = build_pre_generator_commands( 538 + day, verbose=args.verbose, segment=args.segment 460 539 ) 540 + for cmd in pre_commands: 541 + day_log(day, f"starting: {' '.join(cmd)}") 542 + if not run_command(cmd, day): 543 + generator_fail_count += 1 461 544 462 - # Build command names list for logging 463 - command_names = [cmd[1] for cmd in commands] 464 - logging.info(f"Running {len(commands)} generator commands: {command_names}") 545 + # Run generators via cortex 546 + gen_success, gen_fail = run_generators_via_cortex( 547 + day, args.force, segment=args.segment 548 + ) 549 + generator_fail_count += gen_fail 465 550 466 - success_count = 0 467 - for index, cmd in 
enumerate(commands): 468 - # Log every command attempt 551 + # Run post-generator commands (indexer, journal-stats) 552 + post_commands = build_post_generator_commands( 553 + day, verbose=args.verbose, segment=args.segment 554 + ) 555 + for index, cmd in enumerate(post_commands): 469 556 day_log(day, f"starting: {' '.join(cmd)}") 470 557 471 558 # Emit command event 472 559 emit( 473 560 "command", 474 - **event_fields(command=cmd[1], index=index, total=len(commands)), 561 + **event_fields( 562 + command=cmd[1], index=index, total=len(post_commands) 563 + ), 475 564 ) 476 565 477 566 # Route indexer commands through supervisor queue for serialization ··· 481 570 else: 482 571 success = run_command(cmd, day) 483 572 484 - if success: 485 - success_count += 1 486 - else: 573 + if not success: 487 574 generator_fail_count += 1 488 575 489 576 # Emit generators_completed event 490 577 emit( 491 578 "generators_completed", 492 579 **event_fields( 493 - success=success_count, 580 + success=gen_success, 494 581 failed=generator_fail_count, 495 582 duration_ms=int((time.time() - start_time) * 1000), 496 583 ), 497 584 ) 498 585 499 586 logging.info( 500 - f"Generators completed: {success_count} succeeded, {generator_fail_count} failed" 587 + f"Generators completed: {gen_success} succeeded, {generator_fail_count} failed" 501 588 ) 502 589 503 590 # Exit early if generators failed and agents are requested
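The control flow of `run_generators_via_cortex` (spawn one generator, wait, inspect the end state, tally, then move on) can be sketched with the cortex calls injected as plain callables. Everything below is an assumption made for illustration: `run_sequentially` and the fake callables are hypothetical, and the injected `wait` omits the `timeout=600` argument that the diff passes to `wait_for_agents`.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def run_sequentially(
    names: Iterable[str],
    spawn: Callable[[str], str],
    wait: Callable[[List[str]], Tuple[List[str], List[str]]],
    end_state: Callable[[str], Optional[str]],
) -> Tuple[int, int]:
    """Run each named job to completion before starting the next.

    Returns (success_count, fail_count), the same shape as the diff's function.
    """
    success_count = 0
    fail_count = 0
    for name in names:
        try:
            agent_id = spawn(name)
            completed, timed_out = wait([agent_id])
            if timed_out:
                fail_count += 1
            elif completed and end_state(agent_id) == "finish":
                success_count += 1
            else:
                # Did not complete, or ended in a non-"finish" state.
                fail_count += 1
        except Exception:
            # A spawn or wait failure counts against this job only.
            fail_count += 1
    return success_count, fail_count


# Fake collaborators standing in for cortex_request, wait_for_agents
# and get_agent_end_state.
states = {"id-a": "finish", "id-b": "error"}


def fake_spawn(name: str) -> str:
    if name == "c":
        raise RuntimeError("spawn failed")
    return f"id-{name}"


def fake_wait(agent_ids: List[str]) -> Tuple[List[str], List[str]]:
    return agent_ids, []


counts = run_sequentially(["a", "b", "c"], fake_spawn, fake_wait, states.get)
```

Catching exceptions inside the loop means one failing generator does not stop the remaining ones, which matches the per-generator `try`/`except` in the diff.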
+260 -267
think/generate.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - import argparse 4 + """Generator pipeline for transcript analysis. 5 + 6 + Spawned by cortex when a request has 'output' field (no 'tools'). 7 + Reads NDJSON config from stdin, emits JSONL events to stdout. 8 + """ 9 + 10 + import json 5 11 import logging 6 12 import os 13 + import sys 14 + import time 15 + from collections.abc import Callable 7 16 from datetime import datetime 8 17 from pathlib import Path 9 18 10 19 from google import genai 11 20 from google.genai import types 12 21 22 + from think.agents import GenerateResult, JSONEventWriter 13 23 from think.cluster import cluster, cluster_period, cluster_segments_multi 14 - from think.models import generate 15 24 from think.utils import ( 16 - PromptNotFoundError, 17 - _load_prompt_metadata, 18 25 compose_instructions, 19 26 day_log, 20 27 day_path, ··· 22 29 format_segment_times, 23 30 get_generator_agents, 24 31 get_output_path, 25 - get_output_topic, 26 32 load_output_hook, 27 33 load_prompt, 28 34 segment_parse, 29 - setup_cli, 30 35 ) 31 36 32 37 ··· 52 57 return {"processed": sorted(processed), "repairable": sorted(pending)} 53 58 54 59 55 - def count_tokens(markdown: str, prompt: str, api_key: str, model: str) -> None: 56 - client = genai.Client(api_key=api_key) 57 - 58 - total_tokens = client.models.count_tokens( 59 - model=model, 60 - contents=[markdown], 61 - ) 62 - print(f"Token count: {total_tokens}") 63 - 64 - 65 60 def _get_or_create_cache( 66 61 client: genai.Client, 67 62 model: str, ··· 111 106 system_instruction: str | None = None, 112 107 thinking_budget: int | None = None, 113 108 max_output_tokens: int | None = None, 114 - ) -> str: 109 + return_result: bool = False, 110 + ) -> str | GenerateResult: 115 111 """Send clustered transcript to LLM for agent output generation. 116 112 117 113 Args: ··· 126 122 from journal.md via compose_instructions(). 
127 123 thinking_budget: Token budget for model thinking. If None, uses default. 128 124 max_output_tokens: Maximum output tokens. If None, uses default. 125 + return_result: If True, return full GenerateResult with usage data. 129 126 130 127 Returns: 131 - Generated agent output content (markdown or JSON string). 128 + Generated agent output content (markdown or JSON string), or 129 + GenerateResult dict if return_result=True. 132 130 """ 131 + from think.models import generate_with_result, resolve_provider 132 + 133 133 # Use provided system_instruction or fall back to default 134 134 if system_instruction is None: 135 135 instructions = compose_instructions(include_datetime=False) ··· 147 147 148 148 # Try to use cache if display name provided 149 149 # Note: caching is Google-specific, so we check provider first 150 - from think.models import resolve_provider 151 - 152 150 provider, model = resolve_provider(context) 153 151 154 152 client = None ··· 162 160 if cache_name: 163 161 # Cache hit: content already in cache, just send prompt. 164 162 # Google-specific params (cached_content, client) are passed via kwargs. 165 - return generate( 163 + result = generate_with_result( 166 164 contents=[prompt], 167 165 context=context, 168 166 temperature=0.3, ··· 175 173 ) 176 174 else: 177 175 # No cache: use unified generate() 178 - return generate( 176 + result = generate_with_result( 179 177 contents=[transcript, prompt], 180 178 context=context, 181 179 temperature=0.3, ··· 185 183 json_output=json_output, 186 184 ) 187 185 186 + if return_result: 187 + return result 188 + return result["text"] 189 + 188 190 189 191 # Minimum content length for insight generation 190 192 MIN_INPUT_CHARS = 50 191 193 192 194 193 - def main() -> None: 194 - parser = argparse.ArgumentParser( 195 - description="Send a day's clustered Markdown to Gemini for analysis." 
196 - ) 197 - parser.add_argument( 198 - "day", 199 - help="Day in YYYYMMDD format", 200 - ) 201 - parser.add_argument( 202 - "-f", 203 - "--topic", 204 - "--prompt", 205 - dest="topic", 206 - required=True, 207 - help="Generator key (e.g., 'activity', 'chat:sentiment') or path to .md file", 208 - ) 209 - parser.add_argument( 210 - "-c", 211 - "--count", 212 - action="store_true", 213 - help="Count tokens only and exit", 214 - ) 215 - parser.add_argument( 216 - "--force", 217 - action="store_true", 218 - help="Overwrite output file if it already exists", 219 - ) 220 - parser.add_argument( 221 - "--segment", 222 - help="Segment key in HHMMSS_LEN format (processes only this segment within the day)", 223 - ) 224 - parser.add_argument( 225 - "--segments", 226 - help="Comma-separated segment keys (e.g., '090000_300,100000_600'). Requires -o.", 227 - ) 228 - parser.add_argument( 229 - "-o", 230 - "--output", 231 - help="Output file path (overrides default; required with --segments)", 232 - ) 233 - args = setup_cli(parser) 195 + def _run_generator(config: dict, emit_event: Callable[[dict], None]) -> None: 196 + """Execute generator pipeline with config from cortex. 
234 197 235 - # Validate mutual exclusivity of --segment and --segments 236 - if args.segment and args.segments: 237 - parser.error("--segment and --segments are mutually exclusive") 198 + Args: 199 + config: Merged config from cortex containing: 200 + - name: Generator key (e.g., 'activity', 'chat:sentiment') 201 + - day: Day in YYYYMMDD format 202 + - segment: Optional single segment key 203 + - segments: Optional list of segment keys 204 + - output: Output format ('md' or 'json') 205 + - output_path: Optional custom output path 206 + - force: Whether to regenerate existing output 207 + - provider: AI provider 208 + - model: Model name 209 + emit_event: Callback to emit JSONL events 210 + """ 211 + name = config.get("name", "default") 212 + day = config.get("day") 213 + segment = config.get("segment") 214 + segments = config.get("segments") # List of segment keys 215 + output_format = config.get("output", "md") 216 + output_path_override = config.get("output_path") 217 + force = config.get("force", False) 218 + provider = config.get("provider", "google") 219 + model = config.get("model") 238 220 239 - # Validate -o is required with --segments 240 - if args.segments and not args.output: 241 - parser.error("--segments requires -o/--output to specify output file path") 221 + if not day: 222 + raise ValueError("Missing 'day' field in generator config") 223 + 224 + # Emit start event 225 + emit_event( 226 + { 227 + "event": "start", 228 + "ts": int(time.time() * 1000), 229 + "prompt": "", # Generators don't have user prompts 230 + "name": name, 231 + "model": model or "unknown", 232 + "provider": provider, 233 + } 234 + ) 242 235 243 236 # Set segment key for token usage logging 244 - if args.segment: 245 - os.environ["SEGMENT_KEY"] = args.segment 246 - elif args.segments: 247 - # Use first segment for logging context 248 - first_segment = args.segments.split(",")[0].strip() 249 - os.environ["SEGMENT_KEY"] = first_segment 237 + if segment: 238 + 
os.environ["SEGMENT_KEY"] = segment 239 + elif segments: 240 + os.environ["SEGMENT_KEY"] = segments[0] 250 241 251 - # Resolve generator key or path to metadata 242 + # Load generator metadata 252 243 all_generators = get_generator_agents() 253 - topic_arg = args.topic 254 - 255 - # Check if it's a known generator key first 256 - if topic_arg in all_generators: 257 - name = topic_arg 244 + if name in all_generators: 258 245 meta = all_generators[name] 259 246 agent_path = Path(meta["path"]) 260 - elif Path(topic_arg).exists(): 261 - # Fall back to treating it as a file path (backwards compat) 262 - agent_path = Path(topic_arg) 263 - # Try to find matching key by path 264 - name = agent_path.stem 265 - found_in_registry = False 266 - for key, m in all_generators.items(): 267 - if m.get("path") == str(agent_path): 268 - name = key 269 - found_in_registry = True 270 - break 271 - if found_in_registry: 272 - meta = all_generators[name] 273 - else: 274 - # Load metadata directly from file for ad-hoc generators 275 - meta = _load_prompt_metadata(agent_path) 276 247 else: 277 - parser.error( 278 - f"Generator not found: {topic_arg}. 
" 279 - f"Available: {', '.join(sorted(all_generators.keys()))}" 280 - ) 248 + raise ValueError(f"Generator not found: {name}") 281 249 282 - # Check if generator is disabled via journal config 250 + # Check if generator is disabled 283 251 if meta.get("disabled"): 284 - logging.info("Generator %s is disabled in journal config, skipping", name) 285 - day_log(args.day, f"generate {get_output_topic(topic_arg)} skipped (disabled)") 252 + logging.info("Generator %s is disabled, skipping", name) 253 + emit_event( 254 + { 255 + "event": "finish", 256 + "ts": int(time.time() * 1000), 257 + "result": "", 258 + "skipped": "disabled", 259 + } 260 + ) 286 261 return 287 - 288 - output_format = meta.get("output") # "json" or None (markdown) 289 - success = False 290 262 291 263 # Extract instructions config for source filtering and system prompt 292 264 instructions_config = meta.get("instructions") 293 - 294 - # Use compose_instructions to get sources config and system instruction 295 265 instructions = compose_instructions( 296 266 include_datetime=False, 297 267 config_overrides=instructions_config, ··· 300 270 system_prompt_name = instructions.get("system_prompt_name", "journal") 301 271 system_instruction = instructions["system_instruction"] 302 272 303 - # Track multi-segment mode for hook context 304 - multi_segment_mode = bool(args.segments) 273 + # Track multi-segment mode 274 + multi_segment_mode = bool(segments) 305 275 306 - # Choose clustering function based on mode, passing sources config 307 - if args.segments: 308 - segment_list = [s.strip() for s in args.segments.split(",")] 309 - try: 310 - markdown, file_count = cluster_segments_multi( 311 - args.day, segment_list, sources=sources 312 - ) 313 - except ValueError as e: 314 - parser.error(str(e)) 315 - elif args.segment: 316 - markdown, file_count = cluster_period(args.day, args.segment, sources=sources) 276 + # Build transcript via clustering 277 + if segments: 278 + markdown, file_count = 
cluster_segments_multi(day, segments, sources=sources)
+    elif segment:
+        markdown, file_count = cluster_period(day, segment, sources=sources)
     else:
-        markdown, file_count = cluster(args.day, sources=sources)
-    day_dir = str(day_path(args.day))
+        markdown, file_count = cluster(day, sources=sources)
+
+    day_dir = str(day_path(day))
 
     # Skip generation when there's nothing to analyze
     if file_count == 0 or len(markdown.strip()) < MIN_INPUT_CHARS:
         logging.info(
-            "Insufficient input (files=%d, chars=%d), skipping generation",
+            "Insufficient input (files=%d, chars=%d), skipping",
             file_count,
             len(markdown.strip()),
         )
-        day_log(args.day, f"generate {get_output_topic(topic_arg)} skipped (no input)")
+        emit_event(
+            {
+                "event": "finish",
+                "ts": int(time.time() * 1000),
+                "result": "",
+                "skipped": "no_input",
+            }
+        )
+        day_log(day, f"generate {name} skipped (no input)")
         return
 
     # Prepend input context note for limited recordings
···
         )
         markdown = input_note + markdown
 
-    try:
-        if args.verbose:
-            print("Verbose mode enabled")
-        api_key = os.getenv("GOOGLE_API_KEY")
-        if not api_key:
-            parser.error("GOOGLE_API_KEY not found in environment")
+    # Build context for template substitution
+    prompt_context: dict[str, str] = {
+        "day": day,
+        "date": format_day(day),
+    }
 
-        # Build context for template substitution
-        prompt_context: dict[str, str] = {
-            "day": args.day,
-            "date": format_day(args.day),
-        }
+    # Add segment context
+    if segment:
+        start_str, end_str = format_segment_times(segment)
+        if start_str and end_str:
+            prompt_context["segment"] = segment
+            prompt_context["segment_start"] = start_str
+            prompt_context["segment_end"] = end_str
+    elif segments:
+        all_times = []
+        for seg in segments:
+            start_time, end_time = segment_parse(seg)
+            if start_time and end_time:
+                all_times.append((start_time, end_time))
 
-        # Add segment context
-        if args.segment:
-            # Single segment mode
-            start_str, end_str = format_segment_times(args.segment)
-            if start_str and end_str:
-                prompt_context["segment"] = args.segment
-                prompt_context["segment_start"] = start_str
-                prompt_context["segment_end"] = end_str
-        elif args.segments:
-            # Multi-segment mode: compute earliest start and latest end
-            segment_list = [s.strip() for s in args.segments.split(",")]
-            all_times = []
-            for seg in segment_list:
-                start_time, end_time = segment_parse(seg)
-                if start_time and end_time:
-                    all_times.append((start_time, end_time))
-
-            if all_times:
-                earliest_start = min(t[0] for t in all_times)
-                latest_end = max(t[1] for t in all_times)
-                # Use lstrip('0') for cross-platform compatibility (%-I is Unix-only)
-                start_str = (
-                    datetime.combine(datetime.today(), earliest_start)
-                    .strftime("%I:%M %p")
-                    .lstrip("0")
-                )
-                end_str = (
-                    datetime.combine(datetime.today(), latest_end)
-                    .strftime("%I:%M %p")
-                    .lstrip("0")
-                )
-                prompt_context["segment_start"] = start_str
-                prompt_context["segment_end"] = end_str
-
-        try:
-            agent_prompt = load_prompt(
-                agent_path.stem, base_dir=agent_path.parent, context=prompt_context
+        if all_times:
+            earliest_start = min(t[0] for t in all_times)
+            latest_end = max(t[1] for t in all_times)
+            start_str = (
+                datetime.combine(datetime.today(), earliest_start)
+                .strftime("%I:%M %p")
+                .lstrip("0")
+            )
+            end_str = (
+                datetime.combine(datetime.today(), latest_end)
+                .strftime("%I:%M %p")
+                .lstrip("0")
             )
-        except PromptNotFoundError:
-            parser.error(f"Agent file not found: {agent_path}")
+            prompt_context["segment_start"] = start_str
+            prompt_context["segment_end"] = end_str
 
-        prompt = agent_prompt.text
+    # Load prompt
+    agent_prompt = load_prompt(
+        agent_path.stem, base_dir=agent_path.parent, context=prompt_context
+    )
+    prompt = agent_prompt.text
 
-        # Resolve provider for display (must match context used in generate_agent_output)
-        from think.models import resolve_provider
-
-        display_output_type = "json" if output_format == "json" else "markdown"
-        _, model = resolve_provider(f"agent.{name}.{display_output_type}")
-        day = args.day
-        size_kb = len(markdown.encode("utf-8")) / 1024
-
-        print(
-            f"Topic: {name} | Model: {model} | Day: {day} | Files: {file_count} | Size: {size_kb:.1f}KB"
+    # Determine output path
+    is_json_output = output_format == "json"
+    if output_path_override:
+        output_path = Path(output_path_override)
+    else:
+        output_path = get_output_path(
+            day_dir, name, segment=segment, output_format=output_format
         )
 
-        if args.count:
-            count_tokens(markdown, prompt, api_key, model)
-            return
+    # Check if output exists (force check happens in cortex, but we handle it here too)
+    output_exists = output_path.exists() and output_path.stat().st_size > 0
 
-        is_json_output = output_format == "json"
+    # Determine cache settings
+    if multi_segment_mode:
+        cache_display_name = None
+    elif segment:
+        cache_display_name = f"{system_prompt_name}_{day}_{segment}"
+    else:
+        cache_display_name = f"{system_prompt_name}_{day}"
 
-        # Determine output path: -o overrides default for any mode
-        if args.output:
-            output_path = Path(args.output)
-        else:
-            output_path = get_output_path(
-                day_dir, name, segment=args.segment, output_format=output_format
-            )
+    # Extract generation parameters from metadata
+    meta_thinking_budget = meta.get("thinking_budget")
+    meta_max_output_tokens = meta.get("max_output_tokens")
 
-        # Determine cache settings: skip for multi-segment, otherwise scope to day/segment
-        # Include system prompt name in cache key for proper isolation
-        if multi_segment_mode:
-            cache_display_name = None
-        elif args.segment:
-            cache_display_name = f"{system_prompt_name}_{day}_{args.segment}"
-        else:
-            cache_display_name = f"{system_prompt_name}_{day}"
+    # Get API key
+    api_key = os.getenv("GOOGLE_API_KEY", "")
 
-        # Check if output file already exists
-        output_exists = output_path.exists() and output_path.stat().st_size > 0
+    usage_data = None
 
-        # Extract optional generation parameters from metadata
-        meta_thinking_budget = meta.get("thinking_budget")
-        meta_max_output_tokens = meta.get("max_output_tokens")
+    if output_exists and not force:
+        # Load existing content (no LLM call)
+        logging.info("Output exists, loading: %s", output_path)
+        with open(output_path, "r") as f:
+            result = f.read()
+    else:
+        # Generate new content
+        if output_exists and force:
+            logging.info("Force regenerating: %s", output_path)
 
-        if output_exists and not args.force:
-            print(
-                f"Output file already exists: {output_path}. Loading existing content."
-            )
-            with open(output_path, "r") as f:
-                result = f.read()
-        elif output_exists and args.force:
-            print("Output file exists but --force specified. Regenerating.")
-            result = generate_agent_output(
-                markdown,
-                prompt,
-                api_key,
-                cache_display_name=cache_display_name,
-                name=name,
-                json_output=is_json_output,
-                system_instruction=system_instruction,
-                thinking_budget=meta_thinking_budget,
-                max_output_tokens=meta_max_output_tokens,
-            )
-        else:
-            result = generate_agent_output(
-                markdown,
-                prompt,
-                api_key,
-                cache_display_name=cache_display_name,
-                name=name,
-                json_output=is_json_output,
-                system_instruction=system_instruction,
-                thinking_budget=meta_thinking_budget,
-                max_output_tokens=meta_max_output_tokens,
-            )
+        gen_result = generate_agent_output(
+            markdown,
+            prompt,
+            api_key,
+            cache_display_name=cache_display_name,
+            name=name,
+            json_output=is_json_output,
+            system_instruction=system_instruction,
+            thinking_budget=meta_thinking_budget,
+            max_output_tokens=meta_max_output_tokens,
+            return_result=True,
+        )
+        result = gen_result["text"]
+        usage_data = gen_result.get("usage")
 
-        # Check if we got a valid response
-        if result is None:
-            print("Error: No text content in response")
-            return
-
-        # Run post-processing hook if present (only for newly generated results)
-        if (not output_exists or args.force) and meta.get("hook_path"):
+        # Run post-processing hook if present
+        if meta.get("hook_path"):
             hook_path = meta["hook_path"]
             try:
                 hook_process = load_output_hook(hook_path)
                 hook_context = {
-                    "day": args.day,
-                    "segment": args.segment,
+                    "day": day,
+                    "segment": segment,
                     "multi_segment": multi_segment_mode,
                     "name": name,
                     "output_path": str(output_path),
···
                 if hook_result is not None:
                     result = hook_result
                     logging.info("Hook %s transformed result", hook_path)
-                else:
-                    logging.info(
-                        "Hook %s returned None, using original result", hook_path
-                    )
             except Exception as exc:
                 logging.error("Hook %s failed: %s", hook_path, exc)
-                # Continue with original result on hook failure
+
+    # Emit finish event with result (cortex handles file writing)
+    finish_event = {
+        "event": "finish",
+        "ts": int(time.time() * 1000),
+        "result": result,
+    }
+    if usage_data:
+        finish_event["usage"] = usage_data
+
+    emit_event(finish_event)
+
+    # Log completion
+    msg = f"generate {name} ok"
+    if force:
+        msg += " --force"
+    day_log(day, msg)
+
+
+def main() -> None:
+    """NDJSON-based CLI for generator pipeline.
+
+    Reads config from stdin, emits JSONL events to stdout.
+    Spawned by cortex when request has 'output' field (no 'tools').
+    """
+    import traceback
+
+    # Configure basic logging (no argparse needed for NDJSON mode)
+    logging.basicConfig(level=logging.INFO)
+
+    # Always write to stdout only
+    event_writer = JSONEventWriter(None)
+
+    def emit_event(data: dict) -> None:
+        if "ts" not in data:
+            data["ts"] = int(time.time() * 1000)
+        event_writer.emit(data)
+
+    try:
+        # NDJSON input mode from stdin
+        for line in sys.stdin:
+            line = line.strip()
+            if not line:
+                continue
 
-        # Only write output if it was newly generated
-        if not output_exists or args.force:
-            os.makedirs(output_path.parent, exist_ok=True)
-            with open(output_path, "w") as f:
-                f.write(result)
-            print(f"Results saved to: {output_path}")
+            try:
+                config = json.loads(line)
+
+                _run_generator(config, emit_event)
 
-        success = True
+            except json.JSONDecodeError as e:
+                emit_event(
+                    {
+                        "event": "error",
+                        "error": f"Invalid JSON: {str(e)}",
+                        "ts": int(time.time() * 1000),
+                    }
+                )
+            except Exception as e:
+                emit_event(
+                    {
+                        "event": "error",
+                        "error": str(e),
+                        "trace": traceback.format_exc(),
+                        "ts": int(time.time() * 1000),
+                    }
+                )
 
+    except Exception as exc:
+        emit_event(
+            {
+                "event": "error",
+                "error": str(exc),
+                "trace": traceback.format_exc(),
+            }
+        )
+        raise
     finally:
-        msg = f"generate {name} {'ok' if success else 'failed'}"
-        if args.force:
-            msg += " --force"
-        day_log(args.day, msg)
+        event_writer.close()
 
 
 if __name__ == "__main__":
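The stdin/stdout contract in the new `main()` can be exercised from the parent side with a small sketch. This is not the cortex implementation: `CHILD_SOURCE` is a hypothetical stand-in for the generator process, and `run_once` is an illustrative helper. Only the one-JSON-object-per-line shape and the `finish`/`error` event names come from the diff above.

```python
import json
import subprocess
import sys

# Hypothetical stand-in for a generator process (NOT the real generate.py):
# it reads one JSON config per stdin line and answers with one event line.
CHILD_SOURCE = r'''
import json, sys, time
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        config = json.loads(line)
        event = {"event": "finish", "result": "summary for " + config["day"]}
    except json.JSONDecodeError as exc:
        event = {"event": "error", "error": "Invalid JSON: " + str(exc)}
    event["ts"] = int(time.time() * 1000)
    print(json.dumps(event), flush=True)
'''


def run_once(lines: list[str]) -> list[dict]:
    """Send NDJSON lines to the child process and collect its events."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        input="\n".join(lines) + "\n",
        capture_output=True,
        text=True,
        check=True,
    )
    # Each non-empty stdout line is expected to be one JSON event.
    return [json.loads(out) for out in proc.stdout.splitlines() if out.strip()]


# One valid config line followed by one malformed line.
events = run_once([json.dumps({"day": "20250101", "output": "md"}), "not json"])
```

A malformed line produces an `error` event without stopping the loop, which mirrors the per-line `try`/`except` in the diff.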
+37 -23
think/importer.py
···
     day: str,
     segments: list[str],
 ) -> bool:
-    """Create a summary for imported segments using sol generate.
+    """Create a summary for imported segments using cortex generator.
 
     Args:
         import_dir: Directory where the summary will be saved
···
     Returns:
         True if summary was created successfully, False otherwise
     """
+    from think.cortex_client import cortex_request, get_agent_end_state, wait_for_agents
+
     if not segments:
         logger.info("No segments to summarize")
         return False
 
     summary_path = import_dir / "summary.md"
-    segments_arg = ",".join(segments)
-
-    cmd = [
-        "sol",
-        "insight",
-        "importer",
-        "--day",
-        day,
-        "--segments",
-        segments_arg,
-        "-o",
-        str(summary_path),
-    ]
 
     try:
-        logger.info(f"Creating summary for {len(segments)} segments via sol generate")
-        subprocess.run(cmd, capture_output=True, text=True, check=True)
-        if summary_path.exists():
-            logger.info(f"Created import summary: {summary_path}")
-            return True
-        else:
-            logger.warning("sol generate completed but summary file not created")
+        logger.info(f"Creating summary for {len(segments)} segments via cortex")
+
+        # Spawn generator via cortex
+        agent_id = cortex_request(
+            prompt="",  # Generators don't use prompt
+            name="importer",
+            config={
+                "day": day,
+                "segments": segments,
+                "output": "md",
+                "output_path": str(summary_path),
+            },
+        )
+
+        # Wait for completion
+        completed, timed_out = wait_for_agents([agent_id], timeout=300)
+
+        if timed_out:
+            logger.error(f"Import summary timed out (ID: {agent_id})")
             return False
-    except subprocess.CalledProcessError as e:
-        logger.error(f"Failed to create summary: {e.stderr}")
+
+        if completed:
+            end_state = get_agent_end_state(agent_id)
+            if end_state == "finish" and summary_path.exists():
+                logger.info(f"Created import summary: {summary_path}")
+                return True
+            else:
+                logger.warning(
+                    f"Generator ended with state {end_state}, "
+                    f"summary exists: {summary_path.exists()}"
+                )
+                return False
+
+        logger.error("Generator did not complete")
         return False
+
     except Exception as e:
         logger.error(f"Failed to create summary: {e}")
         return False
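The success condition in the importer change reduces to a small decision function. The sketch below is illustrative only: `summary_created` is a hypothetical helper, and modelling `completed` and `timed_out` as lists of agent IDs is an assumption (the diff only shows them being tested for truthiness).

```python
from typing import Optional


def summary_created(
    completed: list[str],
    timed_out: list[str],
    end_state: Optional[str],
    summary_exists: bool,
) -> bool:
    """Mirror the branch order of the importer diff (hypothetical helper)."""
    if timed_out:
        # A timeout wins over everything else.
        return False
    if completed:
        # Success requires both a "finish" end state and the file on disk.
        return end_state == "finish" and summary_exists
    # Neither completed nor timed out: treated as failure.
    return False


ok = summary_created(["a1"], [], "finish", True)
missing_file = summary_created(["a1"], [], "finish", False)
errored = summary_created(["a1"], [], "error", True)
timed = summary_created([], ["a1"], None, False)
```

Checking the file as well as the end state guards against a generator that reports `finish` for a skipped run (empty result) without any summary being written.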
+53
think/models.py
···
     return result["text"]
 
 
+def generate_with_result(
+    contents: Union[str, List[Any]],
+    context: str,
+    temperature: float = 0.3,
+    max_output_tokens: int = 8192 * 2,
+    system_instruction: Optional[str] = None,
+    json_output: bool = False,
+    thinking_budget: Optional[int] = None,
+    timeout_s: Optional[float] = None,
+    **kwargs: Any,
+) -> dict:
+    """Generate text and return full result with usage data.
+
+    Same as generate() but returns the full GenerateResult dict instead of
+    just the text. Used by cortex-managed generators that need usage data
+    for event emission.
+
+    Returns
+    -------
+    dict
+        GenerateResult with: text, usage, finish_reason, thinking.
+    """
+    from think.providers import get_provider_module
+
+    model_override = kwargs.pop("model", None)
+
+    provider, model = resolve_provider(context)
+    if model_override:
+        model = model_override
+
+    provider_mod = get_provider_module(provider)
+
+    result = provider_mod.run_generate(
+        contents=contents,
+        model=model,
+        temperature=temperature,
+        max_output_tokens=max_output_tokens,
+        system_instruction=system_instruction,
+        json_output=json_output,
+        thinking_budget=thinking_budget,
+        timeout_s=timeout_s,
+        **kwargs,
+    )
+
+    _validate_json_response(result, json_output)
+
+    if result.get("usage"):
+        log_token_usage(model=model, usage=result["usage"], context=context)
+
+    return result
+
+
 async def agenerate(
     contents: Union[str, List[Any]],
     context: str,
···
     "CLAUDE_SONNET_4",
     # Unified API
     "generate",
+    "generate_with_result",
     "agenerate",
     "resolve_provider",
     # Utilities
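The reason `generate_with_result` returns the whole dict is that the generator attaches `usage` to its `finish` event. A minimal sketch of that mapping follows. `build_finish_event` is a hypothetical helper, and the usage keys in the example are placeholders; only the `text` and `usage` fields and the event shape come from the diffs.

```python
import time
from typing import Any, Optional


def build_finish_event(
    result: dict[str, Any], now_ms: Optional[int] = None
) -> dict[str, Any]:
    """Turn a GenerateResult-like dict into a finish event (hypothetical helper)."""
    event: dict[str, Any] = {
        "event": "finish",
        # Millisecond timestamp, matching int(time.time() * 1000) in the diff.
        "ts": now_ms if now_ms is not None else int(time.time() * 1000),
        "result": result["text"],
    }
    usage = result.get("usage")
    if usage:
        # Usage is optional: omitted when the result was loaded from disk
        # or the provider reported none.
        event["usage"] = usage
    return event


# Placeholder usage keys, for illustration only.
with_usage = build_finish_event(
    {"text": "# Summary", "usage": {"input_tokens": 10, "output_tokens": 4}},
    now_ms=1700000000000,
)
without_usage = build_finish_event({"text": "# Summary"}, now_ms=1700000000000)
```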