# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Callosum-based talent process manager for solstone.

Cortex listens for talent requests via the Callosum message bus and manages
talent process lifecycle:
- Receives requests via Callosum (tract="cortex", event="request")
- Creates <talent>/<timestamp>_active.jsonl files to track active uses
- Spawns talent processes and captures their stdout events
- Broadcasts all talent events back to Callosum
- Renames to <talent>/<timestamp>.jsonl when complete

Talent files provide persistence and historical record, while Callosum provides
real-time event distribution to all interested services.
"""

from __future__ import annotations

import json
import logging
import os
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional

from think.callosum import CallosumConnection
from think.runner import _atomic_symlink
from think.talents import TALENT_EXECUTION_MODULE
from think.utils import get_journal, get_project_root, get_rev, now_ms


class TalentProcess:
    """Manages a running talent subprocess."""

    def __init__(self, use_id: str, process: subprocess.Popen, log_path: Path):
        self.use_id = use_id
        self.process = process
        self.log_path = log_path
        self.stop_event = threading.Event()
        self.timeout_timer = None  # For timeout support
        self.start_time = time.time()  # Track when agent started

    def is_running(self) -> bool:
        """Check if the agent process is still running."""
        return self.process.poll() is None and not self.stop_event.is_set()

    def stop(self) -> None:
        """Stop the agent process gracefully."""
        self.stop_event.set()

        # Cancel timeout timer if it exists
        if self.timeout_timer:
            self.timeout_timer.cancel()

        if self.process.poll() is None:
            # First try SIGTERM for graceful shutdown
            self.process.terminate()
            try:
                self.process.wait(timeout=10)  # Give more time for graceful shutdown
            except subprocess.TimeoutExpired:
                logging.getLogger(__name__).warning(
                    f"Talent {self.use_id} didn't stop gracefully, killing"
                )
                self.process.kill()
                self.process.wait()  # Ensure zombie is reaped


class CortexService:
    """Callosum-based talent process manager."""

    def __init__(self, journal_path: Optional[str] = None):
        self.journal_path = Path(journal_path or get_journal())
        self.talents_dir = self.journal_path / "talents"
        self.talents_dir.mkdir(parents=True, exist_ok=True)

        self.logger = logging.getLogger(__name__)
        self.running_uses: Dict[str, TalentProcess] = {}
        self.use_requests: Dict[str, Dict[str, Any]] = {}  # Store use requests
        self.lock = threading.RLock()
        self.stop_event = threading.Event()
        self.shutdown_requested = threading.Event()

        # Callosum connection for receiving requests and broadcasting events
        self.callosum = CallosumConnection(defaults={"rev": get_rev()})

    def _create_error_event(
        self,
        use_id: str,
        error: str,
        trace: Optional[str] = None,
        exit_code: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Create standardized error event."""
        event = {
            "event": "error",
            "ts": now_ms(),
            "use_id": use_id,
            "error": error,
        }
        if trace:
            event["trace"] = trace
        if exit_code is not None:
            event["exit_code"] = exit_code
        return event
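
    # Illustrative error event shape, as written to the use log and broadcast
    # on the "cortex" tract (keys follow _create_error_event above; values
    # here are made up):
    #   {"event": "error", "ts": 1712345678901, "use_id": "1712345678901",
    #    "error": "Talent exited with code 1 without finish event",
    #    "exit_code": 1}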

    def _recover_orphaned_uses(self, active_files: list) -> None:
        """Recover orphaned active talent files from a previous crash.

        Appends an error event to each file and renames to completed.
        """
        for file_path in active_files:
            use_id = file_path.stem.replace("_active", "")
            try:
                error_event = self._create_error_event(
                    use_id, "Recovered: Cortex restarted while talent was running"
                )
                with open(file_path, "a") as f:
                    f.write(json.dumps(error_event) + "\n")

                completed_path = file_path.parent / f"{use_id}.jsonl"
                file_path.rename(completed_path)
                self.logger.warning(f"Recovered orphaned talent: {use_id}")
            except Exception as e:
                self.logger.error(f"Failed to recover talent {use_id}: {e}")

    def start(self) -> None:
        """Start listening for talent requests via Callosum."""
        # Recover any orphaned active files from previous crash
        active_files = list(self.talents_dir.glob("*/*_active.jsonl"))
        if active_files:
            self.logger.warning(
                f"Found {len(active_files)} orphaned talent use(s), recovering..."
            )
            self._recover_orphaned_uses(active_files)

        # Connect to Callosum to receive requests
        try:
            self.callosum.start(callback=self._handle_callosum_message)
            self.logger.info("Connected to Callosum message bus")
            self.callosum.emit(
                "supervisor", "request", cmd=["sol", "providers", "check"]
            )
            self.logger.info("Requested providers health check via supervisor")
        except Exception as e:
            self.logger.error(f"Failed to connect to Callosum: {e}")
            sys.exit(1)

        # Start status emission thread
        threading.Thread(
            target=self._emit_periodic_status,
            name="cortex-status",
            daemon=True,
        ).start()

        self.logger.info("Cortex service started, listening for talent requests")

        while True:
            try:
                while not self.stop_event.is_set():
                    time.sleep(1)
                    # Exit when idle during shutdown
                    if self.shutdown_requested.is_set():
                        with self.lock:
                            if len(self.running_uses) == 0:
                                self.logger.info(
                                    "No talent uses running, exiting gracefully"
                                )
                                return
                break
            except KeyboardInterrupt:
                self.logger.info("Shutdown requested, will exit when idle")
                self.shutdown_requested.set()

    def _handle_callosum_message(self, message: Dict[str, Any]) -> None:
        """Handle incoming Callosum messages (callback)."""
        # Filter for cortex tract and request event
        if message.get("tract") != "cortex" or message.get("event") != "request":
            return

        # Handle the request
        try:
            self._handle_request(message)
        except Exception as e:
            self.logger.exception(f"Error handling request: {e}")

    def _handle_request(self, request: Dict[str, Any]) -> None:
        """Handle a new talent request from Callosum.

        Cortex is a minimal process manager - it only handles:
        - File lifecycle (<talent>/<id>_active.jsonl -> <talent>/<id>.jsonl)
        - Process spawning and monitoring
        - Event relay to Callosum

        All config loading, validation, and hydration is done by think.talents.
        Cortex only resolves talent cwd early so the child process starts in
        the correct working directory.
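
        Illustrative request shape (keys mirror what this module reads;
        values are made up, and the exact schema is owned by the callers):

            {"use_id": "1712345678901", "name": "unified", "facet": "work",
             "day": "20240405", "ts": 1712345678901, "timeout_seconds": 600,
             "env": {"SOL_SEGMENT": "..."}, "output_path": "..."}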
201 """ 202 use_id = request.get("use_id") 203 if not use_id: 204 self.logger.error("Received request without use_id") 205 return 206 207 # Skip if this use is already being processed 208 with self.lock: 209 if use_id in self.running_uses: 210 self.logger.debug(f"Talent use {use_id} already running, skipping") 211 return 212 213 # Create _active.jsonl file (exclusive creation to prevent race conditions) 214 name = request.get("name", "unified") 215 safe_name = name.replace(":", "--") 216 talent_subdir = self.talents_dir / safe_name 217 talent_subdir.mkdir(parents=True, exist_ok=True) 218 file_path = talent_subdir / f"{use_id}_active.jsonl" 219 if file_path.exists(): 220 self.logger.debug(f"Talent use {use_id} already claimed by another process") 221 return 222 223 try: 224 with open(file_path, "x") as f: # 'x' mode fails if file exists 225 f.write(json.dumps(request) + "\n") 226 except FileExistsError: 227 return 228 229 self.logger.info(f"Processing talent request: {use_id}") 230 231 # Store request for later use (output writing) 232 with self.lock: 233 self.use_requests[use_id] = request 234 235 # Spawn talent process - it handles all validation/hydration 236 try: 237 self._spawn_subprocess( 238 use_id, 239 file_path, 240 request, 241 [sys.executable, "-m", TALENT_EXECUTION_MODULE], 242 "talent", 243 ) 244 except Exception as e: 245 self.logger.exception(f"Failed to spawn talent {use_id}: {e}") 246 self._write_error_and_complete(file_path, f"Failed to spawn talent: {e}") 247 248 def _spawn_subprocess( 249 self, 250 use_id: str, 251 file_path: Path, 252 config: Dict[str, Any], 253 cmd: list[str], 254 process_type: str, 255 ) -> None: 256 """Spawn a subprocess and monitor its output. 257 258 Args: 259 use_id: Unique identifier for this process 260 file_path: Path to the JSONL log file 261 config: Configuration dict to pass via NDJSON stdin 262 cmd: Command to run (e.g., [sys.executable, "-m", TALENT_EXECUTION_MODULE]) 263 process_type: Label for logging ("talent") 264 """ 265 try: 266 # Store the config for later use - thread safe 267 with self.lock: 268 self.use_requests[use_id] = config 269 270 # Pass the full config through as NDJSON 271 ndjson_input = json.dumps(config) 272 273 # Prepare environment 274 env = os.environ.copy() 275 276 # Promote top-level config keys to environment so tools can read 277 # them as defaults (e.g., sol call todos add uses SOL_FACET). 278 # Explicit env overrides below take precedence. 279 if config.get("facet"): 280 env["SOL_FACET"] = str(config["facet"]) 281 if config.get("day"): 282 env["SOL_DAY"] = str(config["day"]) 283 284 # Apply explicit env overrides (from thinking.py etc.) — these win 285 env_overrides = config.get("env") 286 if env_overrides and isinstance(env_overrides, dict): 287 env.update({k: str(v) for k, v in env_overrides.items()}) 288 289 # Spawn the subprocess 290 self.logger.info(f"Spawning {process_type} {use_id}: {cmd}") 291 self.logger.debug(f"NDJSON input: {ndjson_input}") 292 subprocess_cwd = None 293 if process_type == "talent": 294 from think.talent import get_talent 295 296 talent_key = str(config.get("name", "unified")) 297 talent_config = get_talent(talent_key) 298 if talent_config.get("type") == "cogitate": 299 # Resolve here because prepare_config() runs inside think.talents. 

    def _spawn_subprocess(
        self,
        use_id: str,
        file_path: Path,
        config: Dict[str, Any],
        cmd: list[str],
        process_type: str,
    ) -> None:
        """Spawn a subprocess and monitor its output.

        Args:
            use_id: Unique identifier for this process
            file_path: Path to the JSONL log file
            config: Configuration dict to pass via NDJSON stdin
            cmd: Command to run (e.g., [sys.executable, "-m", TALENT_EXECUTION_MODULE])
            process_type: Label for logging ("talent")
        """
        try:
            # Store the config for later use - thread safe
            with self.lock:
                self.use_requests[use_id] = config

            # Pass the full config through as NDJSON
            ndjson_input = json.dumps(config)

            # Prepare environment
            env = os.environ.copy()

            # Promote top-level config keys to environment so tools can read
            # them as defaults (e.g., sol call todos add uses SOL_FACET).
            # Explicit env overrides below take precedence.
            if config.get("facet"):
                env["SOL_FACET"] = str(config["facet"])
            if config.get("day"):
                env["SOL_DAY"] = str(config["day"])

            # Apply explicit env overrides (from thinking.py etc.) — these win
            env_overrides = config.get("env")
            if env_overrides and isinstance(env_overrides, dict):
                env.update({k: str(v) for k, v in env_overrides.items()})

            # Spawn the subprocess
            self.logger.info(f"Spawning {process_type} {use_id}: {cmd}")
            self.logger.debug(f"NDJSON input: {ndjson_input}")
            subprocess_cwd = None
            if process_type == "talent":
                from think.talent import get_talent

                talent_key = str(config.get("name", "unified"))
                talent_config = get_talent(talent_key)
                if talent_config.get("type") == "cogitate":
                    # Resolve here because prepare_config() runs inside think.talents.
                    cwd_value = talent_config.get("cwd")
                    if cwd_value == "journal":
                        try:
                            subprocess_cwd = str(Path(get_journal()))
                        except Exception as exc:
                            raise RuntimeError(
                                f"Cannot resolve cwd for talent '{talent_key}'"
                            ) from exc
                    elif cwd_value == "repo":
                        subprocess_cwd = get_project_root()
                    else:
                        raise RuntimeError(
                            f"Cannot resolve cwd for talent '{talent_key}'"
                        )

            process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=env,
                bufsize=1,
                cwd=subprocess_cwd,
            )

            # Send input and close stdin
            process.stdin.write(ndjson_input + "\n")
            process.stdin.close()

            # Track the running process
            agent = TalentProcess(use_id, process, file_path)
            with self.lock:
                self.running_uses[use_id] = agent

            # Set up timeout (default to 10 minutes if not specified)
            timeout_seconds = config.get("timeout_seconds", 600)
            agent.timeout_timer = threading.Timer(
                timeout_seconds,
                lambda: self._timeout_talent(use_id, agent, timeout_seconds),
            )
            agent.timeout_timer.start()

            # Start monitoring threads
            threading.Thread(
                target=self._monitor_stdout, args=(agent,), daemon=True
            ).start()

            threading.Thread(
                target=self._monitor_stderr, args=(agent,), daemon=True
            ).start()

            self.logger.info(
                f"{process_type.capitalize()} {use_id} spawned successfully "
                f"(PID: {process.pid})"
            )

        except Exception as e:
            self.logger.exception(f"Failed to spawn {process_type} {use_id}: {e}")
            self._write_error_and_complete(
                file_path, f"Failed to spawn {process_type}: {e}"
            )

    def _timeout_talent(
        self, use_id: str, agent: TalentProcess, timeout_seconds: int
    ) -> None:
        """Handle talent timeout."""
        if agent.is_running():
            self.logger.warning(
                f"Talent {use_id} timed out after {timeout_seconds} seconds"
            )
            error_event = self._create_error_event(
                use_id, f"Talent timed out after {timeout_seconds} seconds"
            )
            try:
                with open(agent.log_path, "a") as f:
                    f.write(json.dumps(error_event) + "\n")
            except Exception as e:
                self.logger.error(f"Failed to write timeout event: {e}")

            # Broadcast to callosum so wait_for_uses detects immediately
            try:
                event_copy = error_event.copy()
                event_type = event_copy.pop("event", "error")
                self.callosum.emit("cortex", event_type, **event_copy)
            except Exception:
                pass

            agent.stop()
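
    # Illustrative stdout protocol from a talent child process: one JSON event
    # per line. The handling below implies shapes like these (values made up):
    #   {"event": "start", "ts": 1712345678901, "model": "...", "provider": "..."}
    #   {"event": "info", "message": "..."}
    #   {"event": "finish", "ts": 1712345681000, "result": "...", "usage": {...}}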

    def _monitor_stdout(self, agent: TalentProcess) -> None:
        """Monitor talent stdout and append events to the JSONL file."""
        if not agent.process.stdout:
            return

        try:
            with agent.process.stdout:
                for line in agent.process.stdout:
                    if not line:
                        continue

                    line = line.strip()
                    if not line:
                        continue

                    try:
                        # Parse JSON event
                        event = json.loads(line)

                        # Ensure event has timestamp and use_id
                        if "ts" not in event:
                            event["ts"] = now_ms()
                        if "use_id" not in event:
                            event["use_id"] = agent.use_id

                        # Inject agent name for WebSocket consumers
                        with self.lock:
                            _req = self.use_requests.get(agent.use_id)
                        if _req and "name" not in event:
                            event["name"] = _req.get("name", "")
                        # Inject display mode for triage talent finish events
                        if event.get("event") == "finish" and _req:
                            try:
                                from apps.home.events import TRIAGE_AGENT_NAMES
                                from convey.triage import compute_display_mode

                                if _req.get("name", "") in TRIAGE_AGENT_NAMES:
                                    event["display"] = compute_display_mode(
                                        event.get("result", "")
                                    )
                            except Exception:
                                pass  # Display is cosmetic; don't break finish handling

                        # Append to JSONL file
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(event) + "\n")

                        # Broadcast event to Callosum
                        try:
                            event_copy = event.copy()
                            event_type = event_copy.pop("event", "unknown")
                            self.callosum.emit("cortex", event_type, **event_copy)
                        except Exception as e:
                            self.logger.info(
                                f"Failed to broadcast event to Callosum: {e}"
                            )

                        # Handle start event
                        if event.get("event") == "start":
                            # Capture model and provider for status reporting
                            with self.lock:
                                if agent.use_id in self.use_requests:
                                    model = event.get("model")
                                    if model:
                                        self.use_requests[agent.use_id]["model"] = model
                                    provider = event.get("provider")
                                    if provider:
                                        self.use_requests[agent.use_id]["provider"] = (
                                            provider
                                        )

                        # Handle finish or error event
                        if event.get("event") in ["finish", "error"]:
                            # Check for output (only on finish)
                            if event.get("event") == "finish":
                                result = event.get("result", "")

                                # Get original request (thread-safe access)
                                with self.lock:
                                    original_request = self.use_requests.get(
                                        agent.use_id
                                    )

                                # Log token usage if available
                                usage_data = event.get("usage")
                                if usage_data and original_request:
                                    try:
                                        from think.models import log_token_usage
                                        from think.talent import key_to_context

                                        model = original_request.get("model", "unknown")
                                        name = original_request.get("name", "unknown")
                                        context = key_to_context(name)

                                        # Extract segment from env if set (flat merge puts env at top level)
                                        env_config = original_request.get("env", {})
                                        segment = (
                                            env_config.get("SOL_SEGMENT")
                                            if env_config
                                            else None
                                        )

                                        log_token_usage(
                                            model=model,
                                            usage=usage_data,
                                            context=context,
                                            segment=segment,
                                            type="cogitate",
                                        )
                                    except Exception as e:
                                        self.logger.warning(
                                            f"Failed to log token usage for talent {agent.use_id}: {e}"
                                        )

                                # Write output if requested
                                if original_request and original_request.get("output"):
                                    self._write_output(
                                        agent.use_id,
                                        result,
                                        original_request,
                                    )

                            # Break to trigger cleanup
                            break

                    except json.JSONDecodeError:
                        # Non-JSON output becomes info event
                        info_event = {
                            "event": "info",
                            "ts": now_ms(),
                            "message": line,
                            "use_id": agent.use_id,
                        }
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(info_event) + "\n")

        except Exception as e:
            self.logger.error(f"Error monitoring stdout for agent {agent.use_id}: {e}")
        finally:
            # Wait for process to fully exit (reaps zombie)
            exit_code = agent.process.wait()
            self.logger.info(f"Talent {agent.use_id} exited with code {exit_code}")

            # Check if finish event was emitted
            has_finish = self._has_finish_event(agent.log_path)

            if not has_finish:
                # Write error event if no finish using standardized format
                error_event = self._create_error_event(
                    agent.use_id,
                    f"Talent exited with code {exit_code} without finish event",
                    exit_code=exit_code,
                )
                with open(agent.log_path, "a") as f:
                    f.write(json.dumps(error_event) + "\n")

            # Complete the file (rename from _active.jsonl to .jsonl)
            self._complete_use_file(agent.use_id, agent.log_path)

            # Remove from running agents and clean up stored request (thread-safe)
            with self.lock:
                if agent.use_id in self.running_uses:
                    del self.running_uses[agent.use_id]
                # Clean up stored request
                if agent.use_id in self.use_requests:
                    del self.use_requests[agent.use_id]
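
    # Stderr pass-through format, per the print() below (illustrative):
    #   [talent:1712345678901:stderr] Traceback (most recent call last):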

    def _monitor_stderr(self, agent: TalentProcess) -> None:
        """Monitor talent stderr for errors."""
        if not agent.process.stderr:
            return

        stderr_lines = []
        try:
            with agent.process.stderr:
                for line in agent.process.stderr:
                    if not line:
                        continue
                    stripped = line.strip()
                    if stripped:
                        stderr_lines.append(stripped)
                        # Pass through to cortex stderr with talent prefix for traceability
                        print(
                            f"[talent:{agent.use_id}:stderr] {stripped}",
                            file=sys.stderr,
                            flush=True,
                        )

        except Exception as e:
            self.logger.error(f"Error monitoring stderr for agent {agent.use_id}: {e}")
        finally:
            # If process failed with stderr output, write error event
            if stderr_lines:
                exit_code = agent.process.poll()
                if exit_code is not None and exit_code != 0:
                    error_event = self._create_error_event(
                        agent.use_id,
                        "Process failed with stderr output",
                        trace="\n".join(stderr_lines),
                        exit_code=exit_code,
                    )
                    try:
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(error_event) + "\n")
                    except Exception as e:
                        self.logger.warning(f"Failed to write stderr event: {e}")

    def _has_finish_event(self, file_path: Path) -> bool:
        """Check if the JSONL file contains a finish or error event."""
        try:
            with open(file_path, "r") as f:
                for line in f:
                    try:
                        event = json.loads(line)
                        if event.get("event") in ["finish", "error"]:
                            return True
                    except json.JSONDecodeError as exc:
                        self.logger.warning(
                            "Malformed event in %s while scanning for finish: %s",
                            file_path,
                            exc,
                        )
                        continue
        except FileNotFoundError:
            self.logger.debug("Use log disappeared before finish scan: %s", file_path)
        except OSError as exc:
            self.logger.warning(
                "Failed to scan %s for finish events: %s", file_path, exc
            )
        return False

    def _complete_use_file(self, use_id: str, file_path: Path) -> None:
        """Complete a talent use by renaming the file from _active.jsonl to .jsonl."""
        try:
            completed_path = file_path.parent / f"{use_id}.jsonl"
            file_path.rename(completed_path)
            self.logger.info(f"Completed talent use {use_id}: {completed_path}")

            # Create convenience symlink: {name}.log -> {name}/{use_id}.jsonl
            request = self.use_requests.get(use_id)
            if request:
                name = request.get("name")
                if name:
                    safe_name = name.replace(":", "--")
                    link_path = self.talents_dir / f"{safe_name}.log"
                    _atomic_symlink(link_path, f"{safe_name}/{use_id}.jsonl")
                    self.logger.debug(
                        f"Symlinked {safe_name}.log -> {safe_name}/{use_id}.jsonl"
                    )

                    # Append summary to day index
                    self._append_day_index(use_id, request, completed_path)
                else:
                    self.logger.debug(
                        f"No name in request for {use_id}, skipping symlink"
                    )
        except Exception as e:
            self.logger.error(f"Failed to complete talent file {use_id}: {e}")
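
    # Resulting on-disk layout under the journal (paths assembled from the
    # code in this class; names are illustrative):
    #   <journal>/talents/<name>/<use_id>_active.jsonl   - while a use runs
    #   <journal>/talents/<name>/<use_id>.jsonl          - after completion
    #   <journal>/talents/<name>.log                     - symlink to latest use
    #   <journal>/talents/<day>.jsonl                    - day index of summaries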

    def _append_day_index(
        self, use_id: str, request: Dict[str, Any], completed_path: Path
    ) -> None:
        """Append talent-use summary to the day index file."""
        try:
            # Determine day from request or use_id timestamp
            day = request.get("day")
            if not day:
                from datetime import datetime

                ts_seconds = int(use_id) / 1000
                day = datetime.fromtimestamp(ts_seconds).strftime("%Y%m%d")

            start_ts = request.get("ts", 0)

            # Read last few lines to find finish/error event for runtime
            runtime_seconds = None
            status = "completed"
            try:
                with open(completed_path, "r") as f:
                    lines = f.readlines()
                for line in reversed(lines[-10:]):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        event = json.loads(line)
                        event_type = event.get("event")
                        if event_type == "finish":
                            end_ts = event.get("ts", 0)
                            if end_ts and start_ts:
                                runtime_seconds = round((end_ts - start_ts) / 1000.0, 1)
                            break
                        if event_type == "error":
                            status = "error"
                            end_ts = event.get("ts", 0)
                            if end_ts and start_ts:
                                runtime_seconds = round((end_ts - start_ts) / 1000.0, 1)
                            break
                    except json.JSONDecodeError:
                        continue
            except Exception:
                pass

            summary = {
                "use_id": use_id,
                "name": request.get("name", "unified"),
                "day": day,
                "facet": request.get("facet"),
                "ts": start_ts,
                "status": status,
                "runtime_seconds": runtime_seconds,
                "provider": request.get("provider"),
                "model": request.get("model"),
                "schedule": request.get("schedule"),
            }

            day_index_path = self.talents_dir / f"{day}.jsonl"
            with open(day_index_path, "a") as f:
                f.write(json.dumps(summary) + "\n")
                f.flush()

        except Exception as e:
            self.logger.error(f"Failed to append day index for {use_id}: {e}")

    def _write_error_and_complete(self, file_path: Path, error_message: str) -> None:
        """Write an error event to the file and mark it as complete."""
        try:
            use_id = file_path.stem.replace("_active", "")
            error_event = self._create_error_event(use_id, error_message)
            with open(file_path, "a") as f:
                f.write(json.dumps(error_event) + "\n")

            # Complete the file
            self._complete_use_file(use_id, file_path)
        except Exception as e:
            self.logger.error(f"Failed to write error and complete: {e}")

    def _write_output(self, use_id: str, result: str, config: Dict[str, Any]) -> None:
        """Write talent output to config["output_path"].

        The output path is set by the caller — either derived by
        prepare_config in think.talents (day/segment talents) or computed
        by thinking.py via get_activity_output_path (activity talents).
        Cortex does not derive paths itself.
        """
        output_path_str = config.get("output_path")
        if not output_path_str:
            return

        try:
            output_path = Path(output_path_str)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            with open(output_path, "w", encoding="utf-8") as f:
                f.write(result)

            self.logger.info(f"Wrote talent {use_id} output to {output_path}")

        except Exception as e:
            self.logger.error(f"Failed to write talent {use_id} output: {e}")

    def stop(self) -> None:
        """Stop the Cortex service."""
        self.stop_event.set()

        # Close Callosum connection
        if self.callosum:
            self.callosum.stop()

        # Stop all running talent uses
        with self.lock:
            for agent in self.running_uses.values():
                agent.stop()
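
    # Illustrative "status" message as seen by other Callosum consumers (shape
    # follows the emit() call below; values are made up):
    #   {"tract": "cortex", "event": "status", "running_uses": 1,
    #    "uses": [{"use_id": "1712345678901", "name": "unified",
    #              "provider": "...", "elapsed_seconds": 42}]}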
734 """ 735 output_path_str = config.get("output_path") 736 if not output_path_str: 737 return 738 739 try: 740 output_path = Path(output_path_str) 741 output_path.parent.mkdir(parents=True, exist_ok=True) 742 743 with open(output_path, "w", encoding="utf-8") as f: 744 f.write(result) 745 746 self.logger.info(f"Wrote talent {use_id} output to {output_path}") 747 748 except Exception as e: 749 self.logger.error(f"Failed to write talent {use_id} output: {e}") 750 751 def stop(self) -> None: 752 """Stop the Cortex service.""" 753 self.stop_event.set() 754 755 # Close Callosum connection 756 if self.callosum: 757 self.callosum.stop() 758 759 # Stop all running talent uses 760 with self.lock: 761 for agent in self.running_uses.values(): 762 agent.stop() 763 764 def _emit_periodic_status(self) -> None: 765 """Emit status events every 5 seconds (runs in background thread).""" 766 while not self.stop_event.is_set(): 767 try: 768 with self.lock: 769 uses = [] 770 for use_id, agent_proc in self.running_uses.items(): 771 config = self.use_requests.get(use_id, {}) 772 uses.append( 773 { 774 "use_id": use_id, 775 "name": config.get("name", "unknown"), 776 "provider": config.get("provider", "unknown"), 777 "elapsed_seconds": int( 778 time.time() - agent_proc.start_time 779 ), 780 } 781 ) 782 783 # Only emit status when there are active talent uses 784 if uses: 785 self.callosum.emit( 786 "cortex", 787 "status", 788 running_uses=len(uses), 789 uses=uses, 790 ) 791 except Exception as e: 792 self.logger.debug(f"Status emission failed: {e}") 793 794 time.sleep(5) 795 796 def get_status(self) -> Dict[str, Any]: 797 """Get service status information.""" 798 with self.lock: 799 return { 800 "running_uses": len(self.running_uses), 801 "use_ids": list(self.running_uses.keys()), 802 } 803 804 805def main() -> None: 806 """CLI entry point for the Cortex service.""" 807 import argparse 808 809 from think.utils import require_solstone, setup_cli 810 811 parser = argparse.ArgumentParser(description="solstone Cortex Talent Manager") 812 args = setup_cli(parser) 813 require_solstone() 814 815 # Set up logging 816 logging.basicConfig( 817 level=logging.INFO if not args.verbose else logging.DEBUG, 818 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 819 ) 820 821 # Start the service 822 cortex = CortexService() 823 824 try: 825 cortex.start() 826 except KeyboardInterrupt: 827 logging.getLogger(__name__).info("Shutting down Cortex service") 828 cortex.stop() 829 830 831if __name__ == "__main__": 832 main()