# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Callosum-based talent process manager for solstone.
5
6Cortex listens for talent requests via the Callosum message bus and manages
7talent process lifecycle:
8- Receives requests via Callosum (tract="cortex", event="request")
9- Creates <talent>/<timestamp>_active.jsonl files to track active uses
10- Spawns talent processes and captures their stdout events
11- Broadcasts all talent events back to Callosum
12- Renames to <talent>/<timestamp>.jsonl when complete
13
14Talent files provide persistence and historical record, while Callosum provides
15real-time event distribution to all interested services.
16"""
17
18from __future__ import annotations
19
20import json
21import logging
22import os
23import subprocess
24import sys
25import threading
26import time
27from pathlib import Path
28from typing import Any, Dict, Optional
29
30from think.callosum import CallosumConnection
31from think.runner import _atomic_symlink
32from think.talents import TALENT_EXECUTION_MODULE
33from think.utils import get_journal, get_project_root, get_rev, now_ms
34
35
class TalentProcess:
    """Tracks one spawned talent subprocess and its JSONL log file."""

    def __init__(self, use_id: str, process: subprocess.Popen, log_path: Path):
        # Identity and handles for this run.
        self.use_id = use_id
        self.process = process
        self.log_path = log_path
        # Set once a stop has been requested; consulted by is_running().
        self.stop_event = threading.Event()
        # threading.Timer installed by the service to enforce timeouts.
        self.timeout_timer = None
        # Wall-clock start, used for elapsed-time status reporting.
        self.start_time = time.time()

    def is_running(self) -> bool:
        """Return True while the subprocess is alive and no stop was requested."""
        if self.stop_event.is_set():
            return False
        return self.process.poll() is None

    def stop(self) -> None:
        """Request shutdown: SIGTERM first, then SIGKILL if it lingers."""
        self.stop_event.set()

        # A pending timeout timer must not fire after we have stopped.
        timer = self.timeout_timer
        if timer:
            timer.cancel()

        if self.process.poll() is not None:
            return  # already exited; nothing to terminate

        self.process.terminate()
        try:
            # Allow up to 10 seconds for a clean exit before escalating.
            self.process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            logging.getLogger(__name__).warning(
                f"Talent {self.use_id} didn't stop gracefully, killing"
            )
            self.process.kill()
            self.process.wait()  # reap the zombie
70
71
72class CortexService:
73 """Callosum-based talent process manager."""
74
75 def __init__(self, journal_path: Optional[str] = None):
76 self.journal_path = Path(journal_path or get_journal())
77 self.talents_dir = self.journal_path / "talents"
78 self.talents_dir.mkdir(parents=True, exist_ok=True)
79
80 self.logger = logging.getLogger(__name__)
81 self.running_uses: Dict[str, TalentProcess] = {}
82 self.use_requests: Dict[str, Dict[str, Any]] = {} # Store use requests
83 self.lock = threading.RLock()
84 self.stop_event = threading.Event()
85 self.shutdown_requested = threading.Event()
86
87 # Callosum connection for receiving requests and broadcasting events
88 self.callosum = CallosumConnection(defaults={"rev": get_rev()})
89
90 def _create_error_event(
91 self,
92 use_id: str,
93 error: str,
94 trace: Optional[str] = None,
95 exit_code: Optional[int] = None,
96 ) -> Dict[str, Any]:
97 """Create standardized error event."""
98 event = {
99 "event": "error",
100 "ts": now_ms(),
101 "use_id": use_id,
102 "error": error,
103 }
104 if trace:
105 event["trace"] = trace
106 if exit_code is not None:
107 event["exit_code"] = exit_code
108 return event
109
110 def _recover_orphaned_uses(self, active_files: list) -> None:
111 """Recover orphaned active talent files from a previous crash.
112
113 Appends an error event to each file and renames to completed.
114 """
115 for file_path in active_files:
116 use_id = file_path.stem.replace("_active", "")
117 try:
118 error_event = self._create_error_event(
119 use_id, "Recovered: Cortex restarted while talent was running"
120 )
121 with open(file_path, "a") as f:
122 f.write(json.dumps(error_event) + "\n")
123
124 completed_path = file_path.parent / f"{use_id}.jsonl"
125 file_path.rename(completed_path)
126 self.logger.warning(f"Recovered orphaned talent: {use_id}")
127 except Exception as e:
128 self.logger.error(f"Failed to recover talent {use_id}: {e}")
129
    def start(self) -> None:
        """Start listening for talent requests via Callosum.

        Recovers any orphaned logs, connects to the bus, launches the
        status thread, then blocks in a wait loop until a graceful
        shutdown completes.  Exits the process (sys.exit(1)) if the
        Callosum connection cannot be established.
        """
        # Recover any orphaned active files from previous crash
        active_files = list(self.talents_dir.glob("*/*_active.jsonl"))
        if active_files:
            self.logger.warning(
                f"Found {len(active_files)} orphaned talent use(s), recovering..."
            )
            self._recover_orphaned_uses(active_files)

        # Connect to Callosum to receive requests
        try:
            self.callosum.start(callback=self._handle_callosum_message)
            self.logger.info("Connected to Callosum message bus")
            # Ask the supervisor to verify provider health at startup.
            self.callosum.emit(
                "supervisor", "request", cmd=["sol", "providers", "check"]
            )
            self.logger.info("Requested providers health check via supervisor")
        except Exception as e:
            self.logger.error(f"Failed to connect to Callosum: {e}")
            sys.exit(1)

        # Start status emission thread
        threading.Thread(
            target=self._emit_periodic_status,
            name="cortex-status",
            daemon=True,
        ).start()

        self.logger.info("Cortex service started, listening for talent requests")

        # Main wait loop.  Real work happens on the Callosum callback and
        # the per-talent monitor threads; this thread only idles and
        # coordinates graceful shutdown.
        while True:
            try:
                while not self.stop_event.is_set():
                    time.sleep(1)
                    # Exit when idle during shutdown
                    if self.shutdown_requested.is_set():
                        with self.lock:
                            if len(self.running_uses) == 0:
                                self.logger.info(
                                    "No talent uses running, exiting gracefully"
                                )
                                return
                        # Uses still running: break to the outer loop and
                        # keep polling until they drain.
                        break
            except KeyboardInterrupt:
                # First Ctrl-C requests a drain; actual exit happens above
                # once no talent uses remain.
                self.logger.info("Shutdown requested, will exit when idle")
                self.shutdown_requested.set()
177
178 def _handle_callosum_message(self, message: Dict[str, Any]) -> None:
179 """Handle incoming Callosum messages (callback)."""
180 # Filter for cortex tract and request event
181 if message.get("tract") != "cortex" or message.get("event") != "request":
182 return
183
184 # Handle the request
185 try:
186 self._handle_request(message)
187 except Exception as e:
188 self.logger.exception(f"Error handling request: {e}")
189
    def _handle_request(self, request: Dict[str, Any]) -> None:
        """Handle a new talent request from Callosum.

        Cortex is a minimal process manager - it only handles:
        - File lifecycle (<talent>/<id>_active.jsonl -> <talent>/<id>.jsonl)
        - Process spawning and monitoring
        - Event relay to Callosum

        All config loading, validation, and hydration is done by think.talents.
        Cortex only resolves talent cwd early so the child process starts in
        the correct working directory.
        """
        use_id = request.get("use_id")
        if not use_id:
            self.logger.error("Received request without use_id")
            return

        # Skip if this use is already being processed
        with self.lock:
            if use_id in self.running_uses:
                self.logger.debug(f"Talent use {use_id} already running, skipping")
                return

        # Create _active.jsonl file (exclusive creation to prevent race conditions)
        # Talent names may contain ":" which is unsafe in file paths.
        name = request.get("name", "unified")
        safe_name = name.replace(":", "--")
        talent_subdir = self.talents_dir / safe_name
        talent_subdir.mkdir(parents=True, exist_ok=True)
        file_path = talent_subdir / f"{use_id}_active.jsonl"
        # Cheap pre-check; the authoritative claim is the exclusive open below.
        if file_path.exists():
            self.logger.debug(f"Talent use {use_id} already claimed by another process")
            return

        try:
            with open(file_path, "x") as f:  # 'x' mode fails if file exists
                f.write(json.dumps(request) + "\n")
        except FileExistsError:
            # Another process claimed this use between exists() and open("x").
            return

        self.logger.info(f"Processing talent request: {use_id}")

        # Store request for later use (output writing)
        with self.lock:
            self.use_requests[use_id] = request

        # Spawn talent process - it handles all validation/hydration
        try:
            self._spawn_subprocess(
                use_id,
                file_path,
                request,
                [sys.executable, "-m", TALENT_EXECUTION_MODULE],
                "talent",
            )
        except Exception as e:
            self.logger.exception(f"Failed to spawn talent {use_id}: {e}")
            self._write_error_and_complete(file_path, f"Failed to spawn talent: {e}")
247
    def _spawn_subprocess(
        self,
        use_id: str,
        file_path: Path,
        config: Dict[str, Any],
        cmd: list[str],
        process_type: str,
    ) -> None:
        """Spawn a subprocess and monitor its output.

        Args:
            use_id: Unique identifier for this process
            file_path: Path to the JSONL log file
            config: Configuration dict to pass via NDJSON stdin
            cmd: Command to run (e.g., [sys.executable, "-m", TALENT_EXECUTION_MODULE])
            process_type: Label for logging ("talent")

        On any failure an error event is written to file_path and the use
        is finalized via _write_error_and_complete.
        """
        try:
            # Store the config for later use - thread safe
            with self.lock:
                self.use_requests[use_id] = config

            # Pass the full config through as NDJSON
            ndjson_input = json.dumps(config)

            # Prepare environment
            env = os.environ.copy()

            # Promote top-level config keys to environment so tools can read
            # them as defaults (e.g., sol call todos add uses SOL_FACET).
            # Explicit env overrides below take precedence.
            if config.get("facet"):
                env["SOL_FACET"] = str(config["facet"])
            if config.get("day"):
                env["SOL_DAY"] = str(config["day"])

            # Apply explicit env overrides (from thinking.py etc.) — these win
            env_overrides = config.get("env")
            if env_overrides and isinstance(env_overrides, dict):
                env.update({k: str(v) for k, v in env_overrides.items()})

            # Spawn the subprocess
            self.logger.info(f"Spawning {process_type} {use_id}: {cmd}")
            self.logger.debug(f"NDJSON input: {ndjson_input}")
            subprocess_cwd = None
            if process_type == "talent":
                from think.talent import get_talent

                talent_key = str(config.get("name", "unified"))
                talent_config = get_talent(talent_key)
                if talent_config.get("type") == "cogitate":
                    # Resolve here because prepare_config() runs inside think.talents.
                    cwd_value = talent_config.get("cwd")
                    if cwd_value == "journal":
                        try:
                            subprocess_cwd = str(Path(get_journal()))
                        except Exception as exc:
                            raise RuntimeError(
                                f"Cannot resolve cwd for talent '{talent_key}'"
                            ) from exc
                    elif cwd_value == "repo":
                        subprocess_cwd = get_project_root()
                    else:
                        # Any other cwd value is a config error for cogitate talents.
                        raise RuntimeError(
                            f"Cannot resolve cwd for talent '{talent_key}'"
                        )

            process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=env,
                bufsize=1,  # line-buffered so events stream as they are emitted
                cwd=subprocess_cwd,
            )

            # Send input and close stdin
            process.stdin.write(ndjson_input + "\n")
            process.stdin.close()

            # Track the running process
            agent = TalentProcess(use_id, process, file_path)
            with self.lock:
                self.running_uses[use_id] = agent

            # Set up timeout (default to 10 minutes if not specified)
            timeout_seconds = config.get("timeout_seconds", 600)
            agent.timeout_timer = threading.Timer(
                timeout_seconds,
                lambda: self._timeout_talent(use_id, agent, timeout_seconds),
            )
            agent.timeout_timer.start()

            # Start monitoring threads
            threading.Thread(
                target=self._monitor_stdout, args=(agent,), daemon=True
            ).start()

            threading.Thread(
                target=self._monitor_stderr, args=(agent,), daemon=True
            ).start()

            self.logger.info(
                f"{process_type.capitalize()} {use_id} spawned successfully "
                f"(PID: {process.pid})"
            )

        except Exception as e:
            self.logger.exception(f"Failed to spawn {process_type} {use_id}: {e}")
            self._write_error_and_complete(
                file_path, f"Failed to spawn {process_type}: {e}"
            )
362
363 def _timeout_talent(
364 self, use_id: str, agent: TalentProcess, timeout_seconds: int
365 ) -> None:
366 """Handle talent timeout."""
367 if agent.is_running():
368 self.logger.warning(
369 f"Talent {use_id} timed out after {timeout_seconds} seconds"
370 )
371 error_event = self._create_error_event(
372 use_id, f"Talent timed out after {timeout_seconds} seconds"
373 )
374 try:
375 with open(agent.log_path, "a") as f:
376 f.write(json.dumps(error_event) + "\n")
377 except Exception as e:
378 self.logger.error(f"Failed to write timeout event: {e}")
379
380 # Broadcast to callosum so wait_for_uses detects immediately
381 try:
382 event_copy = error_event.copy()
383 event_type = event_copy.pop("event", "error")
384 self.callosum.emit("cortex", event_type, **event_copy)
385 except Exception:
386 pass
387
388 agent.stop()
389
    def _monitor_stdout(self, agent: TalentProcess) -> None:
        """Monitor talent stdout and append events to the JSONL file.

        Runs in a dedicated daemon thread per talent.  Each stdout line is
        expected to be a JSON event; non-JSON lines are wrapped as "info"
        events.  Every event is persisted to the use's JSONL log and then
        re-broadcast on Callosum.  A "finish" or "error" event ends the
        loop, after which the process is reaped and the log finalized.
        """
        if not agent.process.stdout:
            return

        try:
            with agent.process.stdout:
                for line in agent.process.stdout:
                    if not line:
                        continue

                    line = line.strip()
                    if not line:
                        continue

                    try:
                        # Parse JSON event
                        event = json.loads(line)

                        # Ensure event has timestamp and use_id
                        if "ts" not in event:
                            event["ts"] = now_ms()
                        if "use_id" not in event:
                            event["use_id"] = agent.use_id

                        # Inject agent name for WebSocket consumers
                        with self.lock:
                            _req = self.use_requests.get(agent.use_id)
                            if _req and "name" not in event:
                                event["name"] = _req.get("name", "")
                            # Inject display mode for triage talent finish events
                            if event.get("event") == "finish" and _req:
                                try:
                                    from apps.home.events import TRIAGE_AGENT_NAMES
                                    from convey.triage import compute_display_mode

                                    if _req.get("name", "") in TRIAGE_AGENT_NAMES:
                                        event["display"] = compute_display_mode(
                                            event.get("result", "")
                                        )
                                except Exception:
                                    pass  # Display is cosmetic; don't break finish handling

                        # Append to JSONL file
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(event) + "\n")

                        # Broadcast event to Callosum
                        try:
                            event_copy = event.copy()
                            event_type = event_copy.pop("event", "unknown")
                            self.callosum.emit("cortex", event_type, **event_copy)
                        except Exception as e:
                            # Broadcast failure is non-fatal: the file copy above
                            # is the durable record.
                            self.logger.info(
                                f"Failed to broadcast event to Callosum: {e}"
                            )

                        # Handle start event
                        if event.get("event") == "start":
                            # Capture model and provider for status reporting
                            with self.lock:
                                if agent.use_id in self.use_requests:
                                    model = event.get("model")
                                    if model:
                                        self.use_requests[agent.use_id]["model"] = model
                                    provider = event.get("provider")
                                    if provider:
                                        self.use_requests[agent.use_id]["provider"] = (
                                            provider
                                        )

                        # Handle finish or error event
                        if event.get("event") in ["finish", "error"]:
                            # Check for output (only on finish)
                            if event.get("event") == "finish":
                                result = event.get("result", "")

                                # Get original request (thread-safe access)
                                with self.lock:
                                    original_request = self.use_requests.get(
                                        agent.use_id
                                    )

                                # Log token usage if available
                                usage_data = event.get("usage")
                                if usage_data and original_request:
                                    try:
                                        from think.models import log_token_usage
                                        from think.talent import key_to_context

                                        model = original_request.get("model", "unknown")
                                        name = original_request.get("name", "unknown")
                                        context = key_to_context(name)

                                        # Extract segment from env if set (flat merge puts env at top level)
                                        env_config = original_request.get("env", {})
                                        segment = (
                                            env_config.get("SOL_SEGMENT")
                                            if env_config
                                            else None
                                        )

                                        log_token_usage(
                                            model=model,
                                            usage=usage_data,
                                            context=context,
                                            segment=segment,
                                            type="cogitate",
                                        )
                                    except Exception as e:
                                        # Accounting must never break the finish path.
                                        self.logger.warning(
                                            f"Failed to log token usage for talent {agent.use_id}: {e}"
                                        )

                                # Write output if requested
                                if original_request and original_request.get("output"):
                                    self._write_output(
                                        agent.use_id,
                                        result,
                                        original_request,
                                    )

                            # Break to trigger cleanup
                            break

                    except json.JSONDecodeError:
                        # Non-JSON output becomes info event
                        info_event = {
                            "event": "info",
                            "ts": now_ms(),
                            "message": line,
                            "use_id": agent.use_id,
                        }
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(info_event) + "\n")

        except Exception as e:
            self.logger.error(f"Error monitoring stdout for agent {agent.use_id}: {e}")
        finally:
            # Wait for process to fully exit (reaps zombie)
            exit_code = agent.process.wait()
            self.logger.info(f"Talent {agent.use_id} exited with code {exit_code}")

            # Check if finish event was emitted
            has_finish = self._has_finish_event(agent.log_path)

            if not has_finish:
                # Write error event if no finish using standardized format
                error_event = self._create_error_event(
                    agent.use_id,
                    f"Talent exited with code {exit_code} without finish event",
                    exit_code=exit_code,
                )
                with open(agent.log_path, "a") as f:
                    f.write(json.dumps(error_event) + "\n")

            # Complete the file (rename from _active.jsonl to .jsonl)
            self._complete_use_file(agent.use_id, agent.log_path)

            # Remove from running agents and clean up stored request (thread-safe)
            with self.lock:
                if agent.use_id in self.running_uses:
                    del self.running_uses[agent.use_id]
                # Clean up stored request
                if agent.use_id in self.use_requests:
                    del self.use_requests[agent.use_id]
556
557 def _monitor_stderr(self, agent: TalentProcess) -> None:
558 """Monitor talent stderr for errors."""
559 if not agent.process.stderr:
560 return
561
562 stderr_lines = []
563 try:
564 with agent.process.stderr:
565 for line in agent.process.stderr:
566 if not line:
567 continue
568 stripped = line.strip()
569 if stripped:
570 stderr_lines.append(stripped)
571 # Pass through to cortex stderr with talent prefix for traceability
572 print(
573 f"[talent:{agent.use_id}:stderr] {stripped}",
574 file=sys.stderr,
575 flush=True,
576 )
577
578 except Exception as e:
579 self.logger.error(f"Error monitoring stderr for agent {agent.use_id}: {e}")
580 finally:
581 # If process failed with stderr output, write error event
582 if stderr_lines:
583 exit_code = agent.process.poll()
584 if exit_code is not None and exit_code != 0:
585 error_event = self._create_error_event(
586 agent.use_id,
587 "Process failed with stderr output",
588 trace="\n".join(stderr_lines),
589 exit_code=exit_code,
590 )
591 try:
592 with open(agent.log_path, "a") as f:
593 f.write(json.dumps(error_event) + "\n")
594 except Exception as e:
595 self.logger.warning(f"Failed to write stderr event: {e}")
596
597 def _has_finish_event(self, file_path: Path) -> bool:
598 """Check if the JSONL file contains a finish or error event."""
599 try:
600 with open(file_path, "r") as f:
601 for line in f:
602 try:
603 event = json.loads(line)
604 if event.get("event") in ["finish", "error"]:
605 return True
606 except json.JSONDecodeError as exc:
607 self.logger.warning(
608 "Malformed event in %s while scanning for finish: %s",
609 file_path,
610 exc,
611 )
612 continue
613 except FileNotFoundError:
614 self.logger.debug("Use log disappeared before finish scan: %s", file_path)
615 except OSError as exc:
616 self.logger.warning(
617 "Failed to scan %s for finish events: %s", file_path, exc
618 )
619 return False
620
621 def _complete_use_file(self, use_id: str, file_path: Path) -> None:
622 """Complete a talent use by renaming the file from _active.jsonl to .jsonl."""
623 try:
624 completed_path = file_path.parent / f"{use_id}.jsonl"
625 file_path.rename(completed_path)
626 self.logger.info(f"Completed talent use {use_id}: {completed_path}")
627
628 # Create convenience symlink: {name}.log -> {name}/{use_id}.jsonl
629 request = self.use_requests.get(use_id)
630 if request:
631 name = request.get("name")
632 if name:
633 safe_name = name.replace(":", "--")
634 link_path = self.talents_dir / f"{safe_name}.log"
635 _atomic_symlink(link_path, f"{safe_name}/{use_id}.jsonl")
636 self.logger.debug(
637 f"Symlinked {safe_name}.log -> {safe_name}/{use_id}.jsonl"
638 )
639
640 # Append summary to day index
641 self._append_day_index(use_id, request, completed_path)
642 else:
643 self.logger.debug(
644 f"No name in request for {use_id}, skipping symlink"
645 )
646 except Exception as e:
647 self.logger.error(f"Failed to complete talent file {use_id}: {e}")
648
649 def _append_day_index(
650 self, use_id: str, request: Dict[str, Any], completed_path: Path
651 ) -> None:
652 """Append talent-use summary to the day index file."""
653 try:
654 # Determine day from request or use_id timestamp
655 day = request.get("day")
656 if not day:
657 from datetime import datetime
658
659 ts_seconds = int(use_id) / 1000
660 day = datetime.fromtimestamp(ts_seconds).strftime("%Y%m%d")
661
662 start_ts = request.get("ts", 0)
663
664 # Read last few lines to find finish/error event for runtime
665 runtime_seconds = None
666 status = "completed"
667 try:
668 with open(completed_path, "r") as f:
669 lines = f.readlines()
670 for line in reversed(lines[-10:]):
671 line = line.strip()
672 if not line:
673 continue
674 try:
675 event = json.loads(line)
676 event_type = event.get("event")
677 if event_type == "finish":
678 end_ts = event.get("ts", 0)
679 if end_ts and start_ts:
680 runtime_seconds = round((end_ts - start_ts) / 1000.0, 1)
681 break
682 if event_type == "error":
683 status = "error"
684 end_ts = event.get("ts", 0)
685 if end_ts and start_ts:
686 runtime_seconds = round((end_ts - start_ts) / 1000.0, 1)
687 break
688 except json.JSONDecodeError:
689 continue
690 except Exception:
691 pass
692
693 summary = {
694 "use_id": use_id,
695 "name": request.get("name", "unified"),
696 "day": day,
697 "facet": request.get("facet"),
698 "ts": start_ts,
699 "status": status,
700 "runtime_seconds": runtime_seconds,
701 "provider": request.get("provider"),
702 "model": request.get("model"),
703 "schedule": request.get("schedule"),
704 }
705
706 day_index_path = self.talents_dir / f"{day}.jsonl"
707 with open(day_index_path, "a") as f:
708 f.write(json.dumps(summary) + "\n")
709 f.flush()
710
711 except Exception as e:
712 self.logger.error(f"Failed to append day index for {use_id}: {e}")
713
714 def _write_error_and_complete(self, file_path: Path, error_message: str) -> None:
715 """Write an error event to the file and mark it as complete."""
716 try:
717 use_id = file_path.stem.replace("_active", "")
718 error_event = self._create_error_event(use_id, error_message)
719 with open(file_path, "a") as f:
720 f.write(json.dumps(error_event) + "\n")
721
722 # Complete the file
723 self._complete_use_file(use_id, file_path)
724 except Exception as e:
725 self.logger.error(f"Failed to write error and complete: {e}")
726
727 def _write_output(self, use_id: str, result: str, config: Dict[str, Any]) -> None:
728 """Write talent output to config["output_path"].
729
730 The output path is set by the caller — either derived by
731 prepare_config in think.talents (day/segment talents) or computed
732 by thinking.py via get_activity_output_path (activity talents).
733 Cortex does not derive paths itself.
734 """
735 output_path_str = config.get("output_path")
736 if not output_path_str:
737 return
738
739 try:
740 output_path = Path(output_path_str)
741 output_path.parent.mkdir(parents=True, exist_ok=True)
742
743 with open(output_path, "w", encoding="utf-8") as f:
744 f.write(result)
745
746 self.logger.info(f"Wrote talent {use_id} output to {output_path}")
747
748 except Exception as e:
749 self.logger.error(f"Failed to write talent {use_id} output: {e}")
750
751 def stop(self) -> None:
752 """Stop the Cortex service."""
753 self.stop_event.set()
754
755 # Close Callosum connection
756 if self.callosum:
757 self.callosum.stop()
758
759 # Stop all running talent uses
760 with self.lock:
761 for agent in self.running_uses.values():
762 agent.stop()
763
764 def _emit_periodic_status(self) -> None:
765 """Emit status events every 5 seconds (runs in background thread)."""
766 while not self.stop_event.is_set():
767 try:
768 with self.lock:
769 uses = []
770 for use_id, agent_proc in self.running_uses.items():
771 config = self.use_requests.get(use_id, {})
772 uses.append(
773 {
774 "use_id": use_id,
775 "name": config.get("name", "unknown"),
776 "provider": config.get("provider", "unknown"),
777 "elapsed_seconds": int(
778 time.time() - agent_proc.start_time
779 ),
780 }
781 )
782
783 # Only emit status when there are active talent uses
784 if uses:
785 self.callosum.emit(
786 "cortex",
787 "status",
788 running_uses=len(uses),
789 uses=uses,
790 )
791 except Exception as e:
792 self.logger.debug(f"Status emission failed: {e}")
793
794 time.sleep(5)
795
796 def get_status(self) -> Dict[str, Any]:
797 """Get service status information."""
798 with self.lock:
799 return {
800 "running_uses": len(self.running_uses),
801 "use_ids": list(self.running_uses.keys()),
802 }
803
804
def main() -> None:
    """CLI entry point for the Cortex service."""
    import argparse

    from think.utils import require_solstone, setup_cli

    parser = argparse.ArgumentParser(description="solstone Cortex Talent Manager")
    args = setup_cli(parser)
    require_solstone()

    # --verbose (provided by setup_cli) bumps the log level to DEBUG.
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    service = CortexService()
    try:
        service.start()
    except KeyboardInterrupt:
        logging.getLogger(__name__).info("Shutting down Cortex service")
        service.stop()


if __name__ == "__main__":
    main()