personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 416 lines 13 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Cortex client for managing AI talent requests."""

import json
import logging
import threading
from pathlib import Path
from typing import Any, Dict, Optional

from think.callosum import CallosumConnection, callosum_send
from think.utils import get_journal, now_ms

logger = logging.getLogger(__name__)

# Module-level state for monotonic timestamp generation.
# _ts_lock guards _last_ts so concurrent cortex_request() calls cannot
# allocate the same use_id.
_last_ts = 0
_ts_lock = threading.Lock()


def _find_use_file(talents_dir: Path, use_id: str) -> tuple[Path | None, str]:
    """Find a use log file in per-talent subdirectories.

    Completed uses are stored as ``<talent>/<use_id>.jsonl``; uses still in
    progress as ``<talent>/<use_id>_active.jsonl``.

    Returns:
        Tuple of (file_path, status) where status is
        "completed", "running", or "not_found".
    """
    for match in talents_dir.glob(f"*/{use_id}.jsonl"):
        return match, "completed"
    for match in talents_dir.glob(f"*/{use_id}_active.jsonl"):
        return match, "running"
    return None, "not_found"


def cortex_request(
    prompt: str,
    name: str,
    provider: Optional[str] = None,
    config: Optional[Dict[str, Any]] = None,
    use_id: Optional[str] = None,
) -> str | None:
    """Create a Cortex talent request via Callosum broadcast.

    Args:
        prompt: The task or question for the talent
        name: Talent name - system (e.g., "chat") or app-qualified (e.g., "entities:entity_assist")
        provider: AI provider - openai, google, or anthropic
        config: Provider-specific configuration (model, max_output_tokens, thinking_budget, etc.)
        use_id: Optional pre-reserved use_id. When omitted, a unique timestamp is allocated.

    Returns:
        Use ID (timestamp-based string), or None if the Callosum send failed.

    Raises:
        ValueError: If use_id is not a digit string, or config is not a dict.
    """
    # Get journal path (for use_id uniqueness check)
    journal_path = get_journal()

    # Create talents directory if it doesn't exist
    talents_dir = Path(journal_path) / "talents"
    talents_dir.mkdir(parents=True, exist_ok=True)

    # Generate monotonic timestamp in milliseconds, ensuring uniqueness.
    # The lock makes the read-modify-write of _last_ts atomic so two
    # concurrent callers can never be handed the same use_id.
    global _last_ts
    with _ts_lock:
        if use_id is None:
            ts = now_ms()
            if ts <= _last_ts:
                ts = _last_ts + 1
            _last_ts = ts
            use_id = str(ts)
        else:
            if not use_id.isdigit():
                raise ValueError("use_id must be a millisecond timestamp string")
            ts = int(use_id)
            # Keep the monotonic floor in sync with externally reserved ids.
            if ts > _last_ts:
                _last_ts = ts

    # Build request object
    request = {
        "event": "request",
        "ts": ts,
        "use_id": use_id,
        "prompt": prompt,
        "provider": provider,
        "name": name,
    }

    # Add optional fields
    if config:
        if not isinstance(config, dict):
            raise ValueError("config must be a dictionary")
        # Merge config overrides directly into the request for a flat schema
        request.update(config)

    # Broadcast request to Callosum
    # Note: callosum_send() signature is send(tract, event, **fields)
    # Remove "event" from request dict to avoid conflict
    request_fields = {k: v for k, v in request.items() if k != "event"}
    sent = callosum_send("cortex", "request", **request_fields)

    if not sent:
        # A failed send means the request was never broadcast; surface it
        # above INFO so operators notice.
        logger.warning("Failed to send cortex request for talent '%s'", name)
        return None

    return use_id


def get_use_log_status(use_id: str) -> str:
    """Get the status of a specific use from its log file.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "completed" - Use finished (*.jsonl exists)
        "running" - Use still active (*_active.jsonl exists)
        "not_found" - No use file exists
    """
    talents_dir = Path(get_journal()) / "talents"
    _, status = _find_use_file(talents_dir, use_id)
    return status


def wait_for_uses(
    use_ids: list[str],
    timeout: int | None = 600,
) -> tuple[dict[str, str], list[str]]:
    """Wait for uses to complete via Callosum events.

    Listens for cortex.finish and cortex.error events. Sets up the event
    listener first, then does an initial file check for uses that may have
    already completed, and a final file check at timeout as a backstop for
    any missed events.

    Args:
        use_ids: List of use IDs to wait for
        timeout: Maximum wait time in seconds (default 600 = 10 minutes)

    Returns:
        Tuple of (completed, timed_out) where completed is a dict mapping
        use_id to end state ("finish" or "error"), and timed_out is a
        list of use IDs that did not complete within the timeout.
    """
    pending = set(use_ids)
    completed: dict[str, str] = {}
    lock = threading.Lock()
    all_done = threading.Event()

    def on_message(msg: dict) -> None:
        # Callback invoked on the listener thread for every Callosum message.
        if msg.get("tract") != "cortex":
            return
        use_id = msg.get("use_id")
        if not use_id:
            return

        event_type = msg.get("event")
        if event_type in ("finish", "error"):
            with lock:
                if use_id in pending:
                    completed[use_id] = event_type
                    pending.discard(use_id)
                    if not pending:
                        all_done.set()

    # Start listener BEFORE initial check to avoid race condition
    listener = CallosumConnection()
    listener.start(callback=on_message)

    try:
        # Initial file check (with lock since callback may be running)
        with lock:
            for use_id in list(pending):
                end_state = get_use_end_state(use_id)
                if end_state in ("finish", "error"):
                    completed[use_id] = end_state
                    pending.discard(use_id)

        if not pending:
            return completed, []

        # Wait for all completions or timeout
        all_done.wait(timeout=timeout)

    finally:
        listener.stop()

    # Final file check for any remaining (backstop for missed events)
    # Listener is stopped, so no lock needed
    for use_id in list(pending):
        end_state = get_use_end_state(use_id)
        if end_state in ("finish", "error"):
            logger.info(
                "Talent use %s completion event not received but use completed",
                use_id,
            )
            completed[use_id] = end_state
            pending.discard(use_id)

    return completed, list(pending)


def get_use_end_state(use_id: str) -> str:
    """Get how a completed use ended (finish or error).

    Checks file contents for terminal events even if file is still _active.jsonl,
    since Callosum broadcasts happen before file rename.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "finish" - Use completed successfully
        "error" - Use ended with an error
        "running" - Use is still active (no terminal event in file)
        "unknown" - Use file not found
    """
    status = get_use_log_status(use_id)
    if status == "not_found":
        return "unknown"

    # Read events to find terminal state (even for "running" files that may
    # have finish event - Callosum broadcast happens before file rename)
    try:
        events = read_use_events(use_id)
        # Find last finish or error event
        for event in reversed(events):
            event_type = event.get("event")
            if event_type == "finish":
                return "finish"
            if event_type == "error":
                return "error"
        # No terminal event found - still running
        return "running"
    except FileNotFoundError:
        # File vanished between the status check and the read (e.g. renamed).
        return "unknown"


def read_use_events(use_id: str) -> list[Dict[str, Any]]:
    """Read all events from a use's JSONL log file.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        List of event dictionaries in chronological order

    Raises:
        FileNotFoundError: If the use log doesn't exist
    """
    talents_dir = Path(get_journal()) / "talents"
    use_file, _status = _find_use_file(talents_dir, use_id)
    if use_file is None:
        raise FileNotFoundError(f"Talent log not found: {use_id}")

    events = []
    with open(use_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
                events.append(event)
            except json.JSONDecodeError:
                # Tolerate partial/corrupt lines (e.g. a write in progress).
                logger.debug("Skipping malformed JSON in %s", use_file)
                continue

    return events


def cortex_uses(
    limit: int = 10,
    offset: int = 0,
    use_type: str = "all",
    facet: Optional[str] = None,
) -> Dict[str, Any]:
    """List talent uses from the journal with pagination and filtering.

    Legacy unnamed run logs predate the chat rename and are surfaced as chat.

    Args:
        limit: Maximum number of uses to return (1-100)
        offset: Number of uses to skip
        use_type: Filter by "live", "historical", or "all"
        facet: Optional facet to filter by. If provided, only returns uses
            that were run in this facet context. None means no filtering.

    Returns:
        Dictionary with use list and pagination info
    """
    # Validate parameters
    limit = max(1, min(limit, 100))
    offset = max(0, offset)

    talents_dir = Path(get_journal()) / "talents"
    if not talents_dir.exists():
        return {
            "uses": [],
            "pagination": {
                "limit": limit,
                "offset": offset,
                "total": 0,
                "has_more": False,
            },
            "live_count": 0,
            "historical_count": 0,
        }

    # Collect all use files
    all_uses = []
    live_count = 0
    historical_count = 0

    for use_file in talents_dir.glob("*/*.jsonl"):
        # Determine status from filename
        is_active = "_active.jsonl" in use_file.name
        is_pending = "_pending.jsonl" in use_file.name

        # Skip pending files
        if is_pending:
            continue

        status = "running" if is_active else "completed"

        # Count by type
        if status == "running":
            live_count += 1
        else:
            historical_count += 1

        # Filter by requested type
        if use_type == "live" and status != "running":
            continue
        if use_type == "historical" and status != "completed":
            continue

        # Extract use ID from filename
        use_id = use_file.stem.replace("_active", "")

        # Read use file to get request info and calculate runtime
        try:
            with open(use_file, "r", encoding="utf-8") as f:
                lines = f.readlines()
                if not lines:
                    continue

                # Parse first line (request)
                first_line = lines[0].strip()
                if not first_line:
                    continue

                request = json.loads(first_line)
                if request.get("event") != "request":
                    continue

                # Extract facet from request
                use_facet = request.get("facet")

                # Filter by facet if specified
                if facet is not None and use_facet != facet:
                    continue

                # Extract basic info
                use_info = {
                    "id": use_id,
                    # Legacy unnamed run logs predate the chat rename; treat them as chat.
                    "name": request.get("name", "chat"),
                    "start": request.get("ts", 0),
                    "status": status,
                    "prompt": request.get("prompt", ""),
                    "provider": request.get("provider", "openai"),
                    "facet": use_facet,
                }

                # For completed uses, find finish event to calculate runtime
                if status == "completed" and len(lines) > 1:
                    # Read last few lines to find finish event (reading backwards is more efficient)
                    for line in reversed(lines[-10:]):  # Check last 10 lines
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            event = json.loads(line)
                            if event.get("event") == "finish":
                                end_ts = event.get("ts", 0)
                                if end_ts and use_info["start"]:
                                    # Calculate runtime in seconds
                                    use_info["runtime_seconds"] = (
                                        end_ts - use_info["start"]
                                    ) / 1000.0
                                break
                        except json.JSONDecodeError:
                            continue

                all_uses.append(use_info)
        except (json.JSONDecodeError, IOError):
            # Skip malformed files
            continue

    # Sort by start time (newest first)
    all_uses.sort(key=lambda x: x["start"], reverse=True)

    # Apply pagination
    total = len(all_uses)
    paginated = all_uses[offset : offset + limit]

    return {
        "uses": paginated,
        "pagination": {
            "limit": limit,
            "offset": offset,
            "total": total,
            "has_more": (offset + limit) < total,
        },
        "live_count": live_count,
        "historical_count": historical_count,
    }