# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Cortex client for managing AI talent requests."""
5
6import json
7import logging
8import threading
9from pathlib import Path
10from typing import Any, Dict, Optional
11
12from think.callosum import CallosumConnection, callosum_send
13from think.utils import get_journal, now_ms
14
15logger = logging.getLogger(__name__)
16
17# Module-level state for monotonic timestamp generation
18_last_ts = 0
19
20
21def _find_use_file(talents_dir: Path, use_id: str) -> tuple[Path | None, str]:
22 """Find a use log file in per-talent subdirectories.
23
24 Returns:
25 Tuple of (file_path, status) where status is
26 "completed", "running", or "not_found".
27 """
28 for match in talents_dir.glob(f"*/{use_id}.jsonl"):
29 return match, "completed"
30 for match in talents_dir.glob(f"*/{use_id}_active.jsonl"):
31 return match, "running"
32 return None, "not_found"
33
34
def cortex_request(
    prompt: str,
    name: str,
    provider: Optional[str] = None,
    config: Optional[Dict[str, Any]] = None,
) -> str | None:
    """Create a Cortex talent request via Callosum broadcast.

    Args:
        prompt: The task or question for the talent
        name: Talent name - system (e.g., "unified") or app-qualified
            (e.g., "entities:entity_assist")
        provider: AI provider - openai, google, or anthropic
        config: Provider-specific configuration (model, max_output_tokens,
            thinking_budget, etc.), merged flat into the request

    Raises:
        ValueError: If config is provided but is not a dictionary.

    Returns:
        Use ID (timestamp-based string), or None if the Callosum send failed.
    """
    # Validate before any side effects (directory creation, timestamp bump)
    if config and not isinstance(config, dict):
        raise ValueError("config must be a dictionary")

    # Create talents directory if it doesn't exist
    journal_path = get_journal()
    talents_dir = Path(journal_path) / "talents"
    talents_dir.mkdir(parents=True, exist_ok=True)

    # Generate a monotonic millisecond timestamp so use IDs stay unique
    # within this process even when requests land in the same millisecond
    global _last_ts
    ts = now_ms()
    if ts <= _last_ts:
        ts = _last_ts + 1
    _last_ts = ts
    use_id = str(ts)

    # Merge config FIRST, then overlay the protocol fields, so user-supplied
    # config can never clobber ts/use_id/prompt/provider/name (previously a
    # config key like "use_id" would desync the broadcast from the return).
    request_fields: Dict[str, Any] = dict(config) if config else {}
    # "event" is callosum_send's positional argument; a stray config key
    # would otherwise collide with it
    request_fields.pop("event", None)
    request_fields.update(
        {
            "ts": ts,
            "use_id": use_id,
            "prompt": prompt,
            "provider": provider,
            "name": name,
        }
    )

    # Broadcast request to Callosum
    # Note: callosum_send() signature is send(tract, event, **fields)
    sent = callosum_send("cortex", "request", **request_fields)

    if not sent:
        logger.info("Failed to send cortex request for talent '%s'", name)
        return None

    return use_id
98
99
def get_use_log_status(use_id: str) -> str:
    """Get the status of a specific use from its log file.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "completed" - Use finished (*.jsonl exists)
        "running" - Use still active (*_active.jsonl exists)
        "not_found" - No use file exists
    """
    # Only the status half of the lookup is needed here
    return _find_use_file(Path(get_journal()) / "talents", use_id)[1]
114
115
def wait_for_uses(
    use_ids: list[str],
    timeout: int | None = 600,
) -> tuple[dict[str, str], list[str]]:
    """Wait for uses to complete via Callosum events.

    Listens for cortex.finish and cortex.error events. Sets up the event
    listener first, then does an initial file check for uses that may have
    already completed, and a final file check at timeout as a backstop for
    any missed events.

    Args:
        use_ids: List of use IDs to wait for
        timeout: Maximum wait time in seconds (default 600 = 10 minutes);
            None blocks until all uses complete

    Returns:
        Tuple of (completed, timed_out) where completed is a dict mapping
        use_id to end state ("finish" or "error"), and timed_out is a
        list of use IDs that did not complete within the timeout.
    """
    pending = set(use_ids)
    completed: dict[str, str] = {}
    # Lock guards pending/completed, which are shared with the listener callback
    lock = threading.Lock()
    all_done = threading.Event()

    def on_message(msg: dict) -> None:
        # Invoked by the CallosumConnection for every broadcast; filter down
        # to terminal cortex events for the uses we are tracking
        if msg.get("tract") != "cortex":
            return
        use_id = msg.get("use_id")
        if not use_id:
            return

        event_type = msg.get("event")
        if event_type in ("finish", "error"):
            with lock:
                if use_id in pending:
                    completed[use_id] = event_type
                    pending.discard(use_id)
                    if not pending:
                        all_done.set()

    # Start listener BEFORE the initial file check so a completion that lands
    # between the check and the subscription cannot be missed
    listener = CallosumConnection()
    listener.start(callback=on_message)

    try:
        # Initial file check for uses that finished before we subscribed
        # (with lock since the callback may already be running)
        with lock:
            for use_id in list(pending):
                end_state = get_use_end_state(use_id)
                if end_state in ("finish", "error"):
                    completed[use_id] = end_state
                    pending.discard(use_id)

        if not pending:
            return completed, []

        # Block until every pending use completes, or the timeout elapses
        all_done.wait(timeout=timeout)

    finally:
        listener.stop()

    # Final file check for any remaining (backstop for missed events).
    # Listener is stopped, so no lock needed.
    for use_id in list(pending):
        end_state = get_use_end_state(use_id)
        if end_state in ("finish", "error"):
            logger.info(
                f"Talent use {use_id} completion event not received but use completed"
            )
            completed[use_id] = end_state
            pending.discard(use_id)

    return completed, list(pending)
191
192
def get_use_end_state(use_id: str) -> str:
    """Get how a completed use ended (finish or error).

    Checks file contents for terminal events even if file is still
    _active.jsonl, since Callosum broadcasts happen before file rename.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "finish" - Use completed successfully
        "error" - Use ended with an error
        "running" - Use is still active (no terminal event in file)
        "unknown" - Use file not found
    """
    if get_use_log_status(use_id) == "not_found":
        return "unknown"

    try:
        events = read_use_events(use_id)
    except FileNotFoundError:
        # File disappeared between the status check and the read
        return "unknown"

    # Scan newest-to-oldest for the most recent terminal event
    terminal = next(
        (
            e.get("event")
            for e in reversed(events)
            if e.get("event") in ("finish", "error")
        ),
        None,
    )
    return terminal if terminal is not None else "running"
227
228
def read_use_events(use_id: str) -> list[Dict[str, Any]]:
    """Read all events from a use's JSONL log file.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        List of event dictionaries in chronological order

    Raises:
        FileNotFoundError: If the use log doesn't exist
    """
    use_file, _ = _find_use_file(Path(get_journal()) / "talents", use_id)
    if use_file is None:
        raise FileNotFoundError(f"Talent log not found: {use_id}")

    events: list[Dict[str, Any]] = []
    with open(use_file, "r") as f:
        for raw in f:
            raw = raw.strip()
            if not raw:
                continue
            try:
                events.append(json.loads(raw))
            except json.JSONDecodeError:
                # Tolerate partially written or corrupt lines
                logger.debug(f"Skipping malformed JSON in {use_file}")
                continue

    return events
260
261
def _use_summary(
    use_file: Path, use_id: str, status: str
) -> Optional[Dict[str, Any]]:
    """Parse one use log file into a summary dict, or None if unusable.

    Reads the request (first line) and, for completed uses, scans the last
    few events for a finish timestamp to compute runtime.

    Args:
        use_file: Path to the JSONL use log
        use_id: The use ID derived from the filename
        status: "running" or "completed", derived from the filename

    Returns:
        Summary dict, or None if the file is unreadable, empty, malformed,
        or does not start with a request event.
    """
    try:
        with open(use_file, "r") as f:
            lines = f.readlines()
    except OSError:
        return None
    if not lines:
        return None

    # First line must be the request event
    first_line = lines[0].strip()
    if not first_line:
        return None
    try:
        request = json.loads(first_line)
    except json.JSONDecodeError:
        return None
    if request.get("event") != "request":
        return None

    info: Dict[str, Any] = {
        "id": use_id,
        "name": request.get("name", "unified"),
        "start": request.get("ts", 0),
        "status": status,
        "prompt": request.get("prompt", ""),
        "provider": request.get("provider", "openai"),
        "facet": request.get("facet"),
    }

    # For completed uses, find the finish event to calculate runtime;
    # checking only the last 10 lines is cheaper than a full parse
    if status == "completed" and len(lines) > 1:
        for line in reversed(lines[-10:]):
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if event.get("event") == "finish":
                end_ts = event.get("ts", 0)
                if end_ts and info["start"]:
                    # Timestamps are milliseconds; report seconds
                    info["runtime_seconds"] = (end_ts - info["start"]) / 1000.0
                break
    return info


def cortex_uses(
    limit: int = 10,
    offset: int = 0,
    use_type: str = "all",
    facet: Optional[str] = None,
) -> Dict[str, Any]:
    """List talent uses from the journal with pagination and filtering.

    Args:
        limit: Maximum number of uses to return (clamped to 1-100)
        offset: Number of uses to skip (clamped to >= 0)
        use_type: Filter by "live", "historical", or "all"
        facet: Optional facet to filter by. If provided, only returns uses
            that were run in this facet context. None means no filtering.

    Returns:
        Dictionary with use list and pagination info
    """
    # Clamp pagination parameters to sane bounds
    limit = max(1, min(limit, 100))
    offset = max(0, offset)

    talents_dir = Path(get_journal()) / "talents"
    if not talents_dir.exists():
        return {
            "uses": [],
            "pagination": {
                "limit": limit,
                "offset": offset,
                "total": 0,
                "has_more": False,
            },
            "live_count": 0,
            "historical_count": 0,
        }

    all_uses = []
    live_count = 0
    historical_count = 0

    for use_file in talents_dir.glob("*/*.jsonl"):
        # Pending files are requests not yet picked up; never listed
        if "_pending.jsonl" in use_file.name:
            continue

        status = "running" if "_active.jsonl" in use_file.name else "completed"

        # Counts cover every non-pending use, regardless of the
        # type/facet filters applied below
        if status == "running":
            live_count += 1
        else:
            historical_count += 1

        # Filter by requested type
        if use_type == "live" and status != "running":
            continue
        if use_type == "historical" and status != "completed":
            continue

        # removesuffix (not replace) so only a trailing "_active" is stripped
        use_id = use_file.stem.removesuffix("_active")

        info = _use_summary(use_file, use_id, status)
        if info is None:
            # Skip empty or malformed files
            continue

        # Filter by facet if specified
        if facet is not None and info["facet"] != facet:
            continue

        all_uses.append(info)

    # Sort by start time (newest first)
    all_uses.sort(key=lambda u: u["start"], reverse=True)

    # Apply pagination
    total = len(all_uses)
    paginated = all_uses[offset : offset + limit]

    return {
        "uses": paginated,
        "pagination": {
            "limit": limit,
            "offset": offset,
            "total": total,
            "has_more": (offset + limit) < total,
        },
        "live_count": live_count,
        "historical_count": historical_count,
    }