personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chat: add chat stream writer + formatter

First of three sub-lodes for the chat backend rewrite (parent plan:
chat-refactor). Establishes the chat stream persistence layer that the
forthcoming singleton backend (2c) will write to, and the formatter +
indexer wiring that makes chat turns searchable via `sol call journal
search` after rescan.

`convey/chat_stream.py` is the sole write-owner for
`chronicle/*/chat/*/chat.jsonl`. Segment rollover mirrors the 300-second
window semantics from `think/importers/shared.py::_window_messages`.
`think/chat_formatter.py` registers before the `*/*/*/talents/*.md`
fallback so chat events are indexed as their own domain. No runtime
callers yet — those land in 2a (talent layer) and 2c (backend flip).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+839
+273
convey/chat_stream.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import json 7 + import os 8 + import threading 9 + import time 10 + from datetime import date, datetime 11 + from pathlib import Path 12 + from typing import Any 13 + 14 + from think.streams import update_stream, write_segment_stream 15 + from think.utils import day_path, get_journal, segment_key, segment_parse, segment_path 16 + 17 + _CHAT_LOCK = threading.Lock() 18 + _CHAT_STREAM = "chat" 19 + _SEGMENT_WINDOW_MS = 300_000 20 + _VALID_KINDS = { 21 + "owner_message": ("text", "app", "path", "facet"), 22 + "sol_message": ( 23 + "use_id", 24 + "text", 25 + "notes", 26 + "requested_exec", 27 + "requested_task", 28 + ), 29 + "talent_spawned": ("use_id", "name", "task", "started_at"), 30 + "talent_finished": ("use_id", "name", "summary"), 31 + "talent_errored": ("use_id", "name", "reason"), 32 + "chat_error": ("reason", "use_id"), 33 + } 34 + _TRIGGER_KINDS = {"owner_message", "talent_finished", "talent_errored"} 35 + 36 + 37 + def append_chat_event(kind: str, **fields: Any) -> dict[str, Any]: 38 + """Append a chat event to the current 5-minute segment.""" 39 + if kind not in _VALID_KINDS: 40 + raise ValueError(f"Unknown chat event kind: {kind}") 41 + 42 + event = dict(fields) 43 + event.setdefault("ts", int(time.time() * 1000)) 44 + _validate_event(kind, event) 45 + 46 + day = _day_for_ts(event["ts"]) 47 + _require_journal_root() 48 + 49 + with _CHAT_LOCK: 50 + segment = _current_segment_key(day, event["ts"]) 51 + segment_dir = segment_path(day, segment, _CHAT_STREAM) 52 + chat_path = segment_dir / "chat.jsonl" 53 + had_segment_file = chat_path.exists() 54 + 55 + events = _read_events_file(chat_path) 56 + stored_event = {"kind": kind, **event} 57 + events.append(stored_event) 58 + _write_events_file(chat_path, events) 59 + 60 + if not had_segment_file: 61 + stream_info = update_stream(_CHAT_STREAM, day, segment, type=_CHAT_STREAM) 62 + write_segment_stream( 63 + segment_dir, 64 + _CHAT_STREAM, 65 + stream_info["prev_day"], 66 + stream_info["prev_segment"], 67 + stream_info["seq"], 68 + ) 69 + 70 + return stored_event 71 + 72 + 73 + def read_chat_events(day: str, limit: int | None = None) -> list[dict[str, Any]]: 74 + """Return chat events for ``day`` in ascending timestamp order.""" 75 + chat_root = day_path(day, create=False) / _CHAT_STREAM 76 + if not chat_root.exists(): 77 + return [] 78 + 79 + ordered: list[tuple[int, str, int, dict[str, Any]]] = [] 80 + for segment_dir in sorted(chat_root.iterdir(), key=lambda path: path.name): 81 + if not segment_dir.is_dir() or segment_key(segment_dir.name) is None: 82 + continue 83 + chat_path = segment_dir / "chat.jsonl" 84 + if not chat_path.exists(): 85 + continue 86 + for line_no, event in enumerate(_read_events_file(chat_path)): 87 + ordered.append( 88 + (int(event.get("ts", 0) or 0), segment_dir.name, line_no, event) 89 + ) 90 + 91 + ordered.sort(key=lambda item: (item[0], item[1], item[2])) 92 + events = [item[3] for item in ordered] 93 + if limit is None: 94 + return events 95 + if limit == 0: 96 + return [] 97 + return events[-limit:] 98 + 99 + 100 + def read_chat_tail(day: str, limit: int = 20) -> list[dict[str, Any]]: 101 + """Return the most recent ``limit`` chat events for ``day``.""" 102 + return read_chat_events(day, limit=limit) 103 + 104 + 105 + def reduce_chat_state(day: str) -> dict[str, Any]: 106 + """Reduce a day's chat stream into the current chat session state.""" 107 + latest_sol_message: dict[str, Any] | None = None 108 + active_talents: dict[str, dict[str, Any]] = {} 109 + completed_talents: list[dict[str, Any]] = [] 110 + 111 + for event in read_chat_events(day): 112 + kind = event.get("kind") 113 + if kind == "sol_message": 114 + latest_sol_message = { 115 + "ts": event["ts"], 116 + "use_id": event["use_id"], 117 + "text": event["text"], 118 + "notes": event["notes"], 119 + "requested_exec": event["requested_exec"], 120 + "requested_task": event["requested_task"], 121 + } 122 + continue 123 + 124 + if kind == "talent_spawned": 125 + active_talents[str(event["use_id"])] = { 126 + "use_id": event["use_id"], 127 + "name": event["name"], 128 + "task": event["task"], 129 + "started_at": event["started_at"], 130 + } 131 + continue 132 + 133 + if kind == "talent_finished": 134 + started = active_talents.pop(str(event["use_id"]), None) 135 + completed_talents.append( 136 + { 137 + "use_id": event["use_id"], 138 + "name": event["name"], 139 + "task": started["task"] if started else None, 140 + "summary": event["summary"], 141 + "finished_at": event["ts"], 142 + } 143 + ) 144 + continue 145 + 146 + if kind == "talent_errored": 147 + active_talents.pop(str(event["use_id"]), None) 148 + 149 + return { 150 + "latest_sol_message": latest_sol_message, 151 + "active_talents": sorted( 152 + active_talents.values(), 153 + key=lambda talent: ( 154 + int(talent.get("started_at", 0) or 0), 155 + str(talent["use_id"]), 156 + ), 157 + ), 158 + "completed_talents": completed_talents, 159 + } 160 + 161 + 162 + def find_unresponded_trigger(day: str) -> dict[str, Any] | None: 163 + """Return the most recent unresolved trigger event for ``day``.""" 164 + for event in reversed(read_chat_events(day)): 165 + kind = event.get("kind") 166 + if kind == "sol_message": 167 + return None 168 + if kind in _TRIGGER_KINDS: 169 + return event 170 + return None 171 + 172 + 173 + def _validate_event(kind: str, event: dict[str, Any]) -> None: 174 + ts = event.get("ts") 175 + if not isinstance(ts, int): 176 + raise ValueError(f"chat event ts must be an int, got {type(ts).__name__}") 177 + 178 + missing = [field for field in _VALID_KINDS[kind] if field not in event] 179 + if missing: 180 + required = ", ".join(missing) 181 + raise ValueError(f"{kind} requires fields: {required}") 182 + 183 + 184 + def _require_journal_root() -> Path: 185 + journal = Path(get_journal()) 186 + if not journal.exists(): 187 + raise FileNotFoundError(f"Journal root does not exist: {journal}") 188 + if not journal.is_dir(): 189 + raise NotADirectoryError(f"Journal root is not a directory: {journal}") 190 + return journal 191 + 192 + 193 + def _day_for_ts(ts_ms: int) -> str: 194 + return _ts_to_local_datetime(ts_ms).strftime("%Y%m%d") 195 + 196 + 197 + def _current_segment_key(day: str, ts_ms: int) -> str: 198 + event_dt = _ts_to_local_datetime(ts_ms) 199 + existing = _chat_segments(day) 200 + if not existing: 201 + return _segment_key_for_start(event_dt) 202 + 203 + current = existing[-1] 204 + current_start = _segment_start_datetime(day, current) 205 + current_start_ms = int(current_start.timestamp() * 1000) 206 + if ts_ms - current_start_ms >= _SEGMENT_WINDOW_MS: 207 + return _segment_key_for_start(event_dt) 208 + return current 209 + 210 + 211 + def _chat_segments(day: str) -> list[str]: 212 + chat_root = day_path(day, create=False) / _CHAT_STREAM 213 + if not chat_root.exists(): 214 + return [] 215 + return sorted( 216 + entry.name 217 + for entry in chat_root.iterdir() 218 + if entry.is_dir() and segment_key(entry.name) is not None 219 + ) 220 + 221 + 222 + def _segment_key_for_start(start_dt: datetime) -> str: 223 + return f"{start_dt.strftime('%H%M%S')}_300" 224 + 225 + 226 + def _segment_start_datetime(day: str, segment: str) -> datetime: 227 + start_time, _ = segment_parse(segment) 228 + if start_time is None: 229 + raise ValueError(f"Invalid chat segment key: {segment}") 230 + return datetime.combine( 231 + date.fromisoformat(f"{day[:4]}-{day[4:6]}-{day[6:8]}"), start_time 232 + ) 233 + 234 + 235 + def _ts_to_local_datetime(ts_ms: int) -> datetime: 236 + return datetime.fromtimestamp(ts_ms / 1000) 237 + 238 + 239 + def _read_events_file(path: Path) -> list[dict[str, Any]]: 240 + if not path.exists(): 241 + return [] 242 + 243 + events: list[dict[str, Any]] = [] 244 + with open(path, "r", encoding="utf-8") as handle: 245 + for line_no, raw_line in enumerate(handle, start=1): 246 + line = raw_line.strip() 247 + if not line: 248 + continue 249 + try: 250 + payload = json.loads(line) 251 + except json.JSONDecodeError as exc: 252 + raise ValueError(f"Invalid JSON in {path}:{line_no}") from exc 253 + if not isinstance(payload, dict): 254 + raise ValueError(f"Expected JSON object in {path}:{line_no}") 255 + events.append(payload) 256 + return events 257 + 258 + 259 + def _write_events_file(path: Path, events: list[dict[str, Any]]) -> None: 260 + tmp_path = path.with_suffix(f".{os.getpid()}-{threading.get_ident()}.tmp") 261 + try: 262 + with open(tmp_path, "w", encoding="utf-8") as handle: 263 + for event in events: 264 + handle.write(json.dumps(event, ensure_ascii=False)) 265 + handle.write("\n") 266 + os.replace(tmp_path, path) 267 + except Exception: 268 + try: 269 + if tmp_path.exists(): 270 + tmp_path.unlink() 271 + except OSError: 272 + pass 273 + raise
+105
tests/test_chat_formatter.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + 5 + from think.chat_formatter import format_chat 6 + from think.formatters import get_formatter 7 + 8 + 9 + def test_format_chat_produces_markdown_for_each_kind(): 10 + entries = [ 11 + { 12 + "ts": 1, 13 + "kind": "owner_message", 14 + "text": "Need a diff", 15 + }, 16 + { 17 + "ts": 2, 18 + "kind": "sol_message", 19 + "text": "I can do that", 20 + }, 21 + { 22 + "ts": 3, 23 + "kind": "talent_spawned", 24 + "name": "exec", 25 + "task": "compare drafts", 26 + }, 27 + { 28 + "ts": 4, 29 + "kind": "talent_finished", 30 + "name": "exec", 31 + "summary": "summarized the differences", 32 + }, 33 + { 34 + "ts": 5, 35 + "kind": "talent_errored", 36 + "name": "exec", 37 + "reason": "repo unavailable", 38 + }, 39 + { 40 + "ts": 6, 41 + "kind": "chat_error", 42 + "reason": "chat had trouble — try again", 43 + }, 44 + ] 45 + 46 + chunks, meta = format_chat( 47 + entries, 48 + {"owner_name": "Alice", "agent_name": "Sol-agent"}, 49 + ) 50 + 51 + assert meta == {"indexer": {"agent": "chat"}} 52 + assert [chunk["markdown"] for chunk in chunks] == [ 53 + "**Alice** Need a diff", 54 + "**Sol-agent** I can do that", 55 + "*[exec spawned: compare drafts]*", 56 + "*[exec finished: summarized the differences]*", 57 + "*[exec errored: repo unavailable]*", 58 + "*[chat trouble: chat had trouble — try again]*", 59 + ] 60 + 61 + 62 + def test_format_chat_uses_identity_owner_and_agent_names(monkeypatch): 63 + monkeypatch.setattr( 64 + "think.chat_formatter.get_config", 65 + lambda: { 66 + "identity": {"name": "Alice Smith", "preferred": "Alice"}, 67 + "agent": {"name": "Sol-agent"}, 68 + }, 69 + ) 70 + 71 + chunks, _meta = format_chat( 72 + [ 73 + {"ts": 1, "kind": "owner_message", "text": "hello"}, 74 + {"ts": 2, "kind": "sol_message", "text": "hi"}, 75 + ] 76 + ) 77 + 78 + assert [chunk["markdown"] for chunk in chunks] == [ 79 + "**Alice** hello", 80 + "**Sol-agent** hi", 81 + ] 82 + 83 + 84 + def test_format_chat_fallback_labels_when_identity_missing(monkeypatch): 85 + monkeypatch.setattr("think.chat_formatter.get_config", lambda: {}) 86 + 87 + chunks, _meta = format_chat( 88 + [ 89 + {"ts": 1, "kind": "owner_message", "text": "hello"}, 90 + {"ts": 2, "kind": "sol_message", "text": "hi"}, 91 + ] 92 + ) 93 + 94 + assert [chunk["markdown"] for chunk in chunks] == [ 95 + "**Owner** hello", 96 + "**Sol** hi", 97 + ] 98 + 99 + 100 + def test_get_formatter_chat_jsonl_wins_over_talents_fallback(): 101 + formatter = get_formatter("20260420/chat/120000_300/chat.jsonl") 102 + 103 + assert formatter is not None 104 + assert formatter.__module__ == "think.chat_formatter" 105 + assert formatter.__name__ == "format_chat"
+385
tests/test_chat_stream.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import json 5 + from datetime import datetime 6 + from pathlib import Path 7 + 8 + import pytest 9 + 10 + from convey.chat_stream import ( 11 + append_chat_event, 12 + find_unresponded_trigger, 13 + read_chat_events, 14 + read_chat_tail, 15 + reduce_chat_state, 16 + ) 17 + 18 + 19 + def _setup_journal(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: 20 + journal = tmp_path / "journal" 21 + journal.mkdir() 22 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 23 + return journal 24 + 25 + 26 + def _ms(year: int, month: int, day: int, hour: int, minute: int, second: int) -> int: 27 + return int(datetime(year, month, day, hour, minute, second).timestamp() * 1000) 28 + 29 + 30 + def test_append_owner_message_creates_segment_and_jsonl(tmp_path, monkeypatch): 31 + journal = _setup_journal(tmp_path, monkeypatch) 32 + ts = _ms(2026, 4, 20, 12, 0, 0) 33 + 34 + event = append_chat_event( 35 + "owner_message", 36 + ts=ts, 37 + text="hello", 38 + app="sol", 39 + path="/chat", 40 + facet="work", 41 + ) 42 + 43 + segment_dir = journal / "chronicle" / "20260420" / "chat" / "120000_300" 44 + chat_path = segment_dir / "chat.jsonl" 45 + 46 + assert event["kind"] == "owner_message" 47 + assert event["ts"] == ts 48 + assert segment_dir.is_dir() 49 + assert (segment_dir / "stream.json").is_file() 50 + assert chat_path.is_file() 51 + assert (journal / "streams" / "chat.json").is_file() 52 + 53 + entries = [ 54 + json.loads(line) for line in chat_path.read_text(encoding="utf-8").splitlines() 55 + ] 56 + assert entries == [event] 57 + 58 + 59 + def test_append_is_atomic(tmp_path, monkeypatch): 60 + journal = _setup_journal(tmp_path, monkeypatch) 61 + 62 + append_chat_event( 63 + "owner_message", 64 + ts=_ms(2026, 4, 20, 12, 0, 0), 65 + text="hello", 66 + app="sol", 67 + path="/chat", 68 + facet="work", 69 + ) 70 + 71 + chat_path = ( 72 + journal / "chronicle" / "20260420" / "chat" / "120000_300" / "chat.jsonl" 73 + ) 74 + lines = chat_path.read_text(encoding="utf-8").splitlines() 75 + 76 + assert len(lines) == 1 77 + assert json.loads(lines[0])["text"] == "hello" 78 + 79 + 80 + def test_append_rejects_unknown_kind(tmp_path, monkeypatch): 81 + _setup_journal(tmp_path, monkeypatch) 82 + 83 + with pytest.raises(ValueError, match="Unknown chat event kind"): 84 + append_chat_event("unknown", ts=1) 85 + 86 + 87 + def test_append_rejects_missing_required_fields(tmp_path, monkeypatch): 88 + _setup_journal(tmp_path, monkeypatch) 89 + 90 + with pytest.raises(ValueError, match="owner_message requires fields: path, facet"): 91 + append_chat_event( 92 + "owner_message", 93 + ts=1, 94 + text="hello", 95 + app="sol", 96 + ) 97 + 98 + 99 + def test_append_rolls_at_300_seconds(tmp_path, monkeypatch): 100 + journal = _setup_journal(tmp_path, monkeypatch) 101 + start = _ms(2026, 4, 20, 12, 0, 0) 102 + 103 + append_chat_event( 104 + "owner_message", 105 + ts=start, 106 + text="first", 107 + app="sol", 108 + path="/chat", 109 + facet="work", 110 + ) 111 + append_chat_event( 112 + "owner_message", 113 + ts=start + 299_999, 114 + text="second", 115 + app="sol", 116 + path="/chat", 117 + facet="work", 118 + ) 119 + append_chat_event( 120 + "owner_message", 121 + ts=start + 300_000, 122 + text="third", 123 + app="sol", 124 + path="/chat", 125 + facet="work", 126 + ) 127 + 128 + chat_root = journal / "chronicle" / "20260420" / "chat" 129 + assert sorted(path.name for path in chat_root.iterdir()) == [ 130 + "120000_300", 131 + "120500_300", 132 + ] 133 + 134 + 135 + def test_append_rolls_at_day_cross(tmp_path, monkeypatch): 136 + journal = _setup_journal(tmp_path, monkeypatch) 137 + first = _ms(2026, 4, 20, 23, 59, 59) 138 + second = _ms(2026, 4, 21, 0, 0, 0) 139 + 140 + append_chat_event( 141 + "owner_message", 142 + ts=first, 143 + text="late", 144 + app="sol", 145 + path="/chat", 146 + facet="work", 147 + ) 148 + append_chat_event( 149 + "owner_message", 150 + ts=second, 151 + text="next day", 152 + app="sol", 153 + path="/chat", 154 + facet="work", 155 + ) 156 + 157 + assert ( 158 + journal / "chronicle" / "20260420" / "chat" / "235959_300" / "chat.jsonl" 159 + ).is_file() 160 + assert ( 161 + journal / "chronicle" / "20260421" / "chat" / "000000_300" / "chat.jsonl" 162 + ).is_file() 163 + 164 + 165 + def test_read_chat_events_returns_ordered(tmp_path, monkeypatch): 166 + _setup_journal(tmp_path, monkeypatch) 167 + start = _ms(2026, 4, 20, 12, 0, 0) 168 + 169 + append_chat_event( 170 + "owner_message", 171 + ts=start, 172 + text="first", 173 + app="sol", 174 + path="/chat", 175 + facet="work", 176 + ) 177 + append_chat_event( 178 + "owner_message", 179 + ts=start + 300_000, 180 + text="second", 181 + app="sol", 182 + path="/chat", 183 + facet="work", 184 + ) 185 + append_chat_event( 186 + "owner_message", 187 + ts=start + 600_000, 188 + text="third", 189 + app="sol", 190 + path="/chat", 191 + facet="work", 192 + ) 193 + 194 + events = read_chat_events("20260420") 195 + assert [event["text"] for event in events] == ["first", "second", "third"] 196 + assert [event["ts"] for event in events] == [ 197 + start, 198 + start + 300_000, 199 + start + 600_000, 200 + ] 201 + 202 + 203 + def test_read_chat_tail_last_n(tmp_path, monkeypatch): 204 + _setup_journal(tmp_path, monkeypatch) 205 + start = _ms(2026, 4, 20, 12, 0, 0) 206 + 207 + for index in range(4): 208 + append_chat_event( 209 + "owner_message", 210 + ts=start + (index * 60_000), 211 + text=f"msg-{index}", 212 + app="sol", 213 + path="/chat", 214 + facet="work", 215 + ) 216 + 217 + tail = read_chat_tail("20260420", limit=2) 218 + assert [event["text"] for event in tail] == ["msg-2", "msg-3"] 219 + 220 + 221 + def test_reduce_chat_state_extracts_latest_sol_and_active_talents( 222 + tmp_path, monkeypatch 223 + ): 224 + _setup_journal(tmp_path, monkeypatch) 225 + start = _ms(2026, 4, 20, 12, 0, 0) 226 + 227 + append_chat_event( 228 + "owner_message", 229 + ts=start, 230 + text="hello", 231 + app="sol", 232 + path="/chat", 233 + facet="work", 234 + ) 235 + append_chat_event( 236 + "sol_message", 237 + ts=start + 1_000, 238 + use_id="chat-1", 239 + text="dispatching", 240 + notes="planning", 241 + requested_exec=True, 242 + requested_task="compare drafts", 243 + ) 244 + append_chat_event( 245 + "talent_spawned", 246 + ts=start + 2_000, 247 + use_id="exec-1", 248 + name="exec", 249 + task="compare drafts", 250 + started_at=start + 2_000, 251 + ) 252 + append_chat_event( 253 + "talent_finished", 254 + ts=start + 3_000, 255 + use_id="exec-1", 256 + name="exec", 257 + summary="done", 258 + ) 259 + append_chat_event( 260 + "talent_spawned", 261 + ts=start + 4_000, 262 + use_id="exec-2", 263 + name="exec", 264 + task="write summary", 265 + started_at=start + 4_000, 266 + ) 267 + append_chat_event( 268 + "talent_errored", 269 + ts=start + 5_000, 270 + use_id="exec-3", 271 + name="exec", 272 + reason="bad input", 273 + ) 274 + append_chat_event( 275 + "chat_error", 276 + ts=start + 6_000, 277 + reason="chat had trouble — try again", 278 + use_id=None, 279 + ) 280 + 281 + reduced = reduce_chat_state("20260420") 282 + 283 + assert reduced["latest_sol_message"] == { 284 + "ts": start + 1_000, 285 + "use_id": "chat-1", 286 + "text": "dispatching", 287 + "notes": "planning", 288 + "requested_exec": True, 289 + "requested_task": "compare drafts", 290 + } 291 + assert reduced["active_talents"] == [ 292 + { 293 + "use_id": "exec-2", 294 + "name": "exec", 295 + "task": "write summary", 296 + "started_at": start + 4_000, 297 + } 298 + ] 299 + assert reduced["completed_talents"] == [ 300 + { 301 + "use_id": "exec-1", 302 + "name": "exec", 303 + "task": "compare drafts", 304 + "summary": "done", 305 + "finished_at": start + 3_000, 306 + } 307 + ] 308 + 309 + 310 + def test_find_unresponded_trigger_owner_message(tmp_path, monkeypatch): 311 + _setup_journal(tmp_path, monkeypatch) 312 + ts = _ms(2026, 4, 20, 12, 0, 0) 313 + 314 + append_chat_event( 315 + "owner_message", 316 + ts=ts, 317 + text="hello", 318 + app="sol", 319 + path="/chat", 320 + facet="work", 321 + ) 322 + 323 + trigger = find_unresponded_trigger("20260420") 324 + assert trigger is not None 325 + assert trigger["kind"] == "owner_message" 326 + assert trigger["text"] == "hello" 327 + 328 + 329 + def test_find_unresponded_trigger_talent_finished(tmp_path, monkeypatch): 330 + _setup_journal(tmp_path, monkeypatch) 331 + start = _ms(2026, 4, 20, 12, 0, 0) 332 + 333 + append_chat_event( 334 + "owner_message", 335 + ts=start, 336 + text="hello", 337 + app="sol", 338 + path="/chat", 339 + facet="work", 340 + ) 341 + append_chat_event( 342 + "sol_message", 343 + ts=start + 1_000, 344 + use_id="chat-1", 345 + text="working", 346 + notes="", 347 + requested_exec=False, 348 + requested_task=None, 349 + ) 350 + append_chat_event( 351 + "talent_finished", 352 + ts=start + 2_000, 353 + use_id="exec-1", 354 + name="exec", 355 + summary="done", 356 + ) 357 + 358 + trigger = find_unresponded_trigger("20260420") 359 + assert trigger is not None 360 + assert trigger["kind"] == "talent_finished" 361 + assert trigger["summary"] == "done" 362 + 363 + 364 + def test_find_unresponded_trigger_resolved(tmp_path, monkeypatch): 365 + _setup_journal(tmp_path, monkeypatch) 366 + start = _ms(2026, 4, 20, 12, 0, 0) 367 + 368 + append_chat_event( 369 + "talent_finished", 370 + ts=start, 371 + use_id="exec-1", 372 + name="exec", 373 + summary="done", 374 + ) 375 + append_chat_event( 376 + "sol_message", 377 + ts=start + 1_000, 378 + use_id="chat-1", 379 + text="thanks", 380 + notes="", 381 + requested_exec=False, 382 + requested_task=None, 383 + ) 384 + 385 + assert find_unresponded_trigger("20260420") is None
+75
think/chat_formatter.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + from typing import Any 7 + 8 + from think.utils import get_config 9 + 10 + 11 + def format_chat( 12 + entries: list[dict[str, Any]], 13 + context: dict[str, Any] | None = None, 14 + ) -> tuple[list[dict[str, Any]], dict[str, Any]]: 15 + """Format chat stream JSONL entries into markdown chunks.""" 16 + owner_name, agent_name = _resolve_labels(context or {}) 17 + chunks: list[dict[str, Any]] = [] 18 + 19 + for entry in entries: 20 + kind = str(entry.get("kind") or "").strip() 21 + markdown = _format_entry(kind, entry, owner_name, agent_name) 22 + 23 + chunks.append( 24 + { 25 + "timestamp": int(entry.get("ts", 0) or 0), 26 + "markdown": markdown, 27 + "source": entry, 28 + } 29 + ) 30 + 31 + return chunks, {"indexer": {"agent": "chat"}} 32 + 33 + 34 + def _resolve_labels(context: dict[str, Any]) -> tuple[str, str]: 35 + owner_name = str(context.get("owner_name") or "").strip() 36 + agent_name = str(context.get("agent_name") or "").strip() 37 + 38 + config = get_config() 39 + if not owner_name: 40 + identity = config.get("identity", {}) 41 + owner_name = str( 42 + identity.get("preferred") or identity.get("name") or "" 43 + ).strip() 44 + if not agent_name: 45 + agent_name = str(config.get("agent", {}).get("name") or "").strip() 46 + 47 + return (owner_name or "Owner", agent_name or "Sol") 48 + 49 + 50 + def _format_entry( 51 + kind: str, 52 + entry: dict[str, Any], 53 + owner_name: str, 54 + agent_name: str, 55 + ) -> str | None: 56 + if kind == "owner_message": 57 + return _speaker_line(owner_name, entry.get("text")) 58 + if kind == "sol_message": 59 + return _speaker_line(agent_name, entry.get("text")) 60 + if kind == "talent_spawned": 61 + return f"*[{entry['name']} spawned: {entry['task']}]*" 62 + if kind == "talent_finished": 63 + return f"*[{entry['name']} finished: {entry['summary']}]*" 64 + if kind == "talent_errored": 65 + return f"*[{entry['name']} errored: {entry['reason']}]*" 66 + if kind == "chat_error": 67 + return f"*[chat trouble: {entry['reason']}]*" 68 + raise ValueError(f"Unknown chat event kind for formatter: {kind}") 69 + 70 + 71 + def _speaker_line(label: str, body: Any) -> str: 72 + text = str(body or "").strip() 73 + if not text: 74 + return f"**{label}**" 75 + return f"**{label}** {text}"
+1
think/formatters.py
··· 191 191 "*/*/*/*_transcript.jsonl": ("observe.hear", "format_audio", False), 192 192 "*/*/*/screen.jsonl": ("observe.screen", "format_screen", False), 193 193 "*/*/*/*_screen.jsonl": ("observe.screen", "format_screen", False), 194 + "*/chat/*/chat.jsonl": ("think.chat_formatter", "format_chat", True), 194 195 # Markdown — day-level agents output and segment-level (day/stream/segment/talents/) 195 196 "*/talents/*.md": ("think.markdown", "format_markdown", True), 196 197 # Layout: day/stream/segment/talents/*.md