personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 327 lines 10 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4from __future__ import annotations 5 6import json 7import logging 8import os 9import sys 10import threading 11import time 12from datetime import date, datetime 13from pathlib import Path 14from typing import Any 15 16from think.callosum import callosum_send 17from think.indexer.journal import index_file 18from think.streams import update_stream, write_segment_stream 19from think.utils import day_path, get_journal, segment_key, segment_parse, segment_path 20 21logger = logging.getLogger(__name__) 22 23_CHAT_LOCK = threading.Lock() 24_CHAT_STREAM = "chat" 25_SEGMENT_WINDOW_MS = 300_000 26_VALID_KINDS = { 27 "owner_message": ("text", "app", "path", "facet"), 28 "sol_message": ( 29 "use_id", 30 "text", 31 "notes", 32 "requested_target", 33 "requested_task", 34 ), 35 "talent_spawned": ("use_id", "name", "task", "started_at"), 36 "talent_finished": ("use_id", "name", "summary"), 37 "talent_errored": ("use_id", "name", "reason"), 38 "reflection_ready": ("day", "url"), 39 "chat_error": ("reason", "use_id"), 40} 41_TRIGGER_KINDS = {"owner_message", "talent_finished", "talent_errored"} 42 43 44def append_chat_event(kind: str, **fields: Any) -> dict[str, Any]: 45 """Append a chat event to the current 5-minute segment.""" 46 if kind not in _VALID_KINDS: 47 raise ValueError(f"Unknown chat event kind: {kind}") 48 49 event = dict(fields) 50 event.setdefault("ts", int(time.time() * 1000)) 51 _validate_event(kind, event) 52 53 day = _day_for_ts(event["ts"]) 54 _require_journal_root() 55 56 with _CHAT_LOCK: 57 segment = _current_segment_key(day, event["ts"]) 58 segment_dir = segment_path(day, segment, _CHAT_STREAM) 59 chat_path = segment_dir / "chat.jsonl" 60 had_segment_file = chat_path.exists() 61 62 events = _read_events_file(chat_path) 63 stored_event = {"kind": kind, **event} 64 events.append(stored_event) 65 _write_events_file(chat_path, events) 66 67 if not had_segment_file: 68 stream_info = update_stream(_CHAT_STREAM, day, segment, type=_CHAT_STREAM) 69 write_segment_stream( 70 segment_dir, 71 _CHAT_STREAM, 72 stream_info["prev_day"], 73 stream_info["prev_segment"], 74 stream_info["seq"], 75 ) 76 77 try: 78 index_file(get_journal(), str(chat_path)) 79 except Exception: 80 logger.warning( 81 "chat-event-index-failed", 82 extra={ 83 "kind": kind, 84 "use_id": str(stored_event.get("use_id") or ""), 85 "chat_path": str(chat_path), 86 }, 87 exc_info=True, 88 ) 89 _broadcast_chat_event(stored_event) 90 return stored_event 91 92 93def read_chat_events(day: str, limit: int | None = None) -> list[dict[str, Any]]: 94 """Return chat events for ``day`` in ascending timestamp order.""" 95 chat_root = day_path(day, create=False) / _CHAT_STREAM 96 if not chat_root.exists(): 97 return [] 98 99 ordered: list[tuple[int, str, int, dict[str, Any]]] = [] 100 for segment_dir in sorted(chat_root.iterdir(), key=lambda path: path.name): 101 if not segment_dir.is_dir() or segment_key(segment_dir.name) is None: 102 continue 103 chat_path = segment_dir / "chat.jsonl" 104 if not chat_path.exists(): 105 continue 106 for line_no, event in enumerate(_read_events_file(chat_path)): 107 ordered.append( 108 (int(event.get("ts", 0) or 0), segment_dir.name, line_no, event) 109 ) 110 111 ordered.sort(key=lambda item: (item[0], item[1], item[2])) 112 events = [item[3] for item in ordered] 113 if limit is None: 114 return events 115 if limit == 0: 116 return [] 117 return events[-limit:] 118 119 120def read_chat_tail(day: str, limit: int = 20) -> list[dict[str, Any]]: 121 """Return the most recent ``limit`` chat events for ``day``.""" 122 return read_chat_events(day, limit=limit) 123 124 125def reduce_chat_state(day: str) -> dict[str, Any]: 126 """Reduce a day's chat stream into the current chat session state.""" 127 latest_sol_message: dict[str, Any] | None = None 128 active_talents: dict[str, dict[str, Any]] = {} 129 completed_talents: list[dict[str, Any]] = [] 130 131 for event in read_chat_events(day): 132 kind = event.get("kind") 133 if kind == "sol_message": 134 latest_sol_message = { 135 "ts": event["ts"], 136 "use_id": event["use_id"], 137 "text": event["text"], 138 "notes": event["notes"], 139 "requested_target": event["requested_target"], 140 "requested_task": event["requested_task"], 141 } 142 continue 143 144 if kind == "talent_spawned": 145 active_talents[str(event["use_id"])] = { 146 "use_id": event["use_id"], 147 "name": event["name"], 148 "task": event["task"], 149 "started_at": event["started_at"], 150 } 151 continue 152 153 if kind == "talent_finished": 154 started = active_talents.pop(str(event["use_id"]), None) 155 completed_talents.append( 156 { 157 "use_id": event["use_id"], 158 "name": event["name"], 159 "task": started["task"] if started else None, 160 "summary": event["summary"], 161 "finished_at": event["ts"], 162 } 163 ) 164 continue 165 166 if kind == "talent_errored": 167 active_talents.pop(str(event["use_id"]), None) 168 continue 169 170 if kind == "reflection_ready": 171 continue 172 173 return { 174 "latest_sol_message": latest_sol_message, 175 "active_talents": sorted( 176 active_talents.values(), 177 key=lambda talent: ( 178 int(talent.get("started_at", 0) or 0), 179 str(talent["use_id"]), 180 ), 181 ), 182 "completed_talents": completed_talents, 183 } 184 185 186def find_unresponded_trigger(day: str) -> dict[str, Any] | None: 187 """Return the most recent unresolved trigger event for ``day``.""" 188 for event in reversed(read_chat_events(day)): 189 kind = event.get("kind") 190 if kind == "sol_message": 191 return None 192 if kind in _TRIGGER_KINDS: 193 return event 194 return None 195 196 197def _validate_event(kind: str, event: dict[str, Any]) -> None: 198 ts = event.get("ts") 199 if not isinstance(ts, int): 200 raise ValueError(f"chat event ts must be an int, got {type(ts).__name__}") 201 202 missing = [field for field in _VALID_KINDS[kind] if field not in event] 203 if missing: 204 required = ", ".join(missing) 205 raise ValueError(f"{kind} requires fields: {required}") 206 207 208def _broadcast_chat_event(stored_event: dict[str, Any]) -> None: 209 chat_module = sys.modules.get("convey.chat") 210 runtime = ( 211 getattr(chat_module, "_runtime", None) if chat_module is not None else None 212 ) 213 if runtime is None: 214 return 215 216 kind = str(stored_event.get("kind") or "") 217 use_id = str(stored_event.get("use_id") or "") 218 219 try: 220 if runtime.callosum.emit("chat", kind, **stored_event): 221 return 222 if callosum_send("chat", kind, **stored_event): 223 return 224 logger.warning( 225 "Failed to broadcast chat event kind=%s use_id=%s", 226 kind, 227 use_id or "-", 228 ) 229 except Exception as exc: 230 logger.warning( 231 "Failed to broadcast chat event kind=%s use_id=%s: %s", 232 kind, 233 use_id or "-", 234 exc, 235 ) 236 237 238def _require_journal_root() -> Path: 239 journal = Path(get_journal()) 240 if not journal.exists(): 241 raise FileNotFoundError(f"Journal root does not exist: {journal}") 242 if not journal.is_dir(): 243 raise NotADirectoryError(f"Journal root is not a directory: {journal}") 244 return journal 245 246 247def _day_for_ts(ts_ms: int) -> str: 248 return _ts_to_local_datetime(ts_ms).strftime("%Y%m%d") 249 250 251def _current_segment_key(day: str, ts_ms: int) -> str: 252 event_dt = _ts_to_local_datetime(ts_ms) 253 existing = _chat_segments(day) 254 if not existing: 255 return _segment_key_for_start(event_dt) 256 257 current = existing[-1] 258 current_start = _segment_start_datetime(day, current) 259 current_start_ms = int(current_start.timestamp() * 1000) 260 if ts_ms - current_start_ms >= _SEGMENT_WINDOW_MS: 261 return _segment_key_for_start(event_dt) 262 return current 263 264 265def _chat_segments(day: str) -> list[str]: 266 chat_root = day_path(day, create=False) / _CHAT_STREAM 267 if not chat_root.exists(): 268 return [] 269 return sorted( 270 entry.name 271 for entry in chat_root.iterdir() 272 if entry.is_dir() and segment_key(entry.name) is not None 273 ) 274 275 276def _segment_key_for_start(start_dt: datetime) -> str: 277 return f"{start_dt.strftime('%H%M%S')}_300" 278 279 280def _segment_start_datetime(day: str, segment: str) -> datetime: 281 start_time, _ = segment_parse(segment) 282 if start_time is None: 283 raise ValueError(f"Invalid chat segment key: {segment}") 284 return datetime.combine( 285 date.fromisoformat(f"{day[:4]}-{day[4:6]}-{day[6:8]}"), start_time 286 ) 287 288 289def _ts_to_local_datetime(ts_ms: int) -> datetime: 290 return datetime.fromtimestamp(ts_ms / 1000) 291 292 293def _read_events_file(path: Path) -> list[dict[str, Any]]: 294 if not path.exists(): 295 return [] 296 297 events: list[dict[str, Any]] = [] 298 with open(path, "r", encoding="utf-8") as handle: 299 for line_no, raw_line in enumerate(handle, start=1): 300 line = raw_line.strip() 301 if not line: 302 continue 303 try: 304 payload = json.loads(line) 305 except json.JSONDecodeError as exc: 306 raise ValueError(f"Invalid JSON in {path}:{line_no}") from exc 307 if not isinstance(payload, dict): 308 raise ValueError(f"Expected JSON object in {path}:{line_no}") 309 events.append(payload) 310 return events 311 312 313def _write_events_file(path: Path, events: list[dict[str, Any]]) -> None: 314 tmp_path = path.with_suffix(f".{os.getpid()}-{threading.get_ident()}.tmp") 315 try: 316 with open(tmp_path, "w", encoding="utf-8") as handle: 317 for event in events: 318 handle.write(json.dumps(event, ensure_ascii=False)) 319 handle.write("\n") 320 os.replace(tmp_path, path) 321 except Exception: 322 try: 323 if tmp_path.exists(): 324 tmp_path.unlink() 325 except OSError: 326 pass 327 raise