# SPDX-License-Identifier: AGPL-3.0-only # Copyright (c) 2026 sol pbc from __future__ import annotations import json import logging import os import sys import threading import time from datetime import date, datetime from pathlib import Path from typing import Any from think.callosum import callosum_send from think.indexer.journal import index_file from think.streams import update_stream, write_segment_stream from think.utils import day_path, get_journal, segment_key, segment_parse, segment_path logger = logging.getLogger(__name__) _CHAT_LOCK = threading.Lock() _CHAT_STREAM = "chat" _SEGMENT_WINDOW_MS = 300_000 _VALID_KINDS = { "owner_message": ("text", "app", "path", "facet"), "sol_message": ( "use_id", "text", "notes", "requested_target", "requested_task", ), "talent_spawned": ("use_id", "name", "task", "started_at"), "talent_finished": ("use_id", "name", "summary"), "talent_errored": ("use_id", "name", "reason"), "reflection_ready": ("day", "url"), "chat_error": ("reason", "use_id"), } _TRIGGER_KINDS = {"owner_message", "talent_finished", "talent_errored"} def append_chat_event(kind: str, **fields: Any) -> dict[str, Any]: """Append a chat event to the current 5-minute segment.""" if kind not in _VALID_KINDS: raise ValueError(f"Unknown chat event kind: {kind}") event = dict(fields) event.setdefault("ts", int(time.time() * 1000)) _validate_event(kind, event) day = _day_for_ts(event["ts"]) _require_journal_root() with _CHAT_LOCK: segment = _current_segment_key(day, event["ts"]) segment_dir = segment_path(day, segment, _CHAT_STREAM) chat_path = segment_dir / "chat.jsonl" had_segment_file = chat_path.exists() events = _read_events_file(chat_path) stored_event = {"kind": kind, **event} events.append(stored_event) _write_events_file(chat_path, events) if not had_segment_file: stream_info = update_stream(_CHAT_STREAM, day, segment, type=_CHAT_STREAM) write_segment_stream( segment_dir, _CHAT_STREAM, stream_info["prev_day"], stream_info["prev_segment"], stream_info["seq"], ) try: index_file(get_journal(), str(chat_path)) except Exception: logger.warning( "chat-event-index-failed", extra={ "kind": kind, "use_id": str(stored_event.get("use_id") or ""), "chat_path": str(chat_path), }, exc_info=True, ) _broadcast_chat_event(stored_event) return stored_event def read_chat_events(day: str, limit: int | None = None) -> list[dict[str, Any]]: """Return chat events for ``day`` in ascending timestamp order.""" chat_root = day_path(day, create=False) / _CHAT_STREAM if not chat_root.exists(): return [] ordered: list[tuple[int, str, int, dict[str, Any]]] = [] for segment_dir in sorted(chat_root.iterdir(), key=lambda path: path.name): if not segment_dir.is_dir() or segment_key(segment_dir.name) is None: continue chat_path = segment_dir / "chat.jsonl" if not chat_path.exists(): continue for line_no, event in enumerate(_read_events_file(chat_path)): ordered.append( (int(event.get("ts", 0) or 0), segment_dir.name, line_no, event) ) ordered.sort(key=lambda item: (item[0], item[1], item[2])) events = [item[3] for item in ordered] if limit is None: return events if limit == 0: return [] return events[-limit:] def read_chat_tail(day: str, limit: int = 20) -> list[dict[str, Any]]: """Return the most recent ``limit`` chat events for ``day``.""" return read_chat_events(day, limit=limit) def reduce_chat_state(day: str) -> dict[str, Any]: """Reduce a day's chat stream into the current chat session state.""" latest_sol_message: dict[str, Any] | None = None active_talents: dict[str, dict[str, Any]] = {} completed_talents: list[dict[str, Any]] = [] for event in read_chat_events(day): kind = event.get("kind") if kind == "sol_message": latest_sol_message = { "ts": event["ts"], "use_id": event["use_id"], "text": event["text"], "notes": event["notes"], "requested_target": event["requested_target"], "requested_task": event["requested_task"], } continue if kind == "talent_spawned": active_talents[str(event["use_id"])] = { "use_id": event["use_id"], "name": event["name"], "task": event["task"], "started_at": event["started_at"], } continue if kind == "talent_finished": started = active_talents.pop(str(event["use_id"]), None) completed_talents.append( { "use_id": event["use_id"], "name": event["name"], "task": started["task"] if started else None, "summary": event["summary"], "finished_at": event["ts"], } ) continue if kind == "talent_errored": active_talents.pop(str(event["use_id"]), None) continue if kind == "reflection_ready": continue return { "latest_sol_message": latest_sol_message, "active_talents": sorted( active_talents.values(), key=lambda talent: ( int(talent.get("started_at", 0) or 0), str(talent["use_id"]), ), ), "completed_talents": completed_talents, } def find_unresponded_trigger(day: str) -> dict[str, Any] | None: """Return the most recent unresolved trigger event for ``day``.""" for event in reversed(read_chat_events(day)): kind = event.get("kind") if kind == "sol_message": return None if kind in _TRIGGER_KINDS: return event return None def _validate_event(kind: str, event: dict[str, Any]) -> None: ts = event.get("ts") if not isinstance(ts, int): raise ValueError(f"chat event ts must be an int, got {type(ts).__name__}") missing = [field for field in _VALID_KINDS[kind] if field not in event] if missing: required = ", ".join(missing) raise ValueError(f"{kind} requires fields: {required}") def _broadcast_chat_event(stored_event: dict[str, Any]) -> None: chat_module = sys.modules.get("convey.chat") runtime = ( getattr(chat_module, "_runtime", None) if chat_module is not None else None ) if runtime is None: return kind = str(stored_event.get("kind") or "") use_id = str(stored_event.get("use_id") or "") try: if runtime.callosum.emit("chat", kind, **stored_event): return if callosum_send("chat", kind, **stored_event): return logger.warning( "Failed to broadcast chat event kind=%s use_id=%s", kind, use_id or "-", ) except Exception as exc: logger.warning( "Failed to broadcast chat event kind=%s use_id=%s: %s", kind, use_id or "-", exc, ) def _require_journal_root() -> Path: journal = Path(get_journal()) if not journal.exists(): raise FileNotFoundError(f"Journal root does not exist: {journal}") if not journal.is_dir(): raise NotADirectoryError(f"Journal root is not a directory: {journal}") return journal def _day_for_ts(ts_ms: int) -> str: return _ts_to_local_datetime(ts_ms).strftime("%Y%m%d") def _current_segment_key(day: str, ts_ms: int) -> str: event_dt = _ts_to_local_datetime(ts_ms) existing = _chat_segments(day) if not existing: return _segment_key_for_start(event_dt) current = existing[-1] current_start = _segment_start_datetime(day, current) current_start_ms = int(current_start.timestamp() * 1000) if ts_ms - current_start_ms >= _SEGMENT_WINDOW_MS: return _segment_key_for_start(event_dt) return current def _chat_segments(day: str) -> list[str]: chat_root = day_path(day, create=False) / _CHAT_STREAM if not chat_root.exists(): return [] return sorted( entry.name for entry in chat_root.iterdir() if entry.is_dir() and segment_key(entry.name) is not None ) def _segment_key_for_start(start_dt: datetime) -> str: return f"{start_dt.strftime('%H%M%S')}_300" def _segment_start_datetime(day: str, segment: str) -> datetime: start_time, _ = segment_parse(segment) if start_time is None: raise ValueError(f"Invalid chat segment key: {segment}") return datetime.combine( date.fromisoformat(f"{day[:4]}-{day[4:6]}-{day[6:8]}"), start_time ) def _ts_to_local_datetime(ts_ms: int) -> datetime: return datetime.fromtimestamp(ts_ms / 1000) def _read_events_file(path: Path) -> list[dict[str, Any]]: if not path.exists(): return [] events: list[dict[str, Any]] = [] with open(path, "r", encoding="utf-8") as handle: for line_no, raw_line in enumerate(handle, start=1): line = raw_line.strip() if not line: continue try: payload = json.loads(line) except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in {path}:{line_no}") from exc if not isinstance(payload, dict): raise ValueError(f"Expected JSON object in {path}:{line_no}") events.append(payload) return events def _write_events_file(path: Path, events: list[dict[str, Any]]) -> None: tmp_path = path.with_suffix(f".{os.getpid()}-{threading.get_ident()}.tmp") try: with open(tmp_path, "w", encoding="utf-8") as handle: for event in events: handle.write(json.dumps(event, ensure_ascii=False)) handle.write("\n") os.replace(tmp_path, path) except Exception: try: if tmp_path.exists(): tmp_path.unlink() except OSError: pass raise