personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix observer data loss on upload failure, discover convey port dynamically

Observer was deleting captured screencast/audio data (cleanup_draft) even
when upload to convey failed. On upload failure, draft directories are now
renamed to final segment format so the dream pipeline processes them locally.

Replaced hardcoded DEFAULT_URL (stale port 5173) with read_service_port()
discovery — observer now finds convey's actual port from health/convey.port.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+94 -11
+38 -3
apps/remote/tests/test_observer_client.py
··· 22 22 23 23 @pytest.fixture 24 24 def mock_config(): 25 - with patch("observe.remote_client.get_config") as mock: 25 + with patch("observe.remote_client.get_config") as mock, patch( 26 + "observe.remote_client.read_service_port" 27 + ) as mock_port: 26 28 mock.return_value = {} 29 + mock_port.return_value = 8000 27 30 yield mock 28 31 29 32 ··· 35 38 36 39 37 40 def test_observer_client_init(mock_session, mock_config): 38 - from observe.remote_client import DEFAULT_URL, ObserverClient 41 + from observe.remote_client import ObserverClient 39 42 40 43 client = ObserverClient("main-stream") 41 44 42 - assert client._url == DEFAULT_URL 45 + assert client._url == "http://localhost:8000" 43 46 assert client._key is None 44 47 assert client._name == "main-stream" 45 48 assert client._stream == "main-stream" 46 49 assert client._auto_register is True 50 + 51 + 52 + def test_observer_client_init_no_port(mock_session): 53 + """When no config URL and no convey.port file, _url is empty.""" 54 + from observe.remote_client import ObserverClient 55 + 56 + with patch("observe.remote_client.get_config") as cfg, patch( 57 + "observe.remote_client.read_service_port" 58 + ) as port: 59 + cfg.return_value = {} 60 + port.return_value = None 61 + client = ObserverClient("main-stream") 62 + 63 + assert client._url == "" 47 64 48 65 49 66 def test_observer_client_init_with_config(mock_session, mock_config): ··· 323 340 cleanup_draft(str(draft_dir)) 324 341 325 342 assert not draft_dir.exists() 343 + 344 + 345 + def test_finalize_draft(tmp_path): 346 + from observe.remote_client import finalize_draft 347 + 348 + draft_dir = tmp_path / "091551_draft" 349 + draft_dir.mkdir() 350 + (draft_dir / "screen.webm").write_text("video") 351 + (draft_dir / "audio.flac").write_text("audio") 352 + 353 + result = finalize_draft(str(draft_dir), "091551_300") 354 + 355 + assert result == str(tmp_path / "091551_300") 356 + assert not draft_dir.exists() 357 + final = tmp_path / "091551_300" 358 + assert final.exists() 359 + assert (final / "screen.webm").read_text() == "video" 360 + assert (final / "audio.flac").read_text() == "audio" 326 361 327 362 328 363 def test_upload_duplicate_response(mock_session, mock_config, tmp_path):
+13 -3
observe/linux/observer.py
··· 39 39 from observe.hear import AudioRecorder 40 40 from observe.linux.audio import is_sink_muted 41 41 from observe.linux.screencast import Screencaster, StreamInfo 42 - from observe.remote_client import ObserverClient, cleanup_draft 42 + from observe.remote_client import ObserverClient, cleanup_draft, finalize_draft 43 43 from observe.utils import create_draft_folder, get_timestamp_parts 44 44 from think.streams import stream_name 45 45 from think.utils import setup_cli ··· 296 296 for f in os.listdir(self.draft_dir) 297 297 if (draft_path / f).is_file() 298 298 ] 299 + uploaded = False 299 300 if draft_files and self._client: 300 301 meta = {"host": HOST, "platform": PLATFORM, "stream": self.stream} 301 302 result = self._client.upload_segment( ··· 305 306 logger.info( 306 307 f"Segment uploaded: {segment_key} ({len(draft_files)} files)" 307 308 ) 309 + uploaded = True 308 310 else: 309 311 logger.error(f"Segment upload failed: {segment_key}") 310 - cleanup_draft(self.draft_dir) 312 + if uploaded: 313 + cleanup_draft(self.draft_dir) 314 + else: 315 + finalize_draft(self.draft_dir, segment_key) 311 316 elif self.draft_dir and not files: 312 317 cleanup_draft(self.draft_dir) 313 318 ··· 573 578 for f in os.listdir(self.draft_dir) 574 579 if (draft_path / f).is_file() 575 580 ] 581 + uploaded = False 576 582 if draft_files and self._client: 577 583 meta = {"host": HOST, "platform": PLATFORM, "stream": self.stream} 578 584 result = self._client.upload_segment( ··· 582 588 logger.info( 583 589 f"Final segment uploaded: {segment_key} ({len(draft_files)} files)" 584 590 ) 591 + uploaded = True 585 592 else: 586 593 logger.error(f"Final segment upload failed: {segment_key}") 587 - cleanup_draft(self.draft_dir) 594 + if uploaded: 595 + cleanup_draft(self.draft_dir) 596 + else: 597 + finalize_draft(self.draft_dir, segment_key) 588 598 elif self.draft_dir: 589 599 cleanup_draft(self.draft_dir) 590 600
+36 -3
observe/remote_client.py
··· 14 14 15 15 import requests 16 16 17 - from think.utils import get_config, get_journal 17 + from think.utils import get_config, get_journal, read_service_port 18 18 19 19 logger = logging.getLogger(__name__) 20 20 HOST = socket.gethostname() 21 21 PLATFORM = platform.system().lower() 22 - DEFAULT_URL = "http://localhost:5173" 23 22 RETRY_BACKOFF = [1, 5, 15] 24 23 MAX_RETRIES = 3 25 24 UPLOAD_TIMEOUT = 300 ··· 43 42 pass 44 43 45 44 45 + def finalize_draft(draft_dir: str, segment_key: str) -> str | None: 46 + """Rename a draft directory to its final segment name. 47 + 48 + Preserves captured data locally when remote upload fails, so the 49 + dream pipeline can process it later. 50 + 51 + Args: 52 + draft_dir: Path to the draft directory (e.g. .../HHMMSS_draft/) 53 + segment_key: Final segment name (e.g. "091551_300") 54 + 55 + Returns: 56 + Path to the finalized directory, or None on failure. 57 + """ 58 + final_dir = os.path.join(os.path.dirname(draft_dir), segment_key) 59 + try: 60 + os.rename(draft_dir, final_dir) 61 + logger.info(f"Finalized draft locally: {final_dir}") 62 + return final_dir 63 + except OSError as e: 64 + logger.error(f"Failed to finalize draft {draft_dir} -> {final_dir}: {e}") 65 + return None 66 + 67 + 46 68 class ObserverClient: 47 69 """HTTP client for uploading observer segments to the remote ingest server.""" 48 70 ··· 54 76 ): 55 77 config = get_config() 56 78 remote_cfg = config.get("observe", {}).get("remote", {}) 57 - self._url = remote_cfg.get("url", DEFAULT_URL).rstrip("/") 79 + self._url = remote_cfg.get("url", "").rstrip("/") 80 + if not self._url: 81 + # Discover local convey port from health directory 82 + port = read_service_port("convey") 83 + if port: 84 + self._url = f"http://localhost:{port}" 85 + logger.info(f"Discovered convey at port {port}") 86 + else: 87 + logger.warning("No convey port found in health directory") 88 + self._url = "" 58 89 self._key = remote_cfg.get("key") 59 90 self._auto_register = remote_cfg.get("auto_register", True) 60 91 self._name = remote_cfg.get("name") or stream ··· 90 121 91 122 def _ensure_registered(self) -> None: 92 123 if self._key: 124 + return 125 + if not self._url: 93 126 return 94 127 if not self._auto_register: 95 128 logger.error(
+7 -2
observe/tmux/observer.py
··· 23 23 import time 24 24 from pathlib import Path 25 25 26 - from observe.remote_client import ObserverClient, cleanup_draft 26 + from observe.remote_client import ObserverClient, cleanup_draft, finalize_draft 27 27 from observe.tmux.capture import TmuxCapture, write_captures_jsonl 28 28 from observe.utils import create_draft_folder, get_timestamp_parts 29 29 from think.streams import stream_name ··· 150 150 for f in os.listdir(self.draft_dir) 151 151 if (draft_path / f).is_file() 152 152 ] 153 + uploaded = False 153 154 if draft_files and self._client: 154 155 meta = {"host": HOST, "platform": PLATFORM, "stream": self.stream} 155 156 result = self._client.upload_segment( ··· 159 160 logger.info( 160 161 f"Segment uploaded: {segment_key} ({len(draft_files)} files)" 161 162 ) 163 + uploaded = True 162 164 else: 163 165 logger.error(f"Segment upload failed: {segment_key}") 164 - cleanup_draft(self.draft_dir) 166 + if uploaded: 167 + cleanup_draft(self.draft_dir) 168 + else: 169 + finalize_draft(self.draft_dir, segment_key) 165 170 166 171 self._reset_capture_state() 167 172 return tmux_files