linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chat-bridge: surface sol chat via notify-send and FIFO

- Run the chat bridge as a peer observer task so server-initiated chat can flow without touching the capture loop.
- Consume callosum SSE with a blocking worker isolated behind asyncio queues, including auth-terminal exits and bounded reconnect backoff.
- Gate desktop notifications on server opt-in while always mirroring request and clear lines to the optional local FIFO.
- Add a config off-switch and document the bridge as the observer's third concurrent concern.

Co-Authored-By: OpenAI Codex <codex@openai.com>

+1121 -1
+3 -1
AGENTS.md
··· 35 35 36 36 ## Architecture 37 37 38 - The observer runs a single asyncio event loop with two concurrent concerns: 38 + The observer runs a single asyncio event loop with three concurrent concerns: 39 39 40 40 1. **Capture loop** (`observer.py`) — Checks activity status every 5 seconds, records audio continuously, manages screencast recording via GStreamer. Creates 5-minute segments in `~/.local/share/solstone-linux/captures/YYYYMMDD/stream/HHMMSS_DDD/`. Segment directories start as `.incomplete` and are renamed on finalization. 41 41 42 42 2. **Sync service** (`sync.py`) — Background asyncio task that walks the captures directory, queries the server for existing segments, and uploads missing ones. Circuit breaker pattern with error-type-aware thresholds. 43 + 44 + 3. **Chat bridge** (`chat_bridge.py`) — Background asyncio task that consumes server-sent callosum chat events, mirrors request/clear messages to an optional local FIFO, and fires click-capturing `notify-send` subprocesses when server opt-in allows Linux desktop notifications. 43 45 44 46 State machine has two modes: `screencast` (screen active, recording video) and `idle` (screen inactive). Mode transitions, mute state changes, and 5-minute intervals all trigger segment boundaries. 45 47
+493
src/solstone_linux/chat_bridge.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Bridge server-initiated chat events into local notification surfaces. 5 + 6 + The bridge consumes callosum SSE frames, mirrors requests into an optional FIFO, 7 + and fires click-capturing desktop notifications when the server opt-in allows it. 8 + """ 9 + 10 + from __future__ import annotations 11 + 12 + import asyncio 13 + import errno 14 + import json 15 + import logging 16 + import os 17 + import stat 18 + import subprocess 19 + import threading 20 + import time 21 + from collections import OrderedDict 22 + from dataclasses import dataclass 23 + from datetime import datetime 24 + from pathlib import Path 25 + from typing import Any 26 + 27 + import requests 28 + 29 + from .config import Config 30 + 31 + logger = logging.getLogger(__name__) 32 + 33 + # Keep these event names and owner-facing copy hand-synced with 34 + # solstone/convey/sol_initiated/copy.py; this repo does not vendor that canon. 35 + EVENT_SOL_CHAT_REQUEST = "sol_chat_request" 36 + EVENT_SOL_CHAT_REQUEST_SUPERSEDED = "sol_chat_request_superseded" 37 + EVENT_OWNER_CHAT_OPEN = "owner_chat_open" 38 + EVENT_OWNER_CHAT_DISMISSED = "owner_chat_dismissed" 39 + 40 + NOTIFY_TITLE = "sol" 41 + SURFACE = "linux" 42 + FIFO_PATH = Path.home() / ".solstone" / "notify" 43 + _HANDLED_EVENTS = frozenset( 44 + { 45 + EVENT_SOL_CHAT_REQUEST, 46 + EVENT_SOL_CHAT_REQUEST_SUPERSEDED, 47 + EVENT_OWNER_CHAT_OPEN, 48 + EVENT_OWNER_CHAT_DISMISSED, 49 + } 50 + ) 51 + RECONNECT_DELAYS = [1, 2, 4, 8, 16, 30] 52 + HEARTBEAT_STALE_SECONDS = 60 53 + OPT_IN_POLL_SECONDS = 300 54 + PENDING_CAP = 32 55 + 56 + 57 + @dataclass 58 + class PendingRequest: 59 + request_id: str 60 + summary: str 61 + chat_url: str 62 + notify_task: asyncio.Task | None = None 63 + 64 + 65 + class _SseParser: 66 + def __init__(self) -> None: 67 + self._event: str | None = None 68 + self._data: list[str] = [] 69 + self._id: str | None = None 70 + 71 + def feed_line(self, line: str) -> dict[str, str | None] | None: 72 + line = line.rstrip("\r\n") 73 + if line == "": 74 + if not self._data: 75 + self._event = None 76 + self._id = None 77 + return None 78 + frame = { 79 + "event": self._event, 80 + "data": "\n".join(self._data), 81 + "id": self._id, 82 + } 83 + self._event = None 84 + self._data = [] 85 + self._id = None 86 + return frame 87 + 88 + if line.startswith(":"): 89 + return None 90 + 91 + field, sep, value = line.partition(":") 92 + if sep and value.startswith(" "): 93 + value = value[1:] 94 + 95 + if field == "data": 96 + self._data.append(value) 97 + elif field == "event": 98 + self._event = value 99 + elif field == "id": 100 + self._id = value 101 + 102 + return None 103 + 104 + 105 + def _auth_headers(key: str) -> dict[str, str]: 106 + return {"Authorization": f"Bearer {key}"} 107 + 108 + 109 + def _write_fifo(line: str, path: Path = FIFO_PATH) -> None: 110 + try: 111 + if not path.exists(): 112 + logger.debug("Chat bridge FIFO missing: %s", path) 113 + return 114 + if not stat.S_ISFIFO(path.stat().st_mode): 115 + logger.debug("Chat bridge path is not a FIFO: %s", path) 116 + return 117 + 118 + fd = os.open(path, os.O_WRONLY | os.O_NONBLOCK) 119 + try: 120 + os.write(fd, line.encode("utf-8")) 121 + finally: 122 + os.close(fd) 123 + except FileNotFoundError: 124 + logger.debug("Chat bridge FIFO missing: %s", path) 125 + except BlockingIOError: 126 + logger.debug("Chat bridge FIFO has no reader: %s", path) 127 + except OSError as e: 128 + if e.errno in (errno.ENXIO, errno.EAGAIN, errno.EWOULDBLOCK): 129 + logger.debug("Chat bridge FIFO unavailable: %s", e) 130 + return 131 + logger.warning("Chat bridge FIFO write failed: %s", e) 132 + 133 + 134 + def _push_frame( 135 + queue: asyncio.Queue, 136 + loop: asyncio.AbstractEventLoop, 137 + frame: dict[str, Any], 138 + ) -> None: 139 + loop.call_soon_threadsafe(queue.put_nowait, frame) 140 + 141 + 142 + def _sse_worker( 143 + url: str, 144 + key: str, 145 + queue: asyncio.Queue, 146 + loop: asyncio.AbstractEventLoop, 147 + stop_event: threading.Event, 148 + ) -> None: 149 + parser = _SseParser() 150 + try: 151 + response = requests.get( 152 + url, 153 + stream=True, 154 + headers=_auth_headers(key), 155 + timeout=(10, None), 156 + ) 157 + if response.status_code in (401, 403): 158 + _push_frame( 159 + queue, loop, {"_terminal": True, "status": response.status_code} 160 + ) 161 + return 162 + if response.status_code != 200: 163 + _push_frame( 164 + queue, 165 + loop, 166 + { 167 + "_transport_error": True, 168 + "error": f"status {response.status_code}", 169 + }, 170 + ) 171 + return 172 + 173 + for raw_line in response.iter_lines(decode_unicode=True): 174 + if stop_event.is_set(): 175 + return 176 + if raw_line is None: 177 + continue 178 + line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line 179 + if line.startswith(":"): 180 + _push_frame(queue, loop, {"_heartbeat": True}) 181 + frame = parser.feed_line(line) 182 + if frame is not None: 183 + _push_frame(queue, loop, frame) 184 + except requests.RequestException as e: 185 + _push_frame(queue, loop, {"_transport_error": True, "error": str(e)}) 186 + 187 + 188 + async def _poll_opt_in(server_url: str, key: str) -> bool: 189 + url = f"{server_url.rstrip('/')}/api/sol_voice" 190 + 191 + try: 192 + response = await asyncio.to_thread( 193 + requests.get, 194 + url, 195 + headers=_auth_headers(key), 196 + timeout=10, 197 + ) 198 + if response.status_code != 200: 199 + return False 200 + data = response.json() 201 + except (requests.RequestException, ValueError, TypeError): 202 + return False 203 + 204 + return bool(data.get("linux_notify_send", False)) 205 + 206 + 207 + def _chat_url(server_url: str, day: str | None, event_index: int | None) -> str: 208 + base = server_url.rstrip("/") 209 + if day and event_index is not None: 210 + return f"{base}/app/chat/{day}#event-{event_index}" 211 + today = datetime.now().strftime("%Y%m%d") 212 + return f"{base}/app/chat/{today}" 213 + 214 + 215 + async def _handle_one_notification( 216 + req: PendingRequest, server_url: str, key: str 217 + ) -> None: 218 + proc = await asyncio.create_subprocess_exec( 219 + "notify-send", 220 + "--wait", 221 + "--app-name", 222 + "solstone", 223 + "--action=open=Open", 224 + NOTIFY_TITLE, 225 + req.summary, 226 + stdout=asyncio.subprocess.PIPE, 227 + stderr=asyncio.subprocess.DEVNULL, 228 + ) 229 + try: 230 + await proc.communicate() 231 + except asyncio.CancelledError: 232 + proc.terminate() 233 + try: 234 + await asyncio.wait_for(proc.wait(), timeout=1) 235 + except asyncio.TimeoutError: 236 + proc.kill() 237 + await proc.wait() 238 + raise 239 + 240 + if proc.returncode != 0: 241 + logger.debug("notify-send exited with status %s", proc.returncode) 242 + return 243 + 244 + logger.info("Opening chat request: %s", req.request_id) 245 + url = f"{server_url.rstrip('/')}/api/chat/{EVENT_SOL_CHAT_REQUEST}/open" 246 + try: 247 + response = await asyncio.to_thread( 248 + requests.post, 249 + url, 250 + json={"request_id": req.request_id}, 251 + headers=_auth_headers(key), 252 + timeout=10, 253 + ) 254 + if response.status_code >= 400: 255 + logger.debug("Chat open ack failed: status %s", response.status_code) 256 + except requests.RequestException as e: 257 + logger.debug("Chat open ack failed: %s", e) 258 + 259 + try: 260 + subprocess.Popen( 261 + ["xdg-open", req.chat_url], 262 + stdout=subprocess.DEVNULL, 263 + stderr=subprocess.DEVNULL, 264 + ) 265 + except OSError as e: 266 + logger.debug("xdg-open failed: %s", e) 267 + 268 + 269 + async def _opt_in_poll_loop(server_url: str, key: str, state: dict[str, bool]) -> None: 270 + while True: 271 + state["value"] = await _poll_opt_in(server_url, key) 272 + await asyncio.sleep(OPT_IN_POLL_SECONDS) 273 + 274 + 275 + def _cancel_pending_task(task: asyncio.Task | None) -> None: 276 + if task is not None and not task.done(): 277 + task.cancel() 278 + 279 + 280 + def _enforce_pending_cap(pending: OrderedDict[str, PendingRequest]) -> None: 281 + while len(pending) > PENDING_CAP: 282 + request_id, old_req = pending.popitem(last=False) 283 + _cancel_pending_task(old_req.notify_task) 284 + logger.debug("Evicted pending chat request: %s", request_id) 285 + 286 + 287 + def _mark_stale_if_needed( 288 + last_frame_at: float, is_stale: bool, stale_logged: bool 289 + ) -> tuple[bool, bool]: 290 + if time.monotonic() - last_frame_at > HEARTBEAT_STALE_SECONDS and not is_stale: 291 + logger.warning("Chat bridge heartbeat stale") 292 + return True, True 293 + return is_stale, stale_logged 294 + 295 + 296 + def _mark_live_frame(is_stale: bool, stale_logged: bool) -> tuple[bool, bool]: 297 + if is_stale: 298 + if stale_logged: 299 + logger.info("Chat bridge heartbeat recovered") 300 + return False, False 301 + return is_stale, stale_logged 302 + 303 + 304 + async def _dispatch_event( 305 + payload: dict[str, Any], 306 + pending: OrderedDict[str, PendingRequest], 307 + opt_in: bool, 308 + is_stale: bool, 309 + config: Config, 310 + ) -> None: 311 + if payload.get("tract") != "chat": 312 + return 313 + 314 + event = payload.get("event") 315 + if event not in _HANDLED_EVENTS: 316 + return 317 + 318 + request_id = payload.get("request_id") 319 + if not request_id: 320 + logger.debug("Chat event missing request_id: %s", event) 321 + return 322 + request_id = str(request_id) 323 + 324 + if event == EVENT_SOL_CHAT_REQUEST: 325 + summary = str(payload.get("summary") or "") 326 + _write_fifo(f"sol-ping {request_id} {summary}\n") 327 + 328 + old_req = pending.pop(request_id, None) 329 + if old_req is not None: 330 + _cancel_pending_task(old_req.notify_task) 331 + 332 + if opt_in and not is_stale: 333 + event_index = payload.get("event_index") 334 + if not isinstance(event_index, int): 335 + event_index = None 336 + req = PendingRequest( 337 + request_id=request_id, 338 + summary=summary, 339 + chat_url=_chat_url(config.server_url, payload.get("day"), event_index), 340 + ) 341 + req.notify_task = asyncio.create_task( 342 + _handle_one_notification(req, config.server_url, config.key) 343 + ) 344 + pending[request_id] = req 345 + _enforce_pending_cap(pending) 346 + return 347 + 348 + if event in ( 349 + EVENT_SOL_CHAT_REQUEST_SUPERSEDED, 350 + EVENT_OWNER_CHAT_OPEN, 351 + EVENT_OWNER_CHAT_DISMISSED, 352 + ): 353 + old_req = pending.pop(request_id, None) 354 + if old_req is not None: 355 + _cancel_pending_task(old_req.notify_task) 356 + _write_fifo(f"clear {request_id}\n") 357 + 358 + 359 + async def _cancel_pending_notifications( 360 + pending: OrderedDict[str, PendingRequest], 361 + ) -> None: 362 + tasks = [req.notify_task for req in pending.values() if req.notify_task is not None] 363 + for task in tasks: 364 + _cancel_pending_task(task) 365 + if tasks: 366 + await asyncio.gather(*tasks, return_exceptions=True) 367 + pending.clear() 368 + 369 + 370 + async def _await_worker(worker_task: asyncio.Task | None) -> None: 371 + if worker_task is None: 372 + return 373 + try: 374 + await asyncio.wait_for(worker_task, timeout=1) 375 + except asyncio.TimeoutError: 376 + worker_task.cancel() 377 + await asyncio.gather(worker_task, return_exceptions=True) 378 + 379 + 380 + async def _sleep_reconnect(delay: int, stop_event: asyncio.Event) -> None: 381 + if not stop_event.is_set(): 382 + await asyncio.sleep(delay) 383 + 384 + 385 + async def run_chat_bridge(config: Config, stop_event: asyncio.Event) -> None: 386 + try: 387 + if not config.chat_bridge_enabled: 388 + return 389 + if not config.server_url or not config.key: 390 + logger.debug("Chat bridge disabled: server_url or key missing") 391 + return 392 + 393 + server_url = config.server_url.rstrip("/") 394 + key = config.key 395 + sse_url = f"{server_url}/app/observer/{key}/callosum" 396 + pending: OrderedDict[str, PendingRequest] = OrderedDict() 397 + opt_in_state = {"value": False} 398 + opt_in_task = asyncio.create_task( 399 + _opt_in_poll_loop(server_url, key, opt_in_state) 400 + ) 401 + reconnect_index = 0 402 + is_stale = False 403 + stale_logged = False 404 + worker_task: asyncio.Task | None = None 405 + thread_stop: threading.Event | None = None 406 + 407 + try: 408 + while not stop_event.is_set(): 409 + queue: asyncio.Queue = asyncio.Queue() 410 + thread_stop = threading.Event() 411 + loop = asyncio.get_running_loop() 412 + worker_task = asyncio.create_task( 413 + asyncio.to_thread( 414 + _sse_worker, sse_url, key, queue, loop, thread_stop 415 + ) 416 + ) 417 + last_frame_at = time.monotonic() 418 + reconnect = False 419 + 420 + while not stop_event.is_set(): 421 + try: 422 + frame = await asyncio.wait_for(queue.get(), timeout=5) 423 + except asyncio.TimeoutError: 424 + is_stale, stale_logged = _mark_stale_if_needed( 425 + last_frame_at, is_stale, stale_logged 426 + ) 427 + if worker_task.done(): 428 + reconnect = True 429 + break 430 + continue 431 + 432 + if frame.get("_terminal"): 433 + logger.error( 434 + "Chat bridge SSE authorization failed: status %s", 435 + frame.get("status"), 436 + ) 437 + thread_stop.set() 438 + return 439 + 440 + if frame.get("_transport_error"): 441 + logger.debug( 442 + "Chat bridge transport error: %s", frame.get("error") 443 + ) 444 + reconnect = True 445 + break 446 + 447 + last_frame_at = time.monotonic() 448 + reconnect_index = 0 449 + is_stale, stale_logged = _mark_live_frame(is_stale, stale_logged) 450 + 451 + if frame.get("_heartbeat"): 452 + continue 453 + 454 + data = frame.get("data") 455 + if not isinstance(data, str): 456 + continue 457 + try: 458 + payload = json.loads(data) 459 + except json.JSONDecodeError as e: 460 + logger.debug("Chat bridge frame JSON decode failed: %s", e) 461 + continue 462 + if not isinstance(payload, dict): 463 + continue 464 + await _dispatch_event( 465 + payload, 466 + pending, 467 + opt_in_state["value"], 468 + is_stale, 469 + config, 470 + ) 471 + 472 + if thread_stop: 473 + thread_stop.set() 474 + await _await_worker(worker_task) 475 + worker_task = None 476 + if stop_event.is_set(): 477 + break 478 + if reconnect: 479 + delay = RECONNECT_DELAYS[ 480 + min(reconnect_index, len(RECONNECT_DELAYS) - 1) 481 + ] 482 + reconnect_index += 1 483 + logger.info("Chat bridge reconnecting in %ss", delay) 484 + await _sleep_reconnect(delay, stop_event) 485 + finally: 486 + if thread_stop: 487 + thread_stop.set() 488 + opt_in_task.cancel() 489 + await asyncio.gather(opt_in_task, return_exceptions=True) 490 + await _cancel_pending_notifications(pending) 491 + await _await_worker(worker_task) 492 + except Exception as e: 493 + logger.error("Chat bridge crashed: %s", e, exc_info=True)
+3
src/solstone_linux/config.py
··· 38 38 ) 39 39 sync_max_retries: int = DEFAULT_SYNC_MAX_RETRIES 40 40 cache_retention_days: int = 7 41 + chat_bridge_enabled: bool = True 41 42 base_dir: Path = DEFAULT_BASE_DIR 42 43 43 44 @property ··· 96 97 config.cache_retention_days = int(data.get("cache_retention_days", 7)) 97 98 except (TypeError, ValueError): 98 99 config.cache_retention_days = 7 100 + config.chat_bridge_enabled = data.get("chat_bridge_enabled", True) 99 101 100 102 return config 101 103 ··· 112 114 "sync_retry_delays": config.sync_retry_delays, 113 115 "sync_max_retries": config.sync_max_retries, 114 116 "cache_retention_days": config.cache_retention_days, 117 + "chat_bridge_enabled": config.chat_bridge_enabled, 115 118 } 116 119 117 120 config_path = config.config_path
+21
src/solstone_linux/observer.py
··· 39 39 ) 40 40 from .audio_mute import is_sink_muted 41 41 from .audio_recorder import AudioRecorder 42 + from .chat_bridge import run_chat_bridge 42 43 from .config import Config 43 44 from .recovery import write_segment_metadata 44 45 from .screencast import Screencaster, StreamInfo ··· 477 478 logger.info(f"Starting observer loop (interval={self.interval}s)") 478 479 479 480 # Start sync service as background task 481 + bridge_stop_event = asyncio.Event() 482 + bridge_task = None 480 483 sync_task = None 481 484 if self._sync: 482 485 sync_task = asyncio.create_task(self._sync.run()) 486 + if self.config.chat_bridge_enabled: 487 + bridge_task = asyncio.create_task( 488 + run_chat_bridge(self.config, bridge_stop_event) 489 + ) 483 490 484 491 # Determine initial mode (default to screencast if check fails) 485 492 try: ··· 505 512 try: 506 513 await sync_task 507 514 except asyncio.CancelledError: 515 + pass 516 + bridge_stop_event.set() 517 + if bridge_task: 518 + bridge_task.cancel() 519 + try: 520 + await bridge_task 521 + except (asyncio.CancelledError, Exception): 508 522 pass 509 523 return 510 524 else: ··· 664 678 try: 665 679 await sync_task 666 680 except asyncio.CancelledError: 681 + pass 682 + bridge_stop_event.set() 683 + if bridge_task: 684 + bridge_task.cancel() 685 + try: 686 + await bridge_task 687 + except (asyncio.CancelledError, Exception): 667 688 pass 668 689 669 690 async def shutdown(self):
+601
tests/test_chat_bridge.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import asyncio 5 + import errno 6 + import logging 7 + import os 8 + from collections import OrderedDict 9 + from pathlib import Path 10 + from unittest.mock import AsyncMock, patch 11 + 12 + import pytest 13 + 14 + from solstone_linux import chat_bridge 15 + from solstone_linux.chat_bridge import ( 16 + EVENT_OWNER_CHAT_DISMISSED, 17 + EVENT_OWNER_CHAT_OPEN, 18 + EVENT_SOL_CHAT_REQUEST, 19 + EVENT_SOL_CHAT_REQUEST_SUPERSEDED, 20 + HEARTBEAT_STALE_SECONDS, 21 + PENDING_CAP, 22 + PendingRequest, 23 + _chat_url, 24 + _dispatch_event, 25 + _handle_one_notification, 26 + _mark_live_frame, 27 + _mark_stale_if_needed, 28 + _SseParser, 29 + _write_fifo, 30 + run_chat_bridge, 31 + ) 32 + from solstone_linux.config import Config 33 + 34 + 35 + class FakeResponse: 36 + def __init__(self, status_code=200, data=None, lines=None): 37 + self.status_code = status_code 38 + self._data = data if data is not None else {} 39 + self._lines = lines if lines is not None else [] 40 + 41 + def json(self): 42 + return self._data 43 + 44 + def iter_lines(self, decode_unicode=True): 45 + yield from self._lines 46 + 47 + 48 + class FakeProc: 49 + def __init__(self, returncode=0): 50 + self.returncode = returncode 51 + self.terminated = False 52 + self.killed = False 53 + 54 + async def communicate(self): 55 + return b"", b"" 56 + 57 + async def wait(self): 58 + return self.returncode 59 + 60 + def terminate(self): 61 + self.terminated = True 62 + 63 + def kill(self): 64 + self.killed = True 65 + 66 + 67 + def _config(enabled=True) -> Config: 68 + config = Config() 69 + config.server_url = "https://server.test" 70 + config.key = "key-123" 71 + config.chat_bridge_enabled = enabled 72 + return config 73 + 74 + 75 + def _payload(event=EVENT_SOL_CHAT_REQUEST, request_id="req-1", **extra): 76 + payload = { 77 + "tract": "chat", 78 + "event": event, 79 + "request_id": request_id, 80 + "summary": "hello", 81 + "day": "20260509", 82 + "event_index": 7, 83 + } 84 + payload.update(extra) 85 + return payload 86 + 87 + 88 + async def _never_notify(req, server_url, key): 89 + await asyncio.Event().wait() 90 + 91 + 92 + async def _never_poll(server_url, key, state): 93 + await asyncio.Event().wait() 94 + 95 + 96 + def _terminal_worker(status): 97 + def worker(url, key, queue, loop, stop_event): 98 + loop.call_soon_threadsafe( 99 + queue.put_nowait, {"_terminal": True, "status": status} 100 + ) 101 + 102 + return worker 103 + 104 + 105 + def _transport_worker(url, key, queue, loop, thread_stop): 106 + loop.call_soon_threadsafe( 107 + queue.put_nowait, {"_transport_error": True, "error": "boom"} 108 + ) 109 + 110 + 111 + def test_sse_parser_data_only_frame(): 112 + parser = _SseParser() 113 + 114 + assert parser.feed_line("data: hello") is None 115 + assert parser.feed_line("") == {"event": None, "data": "hello", "id": None} 116 + 117 + 118 + def test_sse_parser_event_and_data_frame(): 119 + parser = _SseParser() 120 + 121 + parser.feed_line("event: message") 122 + parser.feed_line("id: 42") 123 + parser.feed_line("data: hello") 124 + 125 + assert parser.feed_line("") == {"event": "message", "data": "hello", "id": "42"} 126 + 127 + 128 + def test_sse_parser_multiline_data(): 129 + parser = _SseParser() 130 + 131 + parser.feed_line("data: hello") 132 + parser.feed_line("data: world") 133 + 134 + assert parser.feed_line("")["data"] == "hello\nworld" 135 + 136 + 137 + def test_sse_parser_ignores_comment(): 138 + parser = _SseParser() 139 + 140 + assert parser.feed_line(": heartbeat") is None 141 + parser.feed_line("data: after") 142 + 143 + assert parser.feed_line("")["data"] == "after" 144 + 145 + 146 + def test_sse_parser_partial_frame_without_terminator_returns_none(): 147 + parser = _SseParser() 148 + 149 + assert parser.feed_line("event: message") is None 150 + assert parser.feed_line("data: partial") is None 151 + 152 + 153 + @pytest.mark.asyncio 154 + async def test_dispatch_drops_non_chat_tract(): 155 + pending = OrderedDict() 156 + 157 + with patch("solstone_linux.chat_bridge._write_fifo") as write_fifo: 158 + await _dispatch_event( 159 + {"tract": "other", "event": EVENT_SOL_CHAT_REQUEST}, 160 + pending, 161 + True, 162 + False, 163 + _config(), 164 + ) 165 + 166 + write_fifo.assert_not_called() 167 + assert not pending 168 + 169 + 170 + @pytest.mark.asyncio 171 + async def test_dispatch_drops_unrecognized_chat_event(): 172 + pending = OrderedDict() 173 + 174 + with patch("solstone_linux.chat_bridge._write_fifo") as write_fifo: 175 + await _dispatch_event( 176 + {"tract": "chat", "event": "unknown", "request_id": "req-1"}, 177 + pending, 178 + True, 179 + False, 180 + _config(), 181 + ) 182 + 183 + write_fifo.assert_not_called() 184 + assert not pending 185 + 186 + 187 + @pytest.mark.asyncio 188 + async def test_dispatch_recognized_events(): 189 + pending = OrderedDict() 190 + 191 + with patch("solstone_linux.chat_bridge._write_fifo") as write_fifo: 192 + await _dispatch_event(_payload(), pending, False, False, _config()) 193 + await _dispatch_event( 194 + _payload(EVENT_SOL_CHAT_REQUEST_SUPERSEDED), 195 + pending, 196 + False, 197 + False, 198 + _config(), 199 + ) 200 + await _dispatch_event( 201 + _payload(EVENT_OWNER_CHAT_OPEN), pending, False, False, _config() 202 + ) 203 + await _dispatch_event( 204 + _payload(EVENT_OWNER_CHAT_DISMISSED), pending, False, False, _config() 205 + ) 206 + 207 + assert write_fifo.call_count == 4 208 + 209 + 210 + @pytest.mark.asyncio 211 + async def test_request_opt_in_off_writes_fifo_without_notify(): 212 + pending = OrderedDict() 213 + 214 + with patch("solstone_linux.chat_bridge._write_fifo") as write_fifo: 215 + with patch("solstone_linux.chat_bridge._handle_one_notification") as notify: 216 + await _dispatch_event(_payload(), pending, False, False, _config()) 217 + 218 + write_fifo.assert_called_once_with("sol-ping req-1 hello\n") 219 + notify.assert_not_called() 220 + assert not pending 221 + 222 + 223 + def test_request_fifo_absent_no_error(tmp_path: Path): 224 + _write_fifo("sol-ping req hello\n", tmp_path / "missing") 225 + 226 + 227 + @pytest.mark.asyncio 228 + async def test_request_opt_in_on_not_stale_fires_notify(): 229 + pending = OrderedDict() 230 + 231 + with patch("solstone_linux.chat_bridge._write_fifo"): 232 + with patch( 233 + "solstone_linux.chat_bridge._handle_one_notification", new=_never_notify 234 + ): 235 + await _dispatch_event(_payload(), pending, True, False, _config()) 236 + 237 + assert list(pending) == ["req-1"] 238 + assert pending["req-1"].notify_task is not None 239 + await chat_bridge._cancel_pending_notifications(pending) 240 + 241 + 242 + @pytest.mark.asyncio 243 + async def test_request_stale_skips_notify_but_writes_fifo(): 244 + pending = OrderedDict() 245 + 246 + with patch("solstone_linux.chat_bridge._write_fifo") as write_fifo: 247 + with patch("solstone_linux.chat_bridge._handle_one_notification") as notify: 248 + await _dispatch_event(_payload(), pending, True, True, _config()) 249 + 250 + write_fifo.assert_called_once() 251 + notify.assert_not_called() 252 + assert not pending 253 + 254 + 255 + async def _assert_clear_event_cancels(event): 256 + pending = OrderedDict() 257 + task = asyncio.create_task(asyncio.Event().wait()) 258 + pending["req-1"] = PendingRequest("req-1", "hello", "https://server.test", task) 259 + 260 + with patch("solstone_linux.chat_bridge._write_fifo") as write_fifo: 261 + await _dispatch_event(_payload(event), pending, True, False, _config()) 262 + 263 + write_fifo.assert_called_once_with("clear req-1\n") 264 + assert not pending 265 + result = await asyncio.gather(task, return_exceptions=True) 266 + assert isinstance(result[0], asyncio.CancelledError) 267 + 268 + 269 + @pytest.mark.asyncio 270 + async def test_superseded_removes_pending_writes_clear_and_cancels_task(): 271 + await _assert_clear_event_cancels(EVENT_SOL_CHAT_REQUEST_SUPERSEDED) 272 + 273 + 274 + @pytest.mark.asyncio 275 + async def test_owner_chat_open_removes_pending_writes_clear_and_cancels_task(): 276 + await _assert_clear_event_cancels(EVENT_OWNER_CHAT_OPEN) 277 + 278 + 279 + @pytest.mark.asyncio 280 + async def test_owner_chat_dismissed_removes_pending_writes_clear_and_cancels_task(): 281 + await _assert_clear_event_cancels(EVENT_OWNER_CHAT_DISMISSED) 282 + 283 + 284 + def test_fifo_present_with_reader_succeeds(tmp_path: Path): 285 + fifo = tmp_path / "notify" 286 + os.mkfifo(fifo) 287 + reader = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK) 288 + try: 289 + _write_fifo("line one\n", fifo) 290 + assert os.read(reader, 1024) == b"line one\n" 291 + finally: 292 + os.close(reader) 293 + 294 + 295 + def test_fifo_present_no_reader_enxio_swallowed(tmp_path: Path): 296 + fifo = tmp_path / "notify" 297 + os.mkfifo(fifo) 298 + 299 + _write_fifo("line one\n", fifo) 300 + 301 + 302 + def test_fifo_missing_noop(tmp_path: Path): 303 + _write_fifo("line one\n", tmp_path / "missing") 304 + 305 + 306 + def test_fifo_regular_file_noop(tmp_path: Path): 307 + regular = tmp_path / "notify" 308 + regular.write_text("") 309 + 310 + _write_fifo("line one\n", regular) 311 + 312 + assert regular.read_text() == "" 313 + 314 + 315 + def test_fifo_eagain_swallowed(tmp_path: Path): 316 + fifo = tmp_path / "notify" 317 + os.mkfifo(fifo) 318 + 319 + with patch( 320 + "solstone_linux.chat_bridge.os.open", 321 + side_effect=OSError(errno.EAGAIN, "try again"), 322 + ): 323 + _write_fifo("line one\n", fifo) 324 + 325 + 326 + def test_heartbeat_staleness_marks_stale_and_logs_once_after_60s(caplog): 327 + with patch("solstone_linux.chat_bridge.time.monotonic", return_value=1000): 328 + is_stale, stale_logged = _mark_stale_if_needed( 329 + 1000 - HEARTBEAT_STALE_SECONDS - 1, False, False 330 + ) 331 + assert is_stale 332 + assert stale_logged 333 + _mark_stale_if_needed(1000 - HEARTBEAT_STALE_SECONDS - 2, True, True) 334 + 335 + assert [r.message for r in caplog.records].count("Chat bridge heartbeat stale") == 1 336 + 337 + 338 + def test_heartbeat_new_frame_recovers_from_stale(caplog): 339 + with caplog.at_level(logging.INFO): 340 + is_stale, stale_logged = _mark_live_frame(True, True) 341 + 342 + assert not is_stale 343 + assert not stale_logged 344 + assert "Chat bridge heartbeat recovered" in [r.message for r in caplog.records] 345 + 346 + 347 + @pytest.mark.asyncio 348 + async def test_reconnect_transport_error_backoff_sequence(): 349 + stop_event = asyncio.Event() 350 + delays = [] 351 + 352 + async def fake_sleep(delay): 353 + delays.append(delay) 354 + if len(delays) >= 7: 355 + stop_event.set() 356 + 357 + with patch("solstone_linux.chat_bridge._sse_worker", new=_transport_worker): 358 + with patch("solstone_linux.chat_bridge._opt_in_poll_loop", new=_never_poll): 359 + with patch( 360 + "solstone_linux.chat_bridge.asyncio.sleep", side_effect=fake_sleep 361 + ): 362 + await run_chat_bridge(_config(), stop_event) 363 + 364 + assert delays == [1, 2, 4, 8, 16, 30, 30] 365 + 366 + 367 + @pytest.mark.asyncio 368 + async def test_reconnect_successful_frame_resets_backoff_index(): 369 + stop_event = asyncio.Event() 370 + attempts = 0 371 + delays = [] 372 + 373 + def worker(url, key, queue, loop, thread_stop): 374 + nonlocal attempts 375 + attempts += 1 376 + if attempts == 4: 377 + loop.call_soon_threadsafe(queue.put_nowait, {"_heartbeat": True}) 378 + loop.call_soon_threadsafe( 379 + queue.put_nowait, {"_transport_error": True, "error": "boom"} 380 + ) 381 + 382 + async def fake_sleep(delay): 383 + delays.append(delay) 384 + if len(delays) >= 4: 385 + stop_event.set() 386 + 387 + with patch("solstone_linux.chat_bridge._sse_worker", new=worker): 388 + with patch("solstone_linux.chat_bridge._opt_in_poll_loop", new=_never_poll): 389 + with patch( 390 + "solstone_linux.chat_bridge.asyncio.sleep", side_effect=fake_sleep 391 + ): 392 + await run_chat_bridge(_config(), stop_event) 393 + 394 + assert delays == [1, 2, 4, 1] 395 + 396 + 397 + @pytest.mark.asyncio 398 + async def test_terminal_401_exits_without_reconnect(caplog): 399 + with patch("solstone_linux.chat_bridge._sse_worker", new=_terminal_worker(401)): 400 + with patch("solstone_linux.chat_bridge._opt_in_poll_loop", new=_never_poll): 401 + with patch( 402 + "solstone_linux.chat_bridge.asyncio.sleep", new_callable=AsyncMock 403 + ) as sleep: 404 + await run_chat_bridge(_config(), asyncio.Event()) 405 + 406 + sleep.assert_not_called() 407 + assert "status 401" in caplog.text 408 + 409 + 410 + @pytest.mark.asyncio 411 + async def test_terminal_403_exits_without_reconnect(caplog): 412 + with patch("solstone_linux.chat_bridge._sse_worker", new=_terminal_worker(403)): 413 + with patch("solstone_linux.chat_bridge._opt_in_poll_loop", new=_never_poll): 414 + with patch( 415 + "solstone_linux.chat_bridge.asyncio.sleep", new_callable=AsyncMock 416 + ) as sleep: 417 + await run_chat_bridge(_config(), asyncio.Event()) 418 + 419 + sleep.assert_not_called() 420 + assert "status 403" in caplog.text 421 + 422 + 423 + @pytest.mark.asyncio 424 + async def test_click_post_reachable_posts_then_xdg_open(): 425 + proc = FakeProc(returncode=0) 426 + response = FakeResponse(status_code=200) 427 + 428 + with patch( 429 + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=proc 430 + ): 431 + with patch( 432 + "solstone_linux.chat_bridge.requests.post", return_value=response 433 + ) as post: 434 + with patch("solstone_linux.chat_bridge.subprocess.Popen") as popen: 435 + await _handle_one_notification( 436 + PendingRequest("req-1", "hello", "https://server.test/app/chat/x"), 437 + "https://server.test", 438 + "key-123", 439 + ) 440 + 441 + post.assert_called_once_with( 442 + "https://server.test/api/chat/sol_chat_request/open", 443 + json={"request_id": "req-1"}, 444 + headers={"Authorization": "Bearer key-123"}, 445 + timeout=10, 446 + ) 447 + popen.assert_called_once() 448 + 449 + 450 + @pytest.mark.asyncio 451 + async def test_click_post_unreachable_still_xdg_open(): 452 + proc = FakeProc(returncode=0) 453 + 454 + with patch( 455 + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=proc 456 + ): 457 + with patch( 458 + "solstone_linux.chat_bridge.requests.post", 459 + side_effect=chat_bridge.requests.RequestException("down"), 460 + ): 461 + with patch("solstone_linux.chat_bridge.subprocess.Popen") as popen: 462 + await _handle_one_notification( 463 + PendingRequest("req-1", "hello", "https://server.test/app/chat/x"), 464 + "https://server.test", 465 + "key-123", 466 + ) 467 + 468 + popen.assert_called_once() 469 + 470 + 471 + @pytest.mark.asyncio 472 + async def test_click_notify_nonzero_does_not_xdg_open(): 473 + proc = FakeProc(returncode=1) 474 + 475 + with patch( 476 + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=proc 477 + ): 478 + with patch("solstone_linux.chat_bridge.requests.post") as post: 479 + with patch("solstone_linux.chat_bridge.subprocess.Popen") as popen: 480 + await _handle_one_notification( 481 + PendingRequest("req-1", "hello", "https://server.test/app/chat/x"), 482 + "https://server.test", 483 + "key-123", 484 + ) 485 + 486 + post.assert_not_called() 487 + popen.assert_not_called() 488 + 489 + 490 + def test_chat_url_with_day_and_event_index(): 491 + assert ( 492 + _chat_url("https://server.test/", "20260509", 7) 493 + == "https://server.test/app/chat/20260509#event-7" 494 + ) 495 + 496 + 497 + def test_chat_url_missing_day_or_event_index_uses_today(): 498 + with patch("solstone_linux.chat_bridge.datetime") as mock_datetime: 499 + mock_datetime.now.return_value.strftime.return_value = "20260509" 500 + assert _chat_url("https://server.test/", None, None) == ( 501 + "https://server.test/app/chat/20260509" 502 + ) 503 + 504 + 505 + @pytest.mark.asyncio 506 + async def test_bridge_crash_isolation_logs_and_returns(caplog): 507 + def worker(url, key, queue, loop, thread_stop): 508 + loop.call_soon_threadsafe( 509 + queue.put_nowait, 510 + { 511 + "data": ( 512 + '{"tract": "chat", "event": "sol_chat_request", ' 513 + '"request_id": "req-1"}' 514 + ) 515 + }, 516 + ) 517 + 518 + with caplog.at_level(logging.ERROR): 519 + with patch("solstone_linux.chat_bridge._sse_worker", new=worker): 520 + with patch("solstone_linux.chat_bridge._opt_in_poll_loop", new=_never_poll): 521 + with patch( 522 + "solstone_linux.chat_bridge._dispatch_event", 523 + new_callable=AsyncMock, 524 + side_effect=RuntimeError("boom"), 525 + ): 526 + await run_chat_bridge(_config(), asyncio.Event()) 527 + 528 + error_records = [r for r in caplog.records if r.levelno == logging.ERROR] 529 + assert any("Chat bridge crashed" in r.message for r in error_records) 530 + assert any(r.exc_info for r in error_records) 531 + 532 + 533 + @pytest.mark.asyncio 534 + async def test_chat_bridge_enabled_false_no_sse_attempt(): 535 + with patch("solstone_linux.chat_bridge.requests.get") as get: 536 + await run_chat_bridge(_config(enabled=False), asyncio.Event()) 537 + 538 + get.assert_not_called() 539 + 540 + 541 + def test_observer_bridge_task_none_when_disabled(): 542 + import inspect 543 + 544 + from solstone_linux.observer import Observer 545 + 546 + source = inspect.getsource(Observer.main_loop) 547 + assert "if self.config.chat_bridge_enabled:" in source 548 + assert "bridge_task = None" in source 549 + 550 + 551 + @pytest.mark.asyncio 552 + async def test_pending_cap_33rd_entry_evicts_oldest_and_cancels_task(caplog): 553 + pending = OrderedDict() 554 + tasks = [] 555 + 556 + with caplog.at_level(logging.DEBUG): 557 + with patch("solstone_linux.chat_bridge._write_fifo"): 558 + with patch( 559 + "solstone_linux.chat_bridge._handle_one_notification", new=_never_notify 560 + ): 561 + for i in range(PENDING_CAP + 1): 562 + await _dispatch_event( 563 + _payload(request_id=f"req-{i}"), 564 + pending, 565 + True, 566 + False, 567 + _config(), 568 + ) 569 + if i < PENDING_CAP: 570 + tasks.append(pending[f"req-{i}"].notify_task) 571 + 572 + assert "req-0" not in pending 573 + assert len(pending) == PENDING_CAP 574 + assert "Evicted pending chat request: req-0" in caplog.text 575 + result = await asyncio.gather(tasks[0], return_exceptions=True) 576 + assert isinstance(result[0], asyncio.CancelledError) 577 + await chat_bridge._cancel_pending_notifications(pending) 578 + 579 + 580 + def test_constants_forbidden_literals_appear_once_in_src_only_in_chat_bridge_module_level(): 581 + src_dir = Path(__file__).resolve().parents[1] / "src" / "solstone_linux" 582 + files = list(src_dir.glob("*.py")) 583 + event_literals = [ 584 + '"sol_chat_request"', 585 + '"sol_chat_request_superseded"', 586 + '"owner_chat_open"', 587 + '"owner_chat_dismissed"', 588 + ] 589 + 590 + for literal in event_literals: 591 + hits = [] 592 + for path in files: 593 + for lineno, line in enumerate(path.read_text().splitlines(), 1): 594 + if literal in line: 595 + hits.append((path.name, lineno, line.strip())) 596 + assert len(hits) == 1 597 + assert hits[0][0] == "chat_bridge.py" 598 + 599 + text = (src_dir / "chat_bridge.py").read_text() 600 + assert text.count('NOTIFY_TITLE = "sol"') == 1 601 + assert text.count('SURFACE = "linux"') == 1