# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Cortex client for managing AI talent requests."""
5
6import json
7import logging
8import threading
9from pathlib import Path
10from typing import Any, Dict, Optional
11
12from think.callosum import CallosumConnection, callosum_send
13from think.utils import get_journal, now_ms
14
logger = logging.getLogger(__name__)

# Highest millisecond timestamp handed out so far; cortex_request bumps past
# this value so generated use_ids stay unique and strictly increasing even
# when several requests land within the same millisecond.
_last_ts = 0
19
20
21def _find_use_file(talents_dir: Path, use_id: str) -> tuple[Path | None, str]:
22 """Find a use log file in per-talent subdirectories.
23
24 Returns:
25 Tuple of (file_path, status) where status is
26 "completed", "running", or "not_found".
27 """
28 for match in talents_dir.glob(f"*/{use_id}.jsonl"):
29 return match, "completed"
30 for match in talents_dir.glob(f"*/{use_id}_active.jsonl"):
31 return match, "running"
32 return None, "not_found"
33
34
def cortex_request(
    prompt: str,
    name: str,
    provider: Optional[str] = None,
    config: Optional[Dict[str, Any]] = None,
    use_id: Optional[str] = None,
) -> str | None:
    """Create a Cortex talent request via Callosum broadcast.

    Args:
        prompt: The task or question for the talent
        name: Talent name - system (e.g., "chat") or app-qualified (e.g., "entities:entity_assist")
        provider: AI provider - openai, google, or anthropic
        config: Provider-specific configuration (model, max_output_tokens,
            thinking_budget, etc.). Merged flat into the request.
        use_id: Optional pre-reserved use_id. When omitted, a unique timestamp is allocated.

    Returns:
        Use ID (timestamp-based string), or None if the Callosum send failed.

    Raises:
        ValueError: If config is not a dict, or use_id is not a digit string.
    """
    # Validate inputs up front so bad arguments fail before any side effects
    # (directory creation, timestamp allocation).
    if config is not None and not isinstance(config, dict):
        raise ValueError("config must be a dictionary")

    # Ensure the talents directory exists; use logs are written there.
    talents_dir = Path(get_journal()) / "talents"
    talents_dir.mkdir(parents=True, exist_ok=True)

    # Allocate (or validate) a monotonic millisecond-timestamp use_id.
    global _last_ts
    if use_id is None:
        ts = now_ms()
        # Bump past the last issued value so ids stay strictly increasing
        # even when multiple requests land in the same millisecond.
        if ts <= _last_ts:
            ts = _last_ts + 1
        _last_ts = ts
        use_id = str(ts)
    else:
        if not use_id.isdigit():
            raise ValueError("use_id must be a millisecond timestamp string")
        ts = int(use_id)
        # Keep the monotonic counter ahead of externally reserved ids.
        if ts > _last_ts:
            _last_ts = ts

    # Build the flat request payload directly; no need to stage an "event"
    # key only to strip it again before sending.
    fields: Dict[str, Any] = {
        "ts": ts,
        "use_id": use_id,
        "prompt": prompt,
        "provider": provider,
        "name": name,
    }
    if config:
        # Merge config overrides directly into the request for a flat schema.
        fields.update(config)
        # "event" is supplied positionally to callosum_send below; drop any
        # override so it cannot collide with that argument.
        fields.pop("event", None)

    # Broadcast request to Callosum.
    # Note: callosum_send() signature is send(tract, event, **fields)
    if not callosum_send("cortex", "request", **fields):
        logger.warning("Failed to send cortex request for talent '%s'", name)
        return None

    return use_id
106
107
def get_use_log_status(use_id: str) -> str:
    """Report whether a use's log file is completed, running, or absent.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "completed" - Use finished (*.jsonl exists)
        "running" - Use still active (*_active.jsonl exists)
        "not_found" - No use file exists
    """
    # Only the status half of the (path, status) pair is needed here.
    journal_talents = Path(get_journal()) / "talents"
    return _find_use_file(journal_talents, use_id)[1]
122
123
def wait_for_uses(
    use_ids: list[str],
    timeout: int | None = 600,
) -> tuple[dict[str, str], list[str]]:
    """Wait for uses to complete via Callosum events.

    Listens for cortex.finish and cortex.error events. Sets up the event
    listener first, then does an initial file check for uses that may have
    already completed, and a final file check at timeout as a backstop for
    any missed events.

    Args:
        use_ids: List of use IDs to wait for
        timeout: Maximum wait time in seconds (default 600 = 10 minutes);
            None waits indefinitely.

    Returns:
        Tuple of (completed, timed_out) where completed is a dict mapping
        use_id to end state ("finish" or "error"), and timed_out is a
        list of use IDs that did not complete within the timeout.
    """
    # Shared state between this thread and the listener callback thread.
    pending = set(use_ids)
    completed: dict[str, str] = {}
    lock = threading.Lock()  # guards pending/completed
    all_done = threading.Event()  # set once pending drains to empty

    def on_message(msg: dict) -> None:
        # Runs on the CallosumConnection listener thread.
        if msg.get("tract") != "cortex":
            return
        use_id = msg.get("use_id")
        if not use_id:
            return

        event_type = msg.get("event")
        if event_type in ("finish", "error"):
            with lock:
                if use_id in pending:
                    completed[use_id] = event_type
                    pending.discard(use_id)
                    if not pending:
                        all_done.set()

    # Start listener BEFORE initial check to avoid race condition
    listener = CallosumConnection()
    listener.start(callback=on_message)

    try:
        # Initial file check (with lock since callback may be running)
        with lock:
            for use_id in list(pending):
                end_state = get_use_end_state(use_id)
                if end_state in ("finish", "error"):
                    completed[use_id] = end_state
                    pending.discard(use_id)

        if not pending:
            return completed, []

        # Wait for all completions or timeout
        all_done.wait(timeout=timeout)

    finally:
        # Always stop the listener thread, even on early return above.
        listener.stop()

    # Final file check for any remaining (backstop for missed events)
    # Listener is stopped, so no lock needed
    for use_id in list(pending):
        end_state = get_use_end_state(use_id)
        if end_state in ("finish", "error"):
            logger.info(
                f"Talent use {use_id} completion event not received but use completed"
            )
            completed[use_id] = end_state
            pending.discard(use_id)

    return completed, list(pending)
199
200
def get_use_end_state(use_id: str) -> str:
    """Get how a completed use ended (finish or error).

    Checks file contents for terminal events even if the file is still
    named *_active.jsonl, since Callosum broadcasts happen before the
    file rename.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        "finish" - Use completed successfully
        "error" - Use ended with an error
        "running" - Use is still active (no terminal event in file)
        "unknown" - Use file not found
    """
    if get_use_log_status(use_id) == "not_found":
        return "unknown"

    try:
        events = read_use_events(use_id)
    except FileNotFoundError:
        # File disappeared between the status check and the read.
        return "unknown"

    # Scan newest-first for the most recent terminal event; absence of one
    # means the use is still in flight.
    terminal = next(
        (
            event.get("event")
            for event in reversed(events)
            if event.get("event") in ("finish", "error")
        ),
        None,
    )
    return terminal if terminal is not None else "running"
235
236
def read_use_events(use_id: str) -> list[Dict[str, Any]]:
    """Read all events from a use's JSONL log file.

    Args:
        use_id: The use ID (timestamp)

    Returns:
        List of event dictionaries in chronological order

    Raises:
        FileNotFoundError: If the use log doesn't exist
    """
    talents_dir = Path(get_journal()) / "talents"
    use_file, _status = _find_use_file(talents_dir, use_id)
    if use_file is None:
        raise FileNotFoundError(f"Talent log not found: {use_id}")

    events: list[Dict[str, Any]] = []
    # Explicit encoding: without it, open() uses the platform default and
    # can mis-decode UTF-8 logs on non-UTF-8 locales.
    with open(use_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                events.append(json.loads(line))
            except json.JSONDecodeError:
                # Tolerate partial/corrupt lines (e.g. a crash mid-write);
                # lazy %-args avoid formatting when debug logging is off.
                logger.debug("Skipping malformed JSON in %s", use_file)

    return events
268
269
def _use_runtime_seconds(lines: list[str], start_ts: int) -> float | None:
    """Derive a use's runtime from the finish event near the end of its log.

    Only the last few lines are scanned, since the finish event is written
    last. Returns runtime in seconds, or None if it cannot be computed.
    """
    for raw in reversed(lines[-10:]):  # Check last 10 lines only
        raw = raw.strip()
        if not raw:
            continue
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if event.get("event") == "finish":
            end_ts = event.get("ts", 0)
            if end_ts and start_ts:
                # Timestamps are in milliseconds.
                return (end_ts - start_ts) / 1000.0
            return None
    return None


def _load_use_info(
    use_file: Path, status: str, facet: Optional[str]
) -> Optional[Dict[str, Any]]:
    """Parse one use log file into a summary dict.

    Returns None when the file is unreadable, malformed, empty, or filtered
    out by the facet argument.
    """
    try:
        with open(use_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
    except IOError:
        return None
    if not lines:
        return None

    # First line must be the request event.
    first_line = lines[0].strip()
    if not first_line:
        return None
    try:
        request = json.loads(first_line)
    except json.JSONDecodeError:
        return None
    if request.get("event") != "request":
        return None

    # Filter by facet if specified.
    use_facet = request.get("facet")
    if facet is not None and use_facet != facet:
        return None

    use_info: Dict[str, Any] = {
        "id": use_file.stem.replace("_active", ""),
        # Legacy unnamed run logs predate the chat rename; treat them as chat.
        "name": request.get("name", "chat"),
        "start": request.get("ts", 0),
        "status": status,
        "prompt": request.get("prompt", ""),
        "provider": request.get("provider", "openai"),
        "facet": use_facet,
    }

    # For completed uses, derive runtime from the finish event.
    if status == "completed" and len(lines) > 1:
        runtime = _use_runtime_seconds(lines, use_info["start"])
        if runtime is not None:
            use_info["runtime_seconds"] = runtime

    return use_info


def cortex_uses(
    limit: int = 10,
    offset: int = 0,
    use_type: str = "all",
    facet: Optional[str] = None,
) -> Dict[str, Any]:
    """List talent uses from the journal with pagination and filtering.

    Legacy unnamed run logs predate the chat rename and are surfaced as chat.

    Args:
        limit: Maximum number of uses to return (1-100)
        offset: Number of uses to skip
        use_type: Filter by "live", "historical", or "all"
        facet: Optional facet to filter by. If provided, only returns uses
            that were run in this facet context. None means no filtering.

    Returns:
        Dictionary with use list and pagination info. Note that live_count
        and historical_count tally all (non-pending) use files, regardless
        of the use_type/facet filters.
    """
    # Clamp parameters to safe ranges rather than raising.
    limit = max(1, min(limit, 100))
    offset = max(0, offset)

    talents_dir = Path(get_journal()) / "talents"
    if not talents_dir.exists():
        return {
            "uses": [],
            "pagination": {
                "limit": limit,
                "offset": offset,
                "total": 0,
                "has_more": False,
            },
            "live_count": 0,
            "historical_count": 0,
        }

    all_uses: list[Dict[str, Any]] = []
    live_count = 0
    historical_count = 0

    for use_file in talents_dir.glob("*/*.jsonl"):
        # Skip pending files entirely (not counted, not listed).
        if "_pending.jsonl" in use_file.name:
            continue

        status = "running" if "_active.jsonl" in use_file.name else "completed"

        # Counts are taken before the use_type/facet filters, so they always
        # reflect the full journal.
        if status == "running":
            live_count += 1
        else:
            historical_count += 1

        # Filter by requested type.
        if use_type == "live" and status != "running":
            continue
        if use_type == "historical" and status != "completed":
            continue

        use_info = _load_use_info(use_file, status, facet)
        if use_info is not None:
            all_uses.append(use_info)

    # Sort by start time (newest first).
    all_uses.sort(key=lambda u: u["start"], reverse=True)

    total = len(all_uses)
    return {
        "uses": all_uses[offset : offset + limit],
        "pagination": {
            "limit": limit,
            "offset": offset,
            "total": total,
            "has_more": (offset + limit) < total,
        },
        "live_count": live_count,
        "historical_count": historical_count,
    }