personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4from __future__ import annotations
5
6import json
7import logging
8import os
9import sys
10import threading
11import time
12from datetime import date, datetime
13from pathlib import Path
14from typing import Any
15
16from think.callosum import callosum_send
17from think.indexer.journal import index_file
18from think.streams import update_stream, write_segment_stream
19from think.utils import day_path, get_journal, segment_key, segment_parse, segment_path
20
logger = logging.getLogger(__name__)

# Guards the read-modify-write of chat.jsonl files in append_chat_event.
# NOTE(review): a threading.Lock only serializes threads within this process;
# confirm no other process appends chat events to the same journal.
_CHAT_LOCK: threading.Lock = threading.Lock()
# Stream name; also the per-day subdirectory that holds chat segment dirs.
_CHAT_STREAM: str = "chat"
# Segment length in milliseconds (5 minutes). An event whose ts is at least
# this far past the latest segment's start opens a new segment.
_SEGMENT_WINDOW_MS: int = 300_000
# Event kind -> fields that must be present on the event (enforced by
# _validate_event). Extra fields are accepted and stored unchanged.
_VALID_KINDS: dict[str, tuple[str, ...]] = {
    "owner_message": ("text", "app", "path", "facet"),
    "sol_message": (
        "use_id",
        "text",
        "notes",
        "requested_target",
        "requested_task",
    ),
    "talent_spawned": ("use_id", "name", "task", "started_at"),
    "talent_finished": ("use_id", "name", "summary"),
    "talent_errored": ("use_id", "name", "reason"),
    "reflection_ready": ("day", "url"),
    "chat_error": ("reason", "use_id"),
}
# Kinds that count as "awaiting a sol reply" in find_unresponded_trigger.
_TRIGGER_KINDS: set[str] = {"owner_message", "talent_finished", "talent_errored"}
42
43
def append_chat_event(kind: str, **fields: Any) -> dict[str, Any]:
    """Append a chat event to the current 5-minute segment.

    ``ts`` (epoch milliseconds) defaults to the current time. The event is
    validated and appended to ``chat.jsonl`` in the day's latest chat segment.
    A new segment starts once the segment window has elapsed. The segment
    file is then indexed (best-effort) and the event is broadcast
    (best-effort).

    Args:
        kind: Event kind; must be a key of ``_VALID_KINDS``.
        **fields: Event payload; must contain every field ``_VALID_KINDS``
            lists for ``kind``.

    Returns:
        The stored event: ``{"kind": kind, **fields}`` including ``ts``.

    Raises:
        ValueError: Unknown ``kind``, non-int ``ts``, missing required
            fields, or an existing ``chat.jsonl`` that is not valid JSONL.
        FileNotFoundError: The journal root does not exist.
        NotADirectoryError: The journal root is not a directory.
    """
    if kind not in _VALID_KINDS:
        raise ValueError(f"Unknown chat event kind: {kind}")

    event = dict(fields)
    event.setdefault("ts", int(time.time() * 1000))
    _validate_event(kind, event)

    # The target day comes from the event timestamp (local time), so an
    # explicitly supplied ts can land on a day other than today.
    day = _day_for_ts(event["ts"])
    _require_journal_root()

    with _CHAT_LOCK:
        segment = _current_segment_key(day, event["ts"])
        segment_dir = segment_path(day, segment, _CHAT_STREAM)
        chat_path = segment_dir / "chat.jsonl"
        # First event in this segment? Then it must be registered with the
        # stream below.
        had_segment_file = chat_path.exists()

        # NOTE(review): the whole file is re-read and rewritten on every
        # append, so each append costs O(events already in the segment).
        events = _read_events_file(chat_path)
        stored_event = {"kind": kind, **event}
        events.append(stored_event)
        _write_events_file(chat_path, events)

        if not had_segment_file:
            stream_info = update_stream(_CHAT_STREAM, day, segment, type=_CHAT_STREAM)
            write_segment_stream(
                segment_dir,
                _CHAT_STREAM,
                stream_info["prev_day"],
                stream_info["prev_segment"],
                stream_info["seq"],
            )

        # Indexing is best-effort: the event is already on disk, so a failure
        # is only logged.
        try:
            index_file(get_journal(), str(chat_path))
        except Exception:
            logger.warning(
                "chat-event-index-failed",
                extra={
                    "kind": kind,
                    "use_id": str(stored_event.get("use_id") or ""),
                    "chat_path": str(chat_path),
                },
                exc_info=True,
            )
    _broadcast_chat_event(stored_event)
    return stored_event
91
92
def read_chat_events(day: str, limit: int | None = None) -> list[dict[str, Any]]:
    """Return chat events for ``day`` in ascending timestamp order.

    Events from every chat segment of the day are merged and ordered by
    ``ts``. Ties are broken by segment name, then by position within the
    segment's ``chat.jsonl``, so the order is deterministic.

    Args:
        day: Day key in ``YYYYMMDD`` form.
        limit: If given, return only the last ``limit`` (most recent) events.
            ``0`` returns an empty list; ``None`` returns every event.

    Returns:
        The event dicts exactly as stored on disk.

    Raises:
        ValueError: If ``limit`` is negative, or a chat file holds invalid JSON.
    """
    if limit is not None and limit < 0:
        # ``events[-limit:]`` with a negative limit would silently drop the
        # *oldest* events instead of returning a tail; reject it explicitly.
        raise ValueError(f"limit must be non-negative, got {limit}")

    chat_root = day_path(day, create=False) / _CHAT_STREAM
    if not chat_root.exists():
        return []

    ordered: list[tuple[int, str, int, dict[str, Any]]] = []
    for segment_dir in sorted(chat_root.iterdir(), key=lambda path: path.name):
        if not segment_dir.is_dir() or segment_key(segment_dir.name) is None:
            continue
        chat_path = segment_dir / "chat.jsonl"
        if not chat_path.exists():
            continue
        for line_no, event in enumerate(_read_events_file(chat_path)):
            ordered.append(
                (int(event.get("ts", 0) or 0), segment_dir.name, line_no, event)
            )

    # Sort on (ts, segment, position) only; the event dicts are not orderable.
    ordered.sort(key=lambda item: item[:3])
    events = [item[3] for item in ordered]
    if limit is None:
        return events
    if limit == 0:
        return []
    return events[-limit:]
118
119
def read_chat_tail(day: str, limit: int = 20) -> list[dict[str, Any]]:
    """Return the newest ``limit`` chat events recorded on ``day``.

    Convenience wrapper around ``read_chat_events`` with a default window of
    20 events. The result stays in ascending timestamp order.
    """
    tail = read_chat_events(day, limit=limit)
    return tail
123
124
def reduce_chat_state(day: str) -> dict[str, Any]:
    """Fold ``day``'s chat events into a snapshot of the chat session.

    Returns a dict with three keys:

    * ``latest_sol_message``: selected fields of the last ``sol_message``,
      or ``None`` if there was none.
    * ``active_talents``: talents spawned but not yet finished or errored,
      ordered by ``started_at`` and then ``use_id``.
    * ``completed_talents``: finished talents in event order. Each carries
      the ``task`` from its spawn event when one was seen, else ``None``.
    """
    sol_fields = ("ts", "use_id", "text", "notes", "requested_target", "requested_task")
    spawn_fields = ("use_id", "name", "task", "started_at")

    last_reply: dict[str, Any] | None = None
    running: dict[str, dict[str, Any]] = {}
    finished: list[dict[str, Any]] = []

    for entry in read_chat_events(day):
        entry_kind = entry.get("kind")
        if entry_kind == "sol_message":
            last_reply = {name: entry[name] for name in sol_fields}
        elif entry_kind == "talent_spawned":
            running[str(entry["use_id"])] = {name: entry[name] for name in spawn_fields}
        elif entry_kind == "talent_finished":
            spawn = running.pop(str(entry["use_id"]), None)
            finished.append(
                {
                    "use_id": entry["use_id"],
                    "name": entry["name"],
                    "task": None if spawn is None else spawn["task"],
                    "summary": entry["summary"],
                    "finished_at": entry["ts"],
                }
            )
        elif entry_kind == "talent_errored":
            running.pop(str(entry["use_id"]), None)
        # Every other kind (owner_message, reflection_ready, chat_error, ...)
        # leaves the session state unchanged.

    def _launch_order(talent: dict[str, Any]) -> tuple[int, str]:
        return int(talent.get("started_at", 0) or 0), str(talent["use_id"])

    return {
        "latest_sol_message": last_reply,
        "active_talents": sorted(running.values(), key=_launch_order),
        "completed_talents": finished,
    }
184
185
def find_unresponded_trigger(day: str) -> dict[str, Any] | None:
    """Return the newest trigger event on ``day`` that sol has not answered.

    Walks the day's events from newest to oldest:

    * If a ``sol_message`` is reached before any trigger, everything is
      answered and ``None`` is returned.
    * Otherwise the first event whose kind is in ``_TRIGGER_KINDS`` is
      returned.

    Other kinds are skipped. ``None`` is also returned when the day has no
    trigger at all.
    """
    decisive = next(
        (
            candidate
            for candidate in reversed(read_chat_events(day))
            if candidate.get("kind") == "sol_message"
            or candidate.get("kind") in _TRIGGER_KINDS
        ),
        None,
    )
    if decisive is None or decisive.get("kind") == "sol_message":
        return None
    return decisive
195
196
def _validate_event(kind: str, event: dict[str, Any]) -> None:
    """Check that ``event`` is well-formed for ``kind``.

    Args:
        kind: A key of ``_VALID_KINDS`` (the caller has already checked this).
        event: The event payload, including ``ts``.

    Raises:
        ValueError: ``ts`` is not an int (``bool`` is rejected even though it
            subclasses ``int``), or a field required by ``_VALID_KINDS[kind]``
            is missing.
    """
    ts = event.get("ts")
    # bool is an int subclass; without the explicit check ``ts=True`` would be
    # accepted and stored as a 1 ms timestamp.
    if isinstance(ts, bool) or not isinstance(ts, int):
        raise ValueError(f"chat event ts must be an int, got {type(ts).__name__}")

    missing = [field for field in _VALID_KINDS[kind] if field not in event]
    if missing:
        required = ", ".join(missing)
        raise ValueError(f"{kind} requires fields: {required}")
206
207
def _broadcast_chat_event(stored_event: dict[str, Any]) -> None:
    """Best-effort broadcast of a stored chat event on the ``chat`` tract.

    Nothing happens unless ``convey.chat`` is already imported and exposes a
    non-``None`` ``_runtime``. Delivery is tried through the runtime's
    callosum first, then through ``callosum_send``. A falsy result from both,
    or any exception, is logged as a warning and never propagated.
    """
    runtime = getattr(sys.modules.get("convey.chat"), "_runtime", None)
    if runtime is None:
        return

    event_kind = str(stored_event.get("kind") or "")
    use_label = str(stored_event.get("use_id") or "") or "-"

    try:
        delivered = runtime.callosum.emit(
            "chat", event_kind, **stored_event
        ) or callosum_send("chat", event_kind, **stored_event)
        if not delivered:
            logger.warning(
                "Failed to broadcast chat event kind=%s use_id=%s",
                event_kind,
                use_label,
            )
    except Exception as err:
        logger.warning(
            "Failed to broadcast chat event kind=%s use_id=%s: %s",
            event_kind,
            use_label,
            err,
        )
236
237
def _require_journal_root() -> Path:
    """Return the configured journal root after checking it is a directory.

    Raises:
        FileNotFoundError: The path from ``get_journal()`` does not exist.
        NotADirectoryError: The path exists but is not a directory.
    """
    root = Path(get_journal())
    if root.is_dir():
        return root
    if root.exists():
        raise NotADirectoryError(f"Journal root is not a directory: {root}")
    raise FileNotFoundError(f"Journal root does not exist: {root}")
245
246
def _day_for_ts(ts_ms: int) -> str:
    """Return the local-time ``YYYYMMDD`` day key for epoch milliseconds ``ts_ms``."""
    local_moment = datetime.fromtimestamp(ts_ms / 1000)
    return f"{local_moment:%Y%m%d}"
249
250
def _current_segment_key(day: str, ts_ms: int) -> str:
    """Choose the chat segment key that an event at ``ts_ms`` belongs in.

    The day's latest existing segment is reused while ``ts_ms`` is less than
    ``_SEGMENT_WINDOW_MS`` past that segment's start. Otherwise, or when the
    day has no segments yet, a fresh key starting at the event's local time
    is returned.
    """
    event_dt = _ts_to_local_datetime(ts_ms)
    segments = _chat_segments(day)
    if segments:
        latest = segments[-1]
        latest_start_ms = int(_segment_start_datetime(day, latest).timestamp() * 1000)
        if ts_ms - latest_start_ms < _SEGMENT_WINDOW_MS:
            return latest
    return _segment_key_for_start(event_dt)
263
264
def _chat_segments(day: str) -> list[str]:
    """List the names of ``day``'s chat segment directories in sorted order.

    Non-directories and names that ``segment_key`` rejects are ignored. A
    missing chat directory yields an empty list.
    """
    chat_root = day_path(day, create=False) / _CHAT_STREAM
    if not chat_root.exists():
        return []
    names = [
        child.name
        for child in chat_root.iterdir()
        if child.is_dir() and segment_key(child.name) is not None
    ]
    names.sort()
    return names
274
275
def _segment_key_for_start(start_dt: datetime) -> str:
    """Build a ``HHMMSS_300`` chat segment key from ``start_dt``'s wall-clock time.

    The ``_300`` suffix is the segment length in seconds, matching
    ``_SEGMENT_WINDOW_MS`` (300_000 ms).
    """
    return f"{start_dt:%H%M%S}_300"
278
279
def _segment_start_datetime(day: str, segment: str) -> datetime:
    """Return the naive datetime at which chat ``segment`` on ``day`` starts.

    Args:
        day: Day key in ``YYYYMMDD`` form.
        segment: Segment key understood by ``segment_parse``.

    Raises:
        ValueError: ``segment_parse`` yields no start time for ``segment``, or
            ``day`` is not a valid date.
    """
    start_time, _end = segment_parse(segment)
    if start_time is None:
        raise ValueError(f"Invalid chat segment key: {segment}")
    year, month, day_of_month = day[:4], day[4:6], day[6:8]
    calendar_day = date.fromisoformat(f"{year}-{month}-{day_of_month}")
    return datetime.combine(calendar_day, start_time)
287
288
def _ts_to_local_datetime(ts_ms: int) -> datetime:
    """Convert epoch milliseconds to a naive datetime in the local timezone."""
    seconds = ts_ms / 1_000
    return datetime.fromtimestamp(seconds)
291
292
def _read_events_file(path: Path) -> list[dict[str, Any]]:
    """Parse a JSON Lines chat file into a list of event dicts.

    A missing file yields ``[]`` and blank lines are skipped. A line that is
    not valid JSON, or is valid JSON but not an object, raises ``ValueError``
    naming ``path`` and the offending 1-based line number.
    """
    if not path.exists():
        return []

    parsed: list[dict[str, Any]] = []
    # Iterate the text stream (not str.splitlines) so that raw U+2028/U+2029
    # inside JSON strings written with ensure_ascii=False never split a record.
    with path.open("r", encoding="utf-8") as stream:
        for number, raw in enumerate(stream, 1):
            text = raw.strip()
            if not text:
                continue
            try:
                record = json.loads(text)
            except json.JSONDecodeError as err:
                raise ValueError(f"Invalid JSON in {path}:{number}") from err
            if not isinstance(record, dict):
                raise ValueError(f"Expected JSON object in {path}:{number}")
            parsed.append(record)
    return parsed
311
312
def _write_events_file(path: Path, events: list[dict[str, Any]]) -> None:
    """Atomically replace ``path`` with ``events`` serialized as JSON Lines.

    Steps:

    1. Serialize all events up front.
    2. Write them to a sibling temp file named per process and thread.
    3. Flush and fsync the temp file.
    4. Move it over ``path`` with ``os.replace``, so ``path`` always holds
       either the old or the new content.

    On any failure, including KeyboardInterrupt, the temp file is removed,
    the error is re-raised, and ``path`` is left untouched.

    Raises:
        TypeError / ValueError: ``json.dumps`` cannot serialize an event.
        OSError: The temp file could not be written, synced or renamed.
    """
    # Serialize before touching the filesystem so an unserializable event
    # never creates (or leaves behind) a temp file.
    payload = "".join(json.dumps(event, ensure_ascii=False) + "\n" for event in events)
    tmp_path = path.with_suffix(f".{os.getpid()}-{threading.get_ident()}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            handle.write(payload)
            handle.flush()
            # Without this, a crash shortly after the rename can leave ``path``
            # empty or truncated on filesystems that delay data writes.
            os.fsync(handle.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # BaseException so an interrupt mid-write does not leak the temp file.
        try:
            if tmp_path.exists():
                tmp_path.unlink()
        except OSError:
            pass
        raise
327 raise