personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chat: broadcast chat-stream events + add talent-log endpoint (lode 3a)

Wrap append_chat_event so every persisted event broadcasts on a new
"chat" tract after the write commits. Broadcast failure is swallowed
and does not roll back the durable append.

Add GET /api/chat/talent-log/<use_id>: reads active or completed talent
JSONL, derives status from tail event (running/completed/errored),
returns task + started_at + finished_at + events.

Decisions:
- Tract for UI events: new "chat" tract (keeps raw cortex process events
separate from reduced chat-stream UI events).
- Talent-log endpoint: new /api/chat/talent-log/<use_id> rather than
reusing /app/sol/api/run/<use_id> (sol route 202s on active runs with
no events; chat modal needs live-mode playback).

Cleanup:
- Delete _display_mode() + the "display" field on cortex/finish proxy
emits and on /api/chat/result/<use_id>. No legacy-mode branch in
inbound data. Ban _display_mode in the legacy-chat guard test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+416 -13
+99 -13
convey/chat.py
··· 14 14 import threading 15 15 from dataclasses import dataclass, field 16 16 from datetime import datetime 17 + from pathlib import Path 17 18 from typing import Any 18 19 19 20 from flask import Blueprint, jsonify, request ··· 26 27 ) 27 28 from convey.utils import error_response 28 29 from think.callosum import CallosumConnection, callosum_send 29 - from think.utils import now_ms 30 + from think.utils import get_journal, now_ms 30 31 31 32 logger = logging.getLogger(__name__) 32 33 ··· 133 134 result = _read_result_state(use_id) 134 135 if result is None: 135 136 return jsonify(error="not found"), 404 137 + return jsonify(result) 138 + 139 + 140 + @chat_bp.route("/talent-log/<use_id>", methods=["GET"]) 141 + def get_talent_log(use_id: str) -> Any: 142 + """Return a talent-use timeline from the JSONL log.""" 143 + result = _read_talent_log(use_id) 144 + if result is None: 145 + return jsonify(error=f"Talent log not found for use_id {use_id}"), 404 136 146 return jsonify(result) 137 147 138 148 ··· 702 712 "finish", 703 713 use_id=use_id, 704 714 result=message, 705 - display=_display_mode(message), 706 715 chat_proxy=True, 707 716 ) 708 717 ··· 721 730 if runtime is not None and runtime.callosum.emit("cortex", event, **fields): 722 731 return 723 732 callosum_send("cortex", event, **fields) 724 - 725 - 726 - def _display_mode(text: str) -> str: 727 - if not text: 728 - return "inline" 729 - if len(text) >= 120 or "\n" in text: 730 - return "panel" 731 - if len(re.split(r"(?<=[.!?])\s", text)) > 2: 732 - return "panel" 733 - return "inline" 734 733 735 734 736 735 def _normalize_location(app_name: Any, path: Any, facet: Any) -> dict[str, str]: ··· 827 826 return { 828 827 "state": "finished", 829 828 "summary": latest_sol.get("text", ""), 830 - "display": _display_mode(str(latest_sol.get("text", ""))), 831 829 } 832 830 return exec_state 831 + 832 + 833 + def _read_talent_log(use_id: str) -> dict[str, Any] | None: 834 + log_path = _find_talent_log_path(use_id) 835 + if log_path is None: 836 + return None 837 + 838 + request_event: dict[str, Any] | None = None 839 + events: list[dict[str, Any]] = [] 840 + started_at: int | None = None 841 + finished_at: int | None = None 842 + 843 + for index, event in enumerate(_read_jsonl_events(log_path)): 844 + event_type = str(event.get("event") or "").strip() 845 + if index == 0 and event_type == "request": 846 + request_event = event 847 + continue 848 + if request_event is None and event_type == "request": 849 + request_event = event 850 + continue 851 + 852 + event.pop("raw", None) 853 + events.append(event) 854 + 855 + event_ts = _event_ts(event) 856 + if event_type == "start" and started_at is None: 857 + started_at = event_ts 858 + elif event_type == "finish": 859 + finished_at = event_ts 860 + elif event_type == "error": 861 + finished_at = event_ts 862 + 863 + request_ts = _event_ts(request_event) 864 + task = None 865 + if request_event is not None: 866 + task = request_event.get("task") or request_event.get("prompt") 867 + if started_at is None: 868 + started_at = request_ts 869 + 870 + last_event_type = str(events[-1].get("event") or "").strip() if events else "" 871 + if last_event_type == "finish": 872 + status = "completed" 873 + elif last_event_type == "error": 874 + status = "errored" 875 + else: 876 + status = "running" 877 + 878 + return { 879 + "use_id": use_id, 880 + "status": status, 881 + "task": task, 882 + "started_at": started_at, 883 + "finished_at": finished_at, 884 + "events": events, 885 + } 886 + 887 + 888 + def _find_talent_log_path(use_id: str) -> Path | None: 889 + talents_dir = Path(get_journal()) / "talents" 890 + if not talents_dir.is_dir(): 891 + return None 892 + 893 + for pattern in (f"*/{use_id}_active.jsonl", f"*/{use_id}.jsonl"): 894 + matches = sorted(talents_dir.glob(pattern)) 895 + if matches: 896 + return matches[0] 897 + return None 898 + 899 + 900 + def _read_jsonl_events(path: Path) -> list[dict[str, Any]]: 901 + parsed: list[dict[str, Any]] = [] 902 + with open(path, encoding="utf-8") as handle: 903 + for line in handle: 904 + line = line.strip() 905 + if not line: 906 + continue 907 + try: 908 + parsed.append(json.loads(line)) 909 + except json.JSONDecodeError: 910 + continue 911 + return parsed 912 + 913 + 914 + def _event_ts(event: dict[str, Any] | None) -> int | None: 915 + if event is None: 916 + return None 917 + value = event.get("ts") 918 + return value if isinstance(value, int) else None 833 919 834 920 835 921 def _reserve_use_id_locked() -> str:
+36
convey/chat_stream.py
··· 4 4 from __future__ import annotations 5 5 6 6 import json 7 + import logging 7 8 import os 9 + import sys 8 10 import threading 9 11 import time 10 12 from datetime import date, datetime 11 13 from pathlib import Path 12 14 from typing import Any 13 15 16 + from think.callosum import callosum_send 14 17 from think.streams import update_stream, write_segment_stream 15 18 from think.utils import day_path, get_journal, segment_key, segment_parse, segment_path 19 + 20 + logger = logging.getLogger(__name__) 16 21 17 22 _CHAT_LOCK = threading.Lock() 18 23 _CHAT_STREAM = "chat" ··· 67 72 stream_info["seq"], 68 73 ) 69 74 75 + _broadcast_chat_event(stored_event) 70 76 return stored_event 71 77 72 78 ··· 179 185 if missing: 180 186 required = ", ".join(missing) 181 187 raise ValueError(f"{kind} requires fields: {required}") 188 + 189 + 190 + def _broadcast_chat_event(stored_event: dict[str, Any]) -> None: 191 + chat_module = sys.modules.get("convey.chat") 192 + runtime = ( 193 + getattr(chat_module, "_runtime", None) if chat_module is not None else None 194 + ) 195 + if runtime is None: 196 + return 197 + 198 + kind = str(stored_event.get("kind") or "") 199 + use_id = str(stored_event.get("use_id") or "") 200 + 201 + try: 202 + if runtime.callosum.emit("chat", kind, **stored_event): 203 + return 204 + if callosum_send("chat", kind, **stored_event): 205 + return 206 + logger.warning( 207 + "Failed to broadcast chat event kind=%s use_id=%s", 208 + kind, 209 + use_id or "-", 210 + ) 211 + except Exception as exc: 212 + logger.warning( 213 + "Failed to broadcast chat event kind=%s use_id=%s: %s", 214 + kind, 215 + use_id or "-", 216 + exc, 217 + ) 182 218 183 219 184 220 def _require_journal_root() -> Path:
+58
tests/test_chat_stream.py
··· 4 4 import json 5 5 from datetime import datetime 6 6 from pathlib import Path 7 + from types import SimpleNamespace 7 8 8 9 import pytest 9 10 ··· 75 76 76 77 assert len(lines) == 1 77 78 assert json.loads(lines[0])["text"] == "hello" 79 + 80 + 81 + def test_append_broadcasts_on_chat_tract_with_stored_event_payload( 82 + tmp_path, monkeypatch 83 + ): 84 + _setup_journal(tmp_path, monkeypatch) 85 + import convey.chat as chat 86 + 87 + calls: list[tuple[str, str, dict]] = [] 88 + 89 + class FakeCallosum: 90 + def emit(self, tract, event, **fields): 91 + calls.append((tract, event, fields)) 92 + return True 93 + 94 + monkeypatch.setattr(chat, "_runtime", SimpleNamespace(callosum=FakeCallosum())) 95 + 96 + event = append_chat_event( 97 + "sol_message", 98 + ts=_ms(2026, 4, 20, 12, 0, 0), 99 + use_id="1713626000000", 100 + text="hello", 101 + notes="ready", 102 + requested_exec=False, 103 + requested_task=None, 104 + ) 105 + 106 + assert calls == [("chat", "sol_message", event)] 107 + 108 + 109 + def test_append_broadcast_failure_is_swallowed(tmp_path, monkeypatch): 110 + journal = _setup_journal(tmp_path, monkeypatch) 111 + import convey.chat as chat 112 + 113 + class FakeCallosum: 114 + def emit(self, tract, event, **fields): 115 + raise RuntimeError("boom") 116 + 117 + monkeypatch.setattr(chat, "_runtime", SimpleNamespace(callosum=FakeCallosum())) 118 + 119 + event = append_chat_event( 120 + "owner_message", 121 + ts=_ms(2026, 4, 20, 12, 0, 0), 122 + text="hello", 123 + app="sol", 124 + path="/chat", 125 + facet="work", 126 + ) 127 + 128 + chat_path = ( 129 + journal / "chronicle" / "20260420" / "chat" / "120000_300" / "chat.jsonl" 130 + ) 131 + 132 + assert event["kind"] == "owner_message" 133 + lines = chat_path.read_text(encoding="utf-8").splitlines() 134 + assert len(lines) == 1 135 + assert json.loads(lines[0]) == event 78 136 79 137 80 138 def test_append_rejects_unknown_kind(tmp_path, monkeypatch):
+222
tests/test_convey_chat.py
··· 3 3 4 4 from __future__ import annotations 5 5 6 + import json 6 7 from datetime import datetime 7 8 8 9 import pytest ··· 32 33 33 34 def _ms(year: int, month: int, day: int, hour: int, minute: int, second: int) -> int: 34 35 return int(datetime(year, month, day, hour, minute, second).timestamp() * 1000) 36 + 37 + 38 + def _write_talent_log( 39 + journal, talent_name: str, filename: str, events: list[dict] 40 + ) -> None: 41 + talent_dir = journal / "talents" / talent_name 42 + talent_dir.mkdir(parents=True, exist_ok=True) 43 + log_path = talent_dir / filename 44 + log_path.write_text( 45 + "\n".join(json.dumps(event) for event in events) + "\n", 46 + encoding="utf-8", 47 + ) 35 48 36 49 37 50 @pytest.fixture ··· 141 154 payload = response.get_json() 142 155 assert payload["state"] == "finished" 143 156 assert payload["summary"] == "stream reply" 157 + 158 + 159 + def test_talent_log_endpoint_returns_completed_run(chat_client, tmp_path): 160 + use_id = "1700000000001" 161 + _write_talent_log( 162 + tmp_path / "journal", 163 + "default", 164 + f"{use_id}.jsonl", 165 + [ 166 + { 167 + "event": "request", 168 + "ts": 1700000000001, 169 + "use_id": use_id, 170 + "prompt": "Search for meetings about project updates", 171 + "name": "default", 172 + "provider": "openai", 173 + }, 174 + { 175 + "event": "start", 176 + "ts": 1700000000100, 177 + "use_id": use_id, 178 + "model": "gpt-4o", 179 + "provider": "openai", 180 + }, 181 + { 182 + "event": "thinking", 183 + "ts": 1700000000300, 184 + "use_id": use_id, 185 + "content": "reasoning", 186 + "raw": {"provider": "openai"}, 187 + }, 188 + { 189 + "event": "finish", 190 + "ts": 1700000000600, 191 + "use_id": use_id, 192 + "result": "done", 193 + }, 194 + ], 195 + ) 196 + 197 + response = chat_client.get(f"/api/chat/talent-log/{use_id}") 198 + 199 + assert response.status_code == 200 200 + payload = response.get_json() 201 + assert payload["use_id"] == use_id 202 + assert payload["status"] == "completed" 203 + assert payload["task"] == "Search for meetings about project updates" 204 + assert payload["started_at"] == 1700000000100 205 + assert payload["finished_at"] == 1700000000600 206 + assert len(payload["events"]) == 3 207 + assert payload["events"][1]["event"] == "thinking" 208 + assert "raw" not in payload["events"][1] 209 + 210 + 211 + def test_talent_log_endpoint_returns_running_active_run(chat_client, tmp_path): 212 + use_id = "1700000000002" 213 + _write_talent_log( 214 + tmp_path / "journal", 215 + "default", 216 + f"{use_id}_active.jsonl", 217 + [ 218 + { 219 + "event": "request", 220 + "ts": 1700000000002, 221 + "use_id": use_id, 222 + "task": "Analyze conversation flow", 223 + }, 224 + { 225 + "event": "start", 226 + "ts": 1700000000102, 227 + "use_id": use_id, 228 + "model": "gpt-4o-mini", 229 + }, 230 + { 231 + "event": "thinking", 232 + "ts": 1700000000202, 233 + "use_id": use_id, 234 + "content": "still working", 235 + }, 236 + ], 237 + ) 238 + 239 + response = chat_client.get(f"/api/chat/talent-log/{use_id}") 240 + 241 + assert response.status_code == 200 242 + payload = response.get_json() 243 + assert payload["status"] == "running" 244 + assert payload["task"] == "Analyze conversation flow" 245 + assert payload["finished_at"] is None 246 + assert payload["events"][-1]["event"] == "thinking" 247 + 248 + 249 + def test_talent_log_endpoint_prefers_active_log(chat_client, tmp_path): 250 + use_id = "1700000000003" 251 + journal = tmp_path / "journal" 252 + _write_talent_log( 253 + journal, 254 + "default", 255 + f"{use_id}_active.jsonl", 256 + [ 257 + { 258 + "event": "request", 259 + "ts": 1700000000003, 260 + "use_id": use_id, 261 + "prompt": "active prompt", 262 + }, 263 + { 264 + "event": "thinking", 265 + "ts": 1700000000103, 266 + "use_id": use_id, 267 + "content": "active content", 268 + }, 269 + ], 270 + ) 271 + _write_talent_log( 272 + journal, 273 + "flow", 274 + f"{use_id}.jsonl", 275 + [ 276 + { 277 + "event": "request", 278 + "ts": 1700000000003, 279 + "use_id": use_id, 280 + "prompt": "completed prompt", 281 + }, 282 + { 283 + "event": "finish", 284 + "ts": 1700000000203, 285 + "use_id": use_id, 286 + "result": "completed result", 287 + }, 288 + ], 289 + ) 290 + 291 + response = chat_client.get(f"/api/chat/talent-log/{use_id}") 292 + 293 + assert response.status_code == 200 294 + payload = response.get_json() 295 + assert payload["status"] == "running" 296 + assert payload["task"] == "active prompt" 297 + assert payload["events"][0]["content"] == "active content" 298 + 299 + 300 + def test_talent_log_endpoint_returns_errored_run(chat_client, tmp_path): 301 + use_id = "1700000000004" 302 + _write_talent_log( 303 + tmp_path / "journal", 304 + "flow", 305 + f"{use_id}.jsonl", 306 + [ 307 + { 308 + "event": "request", 309 + "ts": 1700000000004, 310 + "use_id": use_id, 311 + "prompt": "Analyze flow", 312 + }, 313 + { 314 + "event": "error", 315 + "ts": 1700000000204, 316 + "use_id": use_id, 317 + "error": "Rate limit exceeded", 318 + }, 319 + ], 320 + ) 321 + 322 + response = chat_client.get(f"/api/chat/talent-log/{use_id}") 323 + 324 + assert response.status_code == 200 325 + payload = response.get_json() 326 + assert payload["status"] == "errored" 327 + assert payload["finished_at"] == 1700000000204 328 + assert payload["events"][-1]["event"] == "error" 329 + 330 + 331 + def test_talent_log_endpoint_returns_missing(chat_client): 332 + use_id = "1700000000999" 333 + 334 + response = chat_client.get(f"/api/chat/talent-log/{use_id}") 335 + 336 + assert response.status_code == 404 337 + assert response.get_json() == {"error": f"Talent log not found for use_id {use_id}"} 338 + 339 + 340 + def test_talent_log_endpoint_task_falls_back_to_prompt(chat_client, tmp_path): 341 + use_id = "1700000000005" 342 + _write_talent_log( 343 + tmp_path / "journal", 344 + "default", 345 + f"{use_id}.jsonl", 346 + [ 347 + { 348 + "event": "request", 349 + "ts": 1700000000005, 350 + "use_id": use_id, 351 + "prompt": "Fallback prompt", 352 + }, 353 + { 354 + "event": "finish", 355 + "ts": 1700000000305, 356 + "use_id": use_id, 357 + "result": "done", 358 + }, 359 + ], 360 + ) 361 + 362 + response = chat_client.get(f"/api/chat/talent-log/{use_id}") 363 + 364 + assert response.status_code == 200 365 + assert response.get_json()["task"] == "Fallback prompt"
+1
tests/test_no_legacy_chat_imports.py
··· 18 18 19 19 20 20 BANNED_NAMES = { 21 + _parts("_", "display_", "mode"), 21 22 _parts("record_", "exchange"), 22 23 _parts("build_", "memory_", "context"), 23 24 _parts("INJECTION_", "MARKER"),