personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chat: remove unused stream result endpoints

-159
-90
convey/chat.py
··· 11 11 import logging 12 12 import os 13 13 import pprint 14 - import re 15 14 import threading 16 15 from dataclasses import dataclass, field 17 16 from datetime import datetime ··· 36 35 37 36 MAX_ACTIVE_TALENTS = 2 38 37 MAX_LOOP_RETRIES = 3 39 - DEFAULT_STREAM_LIMIT = 200 40 - MAX_STREAM_LIMIT = 1000 41 38 _CHAT_WATCHDOG_SECONDS = 180 42 39 MAX_ACTIVE_REASON = "max active — waiting for one to finish" 43 40 CHAT_TROUBLE_REASON = "chat had trouble — try again" 44 41 CHAT_WATCHDOG_REASON = "chat took too long — try again" 45 42 46 - _DAY_RE = re.compile(r"^\d{8}$") 47 43 _state_lock = threading.Lock() 48 44 _runtime_lock = threading.Lock() 49 45 _current_chat_use_id: str | None = None ··· 114 110 """Return reduced state for today's chat stream.""" 115 111 _recover_chat_if_needed() 116 112 return jsonify(reduce_chat_state(_today_day())) 117 - 118 - 119 - @chat_bp.route("/stream/<day>", methods=["GET"]) 120 - def chat_stream(day: str) -> Any: 121 - """Return ordered chat events for a day.""" 122 - if not _DAY_RE.fullmatch(day): 123 - return error_response("day must be YYYYMMDD", 400) 124 - 125 - limit_raw = request.args.get("limit", str(DEFAULT_STREAM_LIMIT)) 126 - try: 127 - limit = int(limit_raw) 128 - except (TypeError, ValueError): 129 - limit = DEFAULT_STREAM_LIMIT 130 - limit = max(1, min(limit, MAX_STREAM_LIMIT)) 131 - 132 - return jsonify(events=read_chat_events(day, limit=limit)) 133 - 134 - 135 - @chat_bp.route("/result/<use_id>", methods=["GET"]) 136 - def chat_result(use_id: str) -> Any: 137 - """Return chat or exec state from the chat stream.""" 138 - result = _read_result_state(use_id) 139 - if result is None: 140 - return jsonify(error="not found"), 404 141 - return jsonify(result) 142 113 143 114 144 115 @chat_bp.route("/talent-log/<use_id>", methods=["GET"]) ··· 1057 1028 } 1058 1029 1059 1030 1060 - def _read_result_state(use_id: str) -> dict[str, Any] | None: 1061 - day = _day_for_use_id(use_id) 1062 - if day is None: 1063 - return None 1064 - 1065 - latest_sol: dict[str, Any] | None = None 1066 - talent_state: dict[str, Any] | None = None 1067 - chat_error: dict[str, Any] | None = None 1068 - spawned_task: str | None = None 1069 - 1070 - for event in read_chat_events(day): 1071 - kind = event.get("kind") 1072 - if kind == "sol_message" and str(event.get("use_id")) == use_id: 1073 - latest_sol = event 1074 - elif kind == "chat_error" and str(event.get("use_id") or "") == use_id: 1075 - chat_error = event 1076 - elif kind == "talent_spawned" and str(event.get("use_id")) == use_id: 1077 - spawned_task = event.get("task") 1078 - talent_state = {"state": "active", "task": spawned_task} 1079 - elif kind == "talent_finished" and str(event.get("use_id")) == use_id: 1080 - talent_state = { 1081 - "state": "finished", 1082 - "summary": event.get("summary", ""), 1083 - "task": spawned_task, 1084 - } 1085 - elif kind == "talent_errored" and str(event.get("use_id")) == use_id: 1086 - talent_state = { 1087 - "state": "errored", 1088 - "reason": event.get("reason", ""), 1089 - "task": spawned_task, 1090 - } 1091 - 1092 - with _state_lock: 1093 - if _current_chat_use_id == use_id: 1094 - task = None 1095 - if latest_sol and latest_sol.get("requested_target"): 1096 - task = latest_sol.get("requested_task") 1097 - return {"state": "active", "task": task} 1098 - 1099 - if chat_error is not None: 1100 - return { 1101 - "state": "errored", 1102 - "reason": chat_error.get("reason", CHAT_TROUBLE_REASON), 1103 - } 1104 - if latest_sol is not None: 1105 - return { 1106 - "state": "finished", 1107 - "summary": latest_sol.get("text", ""), 1108 - } 1109 - return talent_state 1110 - 1111 - 1112 1031 def _read_talent_log(use_id: str) -> dict[str, Any] | None: 1113 1032 log_path = _find_talent_log_path(use_id) 1114 1033 if log_path is None: ··· 1209 1128 1210 1129 def _today_day() -> str: 1211 1130 return datetime.now().strftime("%Y%m%d") 1212 - 1213 - 1214 - def _day_for_use_id(use_id: str) -> str | None: 1215 - if not use_id.isdigit(): 1216 - return None 1217 - try: 1218 - return datetime.fromtimestamp(int(use_id) / 1000).strftime("%Y%m%d") 1219 - except (OSError, OverflowError, ValueError): 1220 - return None
-3
tests/baselines/api/chat/result.json
··· 1 - { 2 - "error": "not found" 3 - }
-3
tests/baselines/api/chat/stream.json
··· 1 - { 2 - "events": [] 3 - }
-5
tests/test_chat_runtime.py
··· 1303 1303 1304 1304 timers[-1].fire() 1305 1305 1306 - assert chat._read_result_state(talent_use_id) == { 1307 - "state": "errored", 1308 - "reason": "chat took too long — try again", 1309 - "task": "summarize", 1310 - } 1311 1306 parent_errors = [ 1312 1307 event 1313 1308 for event in read_chat_events(chat._today_day())
-44
tests/test_convey_chat.py
··· 129 129 "use_id": "1713626000001", 130 130 } 131 131 ] 132 - assert chat_client.get(f"/api/chat/stream/{day}").status_code == 200 133 132 134 133 135 134 def test_chat_session_retries_unresolved_trigger_when_idle(chat_client, monkeypatch): ··· 187 186 assert len(starts) == 2 188 187 assert starts[0]["trigger"]["type"] == "owner_message" 189 188 assert starts[1]["trigger"]["type"] == "owner_message" 190 - 191 - 192 - def test_stream_endpoint_ordered_with_limit(chat_client): 193 - start = _ms(2026, 4, 20, 12, 0, 0) 194 - for index in range(4): 195 - append_chat_event( 196 - "owner_message", 197 - ts=start + (index * 300_000), 198 - text=f"m{index}", 199 - app="sol", 200 - path="/app/sol", 201 - facet="work", 202 - ) 203 - 204 - response = chat_client.get("/api/chat/stream/20260420?limit=2") 205 - assert response.status_code == 200 206 - payload = response.get_json() 207 - assert [event["text"] for event in payload["events"]] == ["m2", "m3"] 208 - 209 - 210 - def test_result_endpoint_reads_stream_not_talent_log(chat_client, tmp_path): 211 - use_id = str(_ms(2026, 4, 20, 12, 0, 0)) 212 - append_chat_event( 213 - "sol_message", 214 - ts=_ms(2026, 4, 20, 12, 0, 0), 215 - use_id=use_id, 216 - text="stream reply", 217 - notes="done", 218 - requested_target=None, 219 - requested_task=None, 220 - ) 221 - 222 - talents_dir = tmp_path / "journal" / "talents" / "chat" 223 - talents_dir.mkdir(parents=True, exist_ok=True) 224 - (talents_dir / f"{use_id}.jsonl").write_text( 225 - '{"event":"finish","result":"log reply"}\n' 226 - ) 227 - 228 - response = chat_client.get(f"/api/chat/result/{use_id}") 229 - assert response.status_code == 200 230 - payload = response.get_json() 231 - assert payload["state"] == "finished" 232 - assert payload["summary"] == "stream reply" 233 189 234 190 235 191 def test_talent_log_endpoint_returns_completed_run(chat_client, tmp_path):
-14
tests/verify_api.py
··· 96 96 "params": {}, 97 97 "status": 200, 98 98 }, 99 - { 100 - "app": "chat", 101 - "name": "stream", 102 - "path": "/api/chat/stream/20260304", 103 - "params": {"limit": "20"}, 104 - "status": 200, 105 - }, 106 - { 107 - "app": "chat", 108 - "name": "result", 109 - "path": "/api/chat/result/1700000000001", 110 - "params": {}, 111 - "status": 404, 112 - }, 113 99 # apps/activities/routes.py 114 100 { 115 101 "app": "activities",