personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at d18a7c02359cd827d0ff15058861de5c2600a96f 405 lines 13 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Cortex client for managing AI talent requests."""

import glob
import json
import logging
import threading
from collections import deque
from pathlib import Path
from typing import Any, Dict, Optional, TextIO

from think.callosum import CallosumConnection, callosum_send
from think.utils import get_journal, now_ms

logger = logging.getLogger(__name__)

# Module-level state for monotonic timestamp generation. _last_ts is the most
# recent timestamp handed out as a use ID; _ts_lock makes the compare-and-update
# in cortex_request atomic so concurrent callers never receive the same ID.
_last_ts = 0
_ts_lock = threading.Lock()

# Request keys that config overrides may not replace. "event" and "tract"
# collide with callosum_send's positional parameters (tract, event, **fields);
# "ts" and "use_id" identify the use, and replacing them would make the ID
# returned by cortex_request disagree with the one that was broadcast.
_RESERVED_REQUEST_KEYS = frozenset({"event", "tract", "ts", "use_id"})

# How many trailing log lines cortex_uses scans for a "finish" event.
_RUNTIME_TAIL_LINES = 10


def _open_log(path: Path) -> TextIO:
    """Open a talent JSONL log for reading as UTF-8.

    Undecodable bytes are replaced with U+FFFD instead of raising, so a
    corrupt byte degrades only the line it is on rather than aborting the
    whole read.
    """
    return open(path, "r", encoding="utf-8", errors="replace")


def _parse_event(line: str) -> dict[str, Any] | None:
    """Parse one JSONL log line into an event dictionary.

    Returns:
        The decoded object, or None if the line is blank, is not valid JSON,
        or decodes to something other than a JSON object.
    """
    line = line.strip()
    if not line:
        return None
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return None
    return event if isinstance(event, dict) else None


def _find_use_file(talents_dir: Path, use_id: str) -> tuple[Path | None, str]:
    """Find a use log file in per-talent subdirectories.

    Looks for ``<talents_dir>/*/<use_id>.jsonl`` first, then
    ``<talents_dir>/*/<use_id>_active.jsonl``. Glob metacharacters in
    ``use_id`` are escaped so the ID is always matched literally.

    Returns:
        Tuple of (file_path, status) where status is
        "completed", "running", or "not_found".
    """
    literal_id = glob.escape(str(use_id))
    for match in talents_dir.glob(f"*/{literal_id}.jsonl"):
        return match, "completed"
    for match in talents_dir.glob(f"*/{literal_id}_active.jsonl"):
        return match, "running"
    return None, "not_found"


def cortex_request(
    prompt: str,
    name: str,
    provider: Optional[str] = None,
    config: Optional[Dict[str, Any]] = None,
) -> str | None:
    """Create a Cortex talent request via Callosum broadcast.

    Args:
        prompt: The task or question for the talent
        name: Talent name - system (e.g., "unified") or app-qualified (e.g., "entities:entity_assist")
        provider: AI provider - openai, google, or anthropic
        config: Provider-specific configuration (model, max_output_tokens,
            thinking_budget, etc.). Keys are merged flat into the request;
            "event", "tract", "ts" and "use_id" cannot be overridden and are
            ignored with a warning.

    Returns:
        Use ID (timestamp-based string), or None if the Callosum send failed.

    Raises:
        ValueError: If config is given but is not a dictionary.
    """
    if config is not None and not isinstance(config, dict):
        raise ValueError("config must be a dictionary")

    # Create the journal's talents directory if it doesn't exist
    talents_dir = Path(get_journal()) / "talents"
    talents_dir.mkdir(parents=True, exist_ok=True)

    # Generate a strictly increasing millisecond timestamp to use as the ID.
    # If the clock has not advanced past the last ID, bump it by one.
    global _last_ts
    with _ts_lock:
        ts = now_ms()
        if ts <= _last_ts:
            ts = _last_ts + 1
        _last_ts = ts
    use_id = str(ts)

    # Build the request fields; callosum_send takes the event name positionally
    request: Dict[str, Any] = {
        "ts": ts,
        "use_id": use_id,
        "prompt": prompt,
        "provider": provider,
        "name": name,
    }

    # Merge config overrides directly into the request for a flat schema
    for key, value in (config or {}).items():
        if key in _RESERVED_REQUEST_KEYS:
            logger.warning("Ignoring reserved key %r in cortex request config", key)
            continue
        request[key] = value

    if not callosum_send("cortex", "request", **request):
        logger.warning("Failed to send cortex request for talent '%s'", name)
        return None

    return use_id


def get_use_log_status(use_id: str) -> str:
    """Get the status of a specific use from its log file.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "completed" - Use finished (*.jsonl exists)
        "running" - Use still active (*_active.jsonl exists)
        "not_found" - No use file exists
    """
    talents_dir = Path(get_journal()) / "talents"
    _, status = _find_use_file(talents_dir, use_id)
    return status


def wait_for_uses(
    use_ids: list[str],
    timeout: int | None = 600,
) -> tuple[dict[str, str], list[str]]:
    """Wait for uses to complete via Callosum events.

    Listens for cortex.finish and cortex.error events. Sets up the event
    listener first, then does an initial file check for uses that may have
    already completed, and a final file check at timeout as a backstop for
    any missed events.

    Args:
        use_ids: List of use IDs to wait for (duplicates are collapsed)
        timeout: Maximum wait time in seconds (default 600 = 10 minutes);
            None waits until every use has completed

    Returns:
        Tuple of (completed, timed_out) where completed is a dict mapping
        use_id to end state ("finish" or "error"), and timed_out is a
        list of use IDs that did not complete within the timeout.
    """
    pending = set(use_ids)
    completed: dict[str, str] = {}
    # Guards pending/completed: the callback may run while the initial check does
    lock = threading.Lock()
    all_done = threading.Event()

    def on_message(msg: dict) -> None:
        # Record cortex finish/error events for the uses we are waiting on.
        if msg.get("tract") != "cortex":
            return
        use_id = msg.get("use_id")
        event_type = msg.get("event")
        if not use_id or event_type not in ("finish", "error"):
            return
        with lock:
            if use_id in pending:
                completed[use_id] = event_type
                pending.discard(use_id)
                if not pending:
                    all_done.set()

    # Start the listener BEFORE the initial file check so a use that ends in
    # between is caught by the event, the file check, or both.
    listener = CallosumConnection()
    listener.start(callback=on_message)

    try:
        # Initial file check (with lock since callback may be running)
        with lock:
            for use_id in list(pending):
                end_state = get_use_end_state(use_id)
                if end_state in ("finish", "error"):
                    completed[use_id] = end_state
                    pending.discard(use_id)

        if not pending:
            return completed, []

        # Wait for all completions or timeout
        all_done.wait(timeout=timeout)
    finally:
        listener.stop()

    # Final file check for any remaining (backstop for missed events).
    # Listener is stopped, so no lock needed.
    for use_id in list(pending):
        end_state = get_use_end_state(use_id)
        if end_state in ("finish", "error"):
            logger.info(
                "Talent use %s completion event not received but use completed",
                use_id,
            )
            completed[use_id] = end_state
            pending.discard(use_id)

    return completed, list(pending)


def get_use_end_state(use_id: str) -> str:
    """Get how a completed use ended (finish or error).

    Checks file contents for terminal events even if file is still _active.jsonl,
    since Callosum broadcasts happen before file rename.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "finish" - Use completed successfully
        "error" - Use ended with an error
        "running" - Use is still active (no terminal event in file)
        "unknown" - Use file not found
    """
    for _attempt in range(2):
        try:
            events = read_use_events(use_id)
        except FileNotFoundError:
            # The log moves from <id>_active.jsonl to <id>.jsonl when the use
            # ends; a lookup or open that straddles that rename sees neither
            # file, so look once more before reporting "unknown".
            continue
        # The last terminal event in the log decides the end state
        for event in reversed(events):
            event_type = event.get("event")
            if event_type in ("finish", "error"):
                return event_type
        # No terminal event found - still running
        return "running"
    return "unknown"


def read_use_events(use_id: str) -> list[Dict[str, Any]]:
    """Read all events from a use's JSONL log file.

    Blank lines are skipped silently; lines that are not valid JSON objects
    are skipped with a debug log message.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        List of event dictionaries in chronological order

    Raises:
        FileNotFoundError: If the use log doesn't exist
    """
    talents_dir = Path(get_journal()) / "talents"
    use_file, _status = _find_use_file(talents_dir, use_id)
    if use_file is None:
        raise FileNotFoundError(f"Talent log not found: {use_id}")

    events: list[Dict[str, Any]] = []
    with _open_log(use_file) as f:
        for raw_line in f:
            if not raw_line.strip():
                continue
            event = _parse_event(raw_line)
            if event is None:
                logger.debug("Skipping malformed JSON in %s", use_file)
                continue
            events.append(event)

    return events


def _summarize_use_file(
    use_file: Path, use_id: str, status: str, facet: Optional[str]
) -> dict[str, Any] | None:
    """Build one cortex_uses listing entry from a talent log file.

    Returns:
        The entry dict, or None when the file cannot be read, its first line
        is not a "request" event, or it belongs to a facet other than
        ``facet`` (when ``facet`` is not None).
    """
    try:
        with _open_log(use_file) as f:
            request = _parse_event(f.readline())
            if request is None or request.get("event") != "request":
                return None

            use_facet = request.get("facet")
            if facet is not None and use_facet != facet:
                return None

            # Only completed uses get a runtime, and only the last few lines
            # after the request line are searched for the finish event.
            tail = (
                deque(f, maxlen=_RUNTIME_TAIL_LINES)
                if status == "completed"
                else deque()
            )
    except OSError:
        # Unreadable, or renamed/removed since the directory was listed
        return None

    use_info: dict[str, Any] = {
        "id": use_id,
        "name": request.get("name", "unified"),
        "start": request.get("ts", 0),
        "status": status,
        "prompt": request.get("prompt", ""),
        "provider": request.get("provider", "openai"),
        "facet": use_facet,
    }

    for line in reversed(tail):
        event = _parse_event(line)
        if event is None or event.get("event") != "finish":
            continue
        end_ts = event.get("ts", 0)
        start_ts = use_info["start"]
        numeric = isinstance(end_ts, (int, float)) and isinstance(
            start_ts, (int, float)
        )
        if numeric and end_ts and start_ts:
            # Timestamps are milliseconds; report runtime in seconds
            use_info["runtime_seconds"] = (end_ts - start_ts) / 1000.0
        break

    return use_info


def cortex_uses(
    limit: int = 10,
    offset: int = 0,
    use_type: str = "all",
    facet: Optional[str] = None,
) -> Dict[str, Any]:
    """List talent uses from the journal with pagination and filtering.

    Args:
        limit: Maximum number of uses to return (clamped to 1-100)
        offset: Number of uses to skip (negative values are treated as 0)
        use_type: Filter by "live", "historical", or "all"; any other value
            behaves like "all"
        facet: Optional facet to filter by. If provided, only returns uses
            that were run in this facet context. None means no filtering.

    Returns:
        Dictionary with "uses" (newest first), "pagination" (limit, offset,
        total, has_more), "live_count" and "historical_count". The two
        counts cover every non-pending log file, before the use_type and
        facet filters are applied.
    """
    limit = max(1, min(limit, 100))
    offset = max(0, offset)

    talents_dir = Path(get_journal()) / "talents"

    all_uses: list[Dict[str, Any]] = []
    live_count = 0
    historical_count = 0

    if talents_dir.exists():
        for use_file in talents_dir.glob("*/*.jsonl"):
            file_name = use_file.name

            # Skip pending files
            if file_name.endswith("_pending.jsonl"):
                continue

            is_live = file_name.endswith("_active.jsonl")
            if is_live:
                live_count += 1
            else:
                historical_count += 1

            # Filter by requested type
            if use_type == "live" and not is_live:
                continue
            if use_type == "historical" and is_live:
                continue

            use_id = use_file.stem.removesuffix("_active")
            status = "running" if is_live else "completed"
            use_info = _summarize_use_file(use_file, use_id, status, facet)
            if use_info is not None:
                all_uses.append(use_info)

    # Sort by start time (newest first)
    all_uses.sort(key=lambda use: use["start"], reverse=True)

    total = len(all_uses)
    return {
        "uses": all_uses[offset : offset + limit],
        "pagination": {
            "limit": limit,
            "offset": offset,
            "total": total,
            "has_more": (offset + limit) < total,
        },
        "live_count": live_count,
        "historical_count": historical_count,
    }