personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Switch agent completion tracking from polling to event-based

Replace file-based polling in wait_for_agents() with Callosum event
listening for cortex.finish and cortex.error events. This eliminates
the 1-second polling loop in favor of a fully event-driven approach.

Key changes:
- Rename get_agent_status() to get_agent_log_status() to clarify it's
file-based
- Add new wait_for_agents() to cortex_client.py using CallosumConnection
- Remove old polling-based wait_for_agents() from dream.py
- Initial file check as shortcut, final file check at timeout as backstop
- Log INFO message when event is missed but file check recovers

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+308 -80
+2 -2
apps/chat/routes.py
··· 240 240 """ 241 241 from think.cortex_client import ( 242 242 get_agent_end_state, 243 - get_agent_status, 243 + get_agent_log_status, 244 244 get_agent_thread, 245 245 read_agent_events, 246 246 ) ··· 272 272 end_state = None 273 273 can_continue = False 274 274 if thread: 275 - is_complete = get_agent_status(thread[-1]) == "completed" 275 + is_complete = get_agent_log_status(thread[-1]) == "completed" 276 276 if is_complete: 277 277 end_state = get_agent_end_state(thread[-1]) 278 278 can_continue = end_state == "finish"
+225 -16
tests/test_cortex_client.py
··· 18 18 cortex_agents, 19 19 cortex_request, 20 20 get_agent_end_state, 21 - get_agent_status, 21 + get_agent_log_status, 22 22 get_agent_thread, 23 + wait_for_agents, 23 24 ) 24 25 from think.models import GPT_5 25 26 ··· 142 143 143 144 agent_ids = [] 144 145 for i in range(3): 145 - agent_id = cortex_request( 146 - prompt=f"Test {i}", name="default", provider="openai" 147 - ) 146 + agent_id = cortex_request(prompt=f"Test {i}", name="default", provider="openai") 148 147 agent_ids.append(agent_id) 149 148 time.sleep(0.002) 150 149 ··· 307 306 os.environ["JOURNAL_PATH"] = old_path 308 307 309 308 310 - def test_get_agent_status_completed(tmp_path, monkeypatch): 311 - """Test get_agent_status returns 'completed' for finished agents.""" 309 + def test_get_agent_log_status_completed(tmp_path, monkeypatch): 310 + """Test get_agent_log_status returns 'completed' for finished agents.""" 312 311 monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 313 312 agents_dir = tmp_path / "agents" 314 313 agents_dir.mkdir() ··· 316 315 agent_id = "1234567890123" 317 316 (agents_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n') 318 317 319 - assert get_agent_status(agent_id) == "completed" 318 + assert get_agent_log_status(agent_id) == "completed" 320 319 321 320 322 - def test_get_agent_status_running(tmp_path, monkeypatch): 323 - """Test get_agent_status returns 'running' for active agents.""" 321 + def test_get_agent_log_status_running(tmp_path, monkeypatch): 322 + """Test get_agent_log_status returns 'running' for active agents.""" 324 323 monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 325 324 agents_dir = tmp_path / "agents" 326 325 agents_dir.mkdir() ··· 328 327 agent_id = "1234567890123" 329 328 (agents_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n') 330 329 331 - assert get_agent_status(agent_id) == "running" 330 + assert get_agent_log_status(agent_id) == "running" 332 331 333 332 334 - def test_get_agent_status_not_found(tmp_path, monkeypatch): 335 - """Test get_agent_status returns 'not_found' for missing agents.""" 333 + def test_get_agent_log_status_not_found(tmp_path, monkeypatch): 334 + """Test get_agent_log_status returns 'not_found' for missing agents.""" 336 335 monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 337 336 (tmp_path / "agents").mkdir() 338 337 339 - assert get_agent_status("nonexistent") == "not_found" 338 + assert get_agent_log_status("nonexistent") == "not_found" 340 339 341 340 342 - def test_get_agent_status_prefers_completed(tmp_path, monkeypatch): 343 - """Test get_agent_status returns 'completed' when both files exist.""" 341 + def test_get_agent_log_status_prefers_completed(tmp_path, monkeypatch): 342 + """Test get_agent_log_status returns 'completed' when both files exist.""" 344 343 monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 345 344 agents_dir = tmp_path / "agents" 346 345 agents_dir.mkdir() ··· 350 349 (agents_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n') 351 350 (agents_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n') 352 351 353 - assert get_agent_status(agent_id) == "completed" 352 + assert get_agent_log_status(agent_id) == "completed" 354 353 355 354 356 355 def test_get_agent_end_state_finish(tmp_path, monkeypatch): ··· 502 501 503 502 with pytest.raises(FileNotFoundError): 504 503 get_agent_thread("nonexistent") 504 + 505 + 506 + # Tests for wait_for_agents 507 + 508 + 509 + def test_wait_for_agents_already_complete(tmp_path, monkeypatch): 510 + """Test wait_for_agents returns immediately if agents already completed.""" 511 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 512 + agents_dir = tmp_path / "agents" 513 + agents_dir.mkdir() 514 + (tmp_path / "health").mkdir() 515 + 516 + # Create completed agents 517 + agent_ids = ["1000", "2000"] 518 + for agent_id in agent_ids: 519 + (agents_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n') 520 + 521 + completed, timed_out = wait_for_agents(agent_ids, timeout=1) 522 + 523 + assert set(completed) == set(agent_ids) 524 + assert timed_out == [] 525 + 526 + 527 + def test_wait_for_agents_event_completion(callosum_server): 528 + """Test wait_for_agents completes when finish event is received.""" 529 + tmp_path = callosum_server 530 + agents_dir = tmp_path / "agents" 531 + 532 + agent_id = "1234567890123" 533 + 534 + # Start wait in background thread 535 + result = {"completed": None, "timed_out": None} 536 + 537 + def wait_thread(): 538 + result["completed"], result["timed_out"] = wait_for_agents( 539 + [agent_id], timeout=5 540 + ) 541 + 542 + waiter = threading.Thread(target=wait_thread) 543 + waiter.start() 544 + 545 + # Give the waiter time to set up listener 546 + time.sleep(0.2) 547 + 548 + # Create the completed file and emit finish event 549 + (agents_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n') 550 + 551 + # Emit finish event via Callosum 552 + client = CallosumConnection() 553 + client.start() 554 + time.sleep(0.1) 555 + client.emit("cortex", "finish", agent_id=agent_id, result="done") 556 + time.sleep(0.2) 557 + client.stop() 558 + 559 + waiter.join(timeout=3) 560 + 561 + assert result["completed"] == [agent_id] 562 + assert result["timed_out"] == [] 563 + 564 + 565 + def test_wait_for_agents_error_event(callosum_server): 566 + """Test wait_for_agents completes on error event too.""" 567 + tmp_path = callosum_server 568 + agents_dir = tmp_path / "agents" 569 + 570 + agent_id = "1234567890124" 571 + 572 + result = {"completed": None, "timed_out": None} 573 + 574 + def wait_thread(): 575 + result["completed"], result["timed_out"] = wait_for_agents( 576 + [agent_id], timeout=5 577 + ) 578 + 579 + waiter = threading.Thread(target=wait_thread) 580 + waiter.start() 581 + time.sleep(0.2) 582 + 583 + # Create completed file and emit error event 584 + (agents_dir / f"{agent_id}.jsonl").write_text('{"event": "error"}\n') 585 + 586 + client = CallosumConnection() 587 + client.start() 588 + time.sleep(0.1) 589 + client.emit("cortex", "error", agent_id=agent_id, error="something failed") 590 + time.sleep(0.2) 591 + client.stop() 592 + 593 + waiter.join(timeout=3) 594 + 595 + assert result["completed"] == [agent_id] 596 + assert result["timed_out"] == [] 597 + 598 + 599 + def test_wait_for_agents_initial_file_check(tmp_path, monkeypatch): 600 + """Test wait_for_agents finds already-completed agents via initial file check.""" 601 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 602 + agents_dir = tmp_path / "agents" 603 + agents_dir.mkdir() 604 + (tmp_path / "health").mkdir() 605 + 606 + agent_id = "1234567890125" 607 + 608 + # Agent already completed before we start waiting 609 + (agents_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n') 610 + 611 + completed, timed_out = wait_for_agents([agent_id], timeout=1) 612 + 613 + # Should find via initial file check 614 + assert completed == [agent_id] 615 + assert timed_out == [] 616 + 617 + 618 + def test_wait_for_agents_timeout_actual(tmp_path, monkeypatch): 619 + """Test wait_for_agents times out for agents that never complete.""" 620 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 621 + agents_dir = tmp_path / "agents" 622 + agents_dir.mkdir() 623 + (tmp_path / "health").mkdir() 624 + 625 + agent_id = "1234567890126" 626 + # Create active file (not completed) 627 + (agents_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n') 628 + 629 + completed, timed_out = wait_for_agents([agent_id], timeout=1) 630 + 631 + assert completed == [] 632 + assert timed_out == [agent_id] 633 + 634 + 635 + def test_wait_for_agents_partial(callosum_server): 636 + """Test wait_for_agents with some completing and some timing out.""" 637 + tmp_path = callosum_server 638 + agents_dir = tmp_path / "agents" 639 + 640 + completing_agent = "1111" 641 + timeout_agent = "2222" 642 + 643 + # Create active file for timeout agent 644 + (agents_dir / f"{timeout_agent}_active.jsonl").write_text('{"event": "start"}\n') 645 + 646 + result = {"completed": None, "timed_out": None} 647 + 648 + def wait_thread(): 649 + result["completed"], result["timed_out"] = wait_for_agents( 650 + [completing_agent, timeout_agent], timeout=2 651 + ) 652 + 653 + waiter = threading.Thread(target=wait_thread) 654 + waiter.start() 655 + time.sleep(0.2) 656 + 657 + # Complete one agent 658 + (agents_dir / f"{completing_agent}.jsonl").write_text('{"event": "finish"}\n') 659 + 660 + client = CallosumConnection() 661 + client.start() 662 + time.sleep(0.1) 663 + client.emit("cortex", "finish", agent_id=completing_agent, result="done") 664 + time.sleep(0.1) 665 + client.stop() 666 + 667 + waiter.join(timeout=5) 668 + 669 + assert result["completed"] == [completing_agent] 670 + assert result["timed_out"] == [timeout_agent] 671 + 672 + 673 + def test_wait_for_agents_missed_event_recovery(tmp_path, monkeypatch, caplog): 674 + """Test that missed events are recovered via final file check with INFO log.""" 675 + import logging 676 + 677 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 678 + agents_dir = tmp_path / "agents" 679 + agents_dir.mkdir() 680 + (tmp_path / "health").mkdir() 681 + 682 + agent_id = "1234567890127" 683 + 684 + # Start with active file 685 + (agents_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n') 686 + 687 + result = {"completed": None, "timed_out": None} 688 + 689 + def wait_and_complete(): 690 + # Wait a bit then "complete" the agent by renaming file 691 + time.sleep(0.3) 692 + (agents_dir / f"{agent_id}_active.jsonl").unlink() 693 + (agents_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n') 694 + 695 + completer = threading.Thread(target=wait_and_complete) 696 + completer.start() 697 + 698 + with caplog.at_level(logging.INFO): 699 + result["completed"], result["timed_out"] = wait_for_agents( 700 + [agent_id], timeout=1 701 + ) 702 + 703 + completer.join() 704 + 705 + # Should recover via final file check 706 + assert result["completed"] == [agent_id] 707 + assert result["timed_out"] == [] 708 + 709 + # Should log about missed event 710 + assert any( 711 + "completion event not received but agent completed" in record.message 712 + for record in caplog.records 713 + )
+2 -2
think/cortex.py
··· 291 291 # Validate and link continue_from if specified 292 292 continue_from = request.get("continue_from") 293 293 if continue_from: 294 - from think.cortex_client import get_agent_status 294 + from think.cortex_client import get_agent_log_status 295 295 296 - status = get_agent_status(continue_from) 296 + status = get_agent_log_status(continue_from) 297 297 if status != "completed": 298 298 error_msg = f"Cannot continue from agent {continue_from}: " + ( 299 299 "agent is still running"
+78 -5
think/cortex_client.py
··· 5 5 6 6 import json 7 7 import logging 8 - import os 8 + import threading 9 9 import time 10 10 from pathlib import Path 11 11 from typing import Any, Dict, Optional 12 12 13 - from think.callosum import callosum_send 13 + from think.callosum import CallosumConnection, callosum_send 14 14 from think.utils import get_journal 15 15 16 16 logger = logging.getLogger(__name__) ··· 128 128 return agent_id 129 129 130 130 131 - def get_agent_status(agent_id: str) -> str: 132 - """Get the status of a specific agent. 131 + def get_agent_log_status(agent_id: str) -> str: 132 + """Get the status of a specific agent from its log file. 133 133 134 134 Args: 135 135 agent_id: The agent ID (timestamp) ··· 148 148 return "not_found" 149 149 150 150 151 + def wait_for_agents( 152 + agent_ids: list[str], 153 + timeout: int = 600, 154 + ) -> tuple[list[str], list[str]]: 155 + """Wait for agents to complete via Callosum events. 156 + 157 + Listens for cortex.finish and cortex.error events. Sets up the event 158 + listener first, then does an initial file check for agents that may have 159 + already completed, and a final file check at timeout as a backstop for 160 + any missed events. 161 + 162 + Args: 163 + agent_ids: List of agent IDs to wait for 164 + timeout: Maximum wait time in seconds (default 600 = 10 minutes) 165 + 166 + Returns: 167 + Tuple of (completed_ids, timed_out_ids) 168 + """ 169 + pending = set(agent_ids) 170 + completed: list[str] = [] 171 + lock = threading.Lock() 172 + all_done = threading.Event() 173 + 174 + def on_message(msg: dict) -> None: 175 + if msg.get("tract") != "cortex": 176 + return 177 + agent_id = msg.get("agent_id") 178 + if not agent_id: 179 + return 180 + 181 + event_type = msg.get("event") 182 + if event_type in ("finish", "error"): 183 + with lock: 184 + if agent_id in pending: 185 + completed.append(agent_id) 186 + pending.discard(agent_id) 187 + if not pending: 188 + all_done.set() 189 + 190 + # Start listener BEFORE initial check to avoid race condition 191 + listener = CallosumConnection() 192 + listener.start(callback=on_message) 193 + 194 + try: 195 + # Initial file check (with lock since callback may be running) 196 + with lock: 197 + for agent_id in list(pending): 198 + if get_agent_log_status(agent_id) == "completed": 199 + completed.append(agent_id) 200 + pending.discard(agent_id) 201 + 202 + if not pending: 203 + return completed, [] 204 + 205 + # Wait for all completions or timeout 206 + all_done.wait(timeout=timeout) 207 + 208 + finally: 209 + listener.stop() 210 + 211 + # Final file check for any remaining (backstop for missed events) 212 + # Listener is stopped, so no lock needed 213 + for agent_id in list(pending): 214 + if get_agent_log_status(agent_id) == "completed": 215 + logger.info( 216 + f"Agent {agent_id} completion event not received but agent completed" 217 + ) 218 + completed.append(agent_id) 219 + pending.discard(agent_id) 220 + 221 + return completed, list(pending) 222 + 223 + 151 224 def get_agent_end_state(agent_id: str) -> str: 152 225 """Get how a completed agent ended (finish or error). 153 226 ··· 160 233 "running" - Agent is still active 161 234 "unknown" - Agent file exists but no terminal event found 162 235 """ 163 - status = get_agent_status(agent_id) 236 + status = get_agent_log_status(agent_id) 164 237 if status == "running": 165 238 return "running" 166 239 if status == "not_found":
+1 -55
think/dream.py
··· 9 9 from pathlib import Path 10 10 11 11 from think.callosum import CallosumConnection 12 - from think.cortex_client import cortex_request, get_agent_status 12 + from think.cortex_client import cortex_request, wait_for_agents 13 13 from think.facets import get_active_facets, get_facets 14 14 from think.runner import run_task 15 15 from think.utils import ( ··· 214 214 """ 215 215 socket_path = Path(get_journal()) / "health" / "callosum.sock" 216 216 return socket_path.exists() 217 - 218 - 219 - def wait_for_agents( 220 - agent_ids: list[str], timeout: int = 600, startup_grace: int = 15 221 - ) -> tuple[list[str], list[str]]: 222 - """Poll until all agents complete or timeout. 223 - 224 - Polls get_agent_status() every 1 second. Agents are spawned asynchronously 225 - via Callosum, so there's a brief window where the agent file doesn't exist 226 - yet. We use a startup grace period to tolerate "not_found" status initially. 227 - 228 - Args: 229 - agent_ids: List of agent IDs to wait for 230 - timeout: Maximum wait time in seconds (default 600 = 10 minutes) 231 - startup_grace: Seconds to wait for agent files to appear (default 15) 232 - 233 - Returns: 234 - Tuple of (completed_ids, timed_out_ids) 235 - """ 236 - start = time.time() 237 - pending_startup = set(agent_ids) # Agents we haven't seen a file for yet 238 - pending_running = set() # Agents we've seen start (file exists) 239 - completed = [] 240 - 241 - while (pending_startup or pending_running) and (time.time() - start) < timeout: 242 - elapsed = time.time() - start 243 - 244 - # Check agents still in startup phase 245 - for agent_id in list(pending_startup): 246 - status = get_agent_status(agent_id) 247 - if status == "completed": 248 - completed.append(agent_id) 249 - pending_startup.discard(agent_id) 250 - elif status == "running": 251 - # File appeared - move to running set 252 - pending_startup.discard(agent_id) 253 - pending_running.add(agent_id) 254 - elif status == "not_found" and elapsed >= startup_grace: 255 - # Grace period expired - agent never started 256 - logging.warning(f"Agent {agent_id} not found (never started)") 257 - pending_startup.discard(agent_id) 258 - 259 - # Check agents that are running 260 - for agent_id in list(pending_running): 261 - status = get_agent_status(agent_id) 262 - if status == "completed": 263 - completed.append(agent_id) 264 - pending_running.discard(agent_id) 265 - 266 - if pending_startup or pending_running: 267 - time.sleep(1) 268 - 269 - timed_out = list(pending_startup | pending_running) 270 - return completed, timed_out 271 217 272 218 273 219 def run_daily_agents(day: str) -> tuple[int, int]: