personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove obsolete sol sync command

Sync is being replaced by `sol transfer send` (separate work). This removes
the sync module, its tests, all CLI wiring, and the supervisor integration.
The `--remote` flag and remote-mode infrastructure are preserved for reuse.

Deleted:
- observe/sync.py (SyncService, RemoteClient, check_remote_health)
- tests/test_sync.py
- apps/observer/tests/test_client.py

Edited:
- sol.py: removed sync from COMMANDS and GROUPS
- think/supervisor.py: removed import, start_sync(), and remote-mode sync launch
- tests/test_supervisor.py: removed test_start_sync
- docs/SOLCLI.md: removed sync from command table

+4 -2075
-188
apps/observer/tests/test_client.py
··· 1 - # SPDX-License-Identifier: AGPL-3.0-only 2 - # Copyright (c) 2026 sol pbc 3 - 4 - """Tests for RemoteClient with mocked HTTP calls.""" 5 - 6 - from __future__ import annotations 7 - 8 - from unittest.mock import MagicMock, patch 9 - 10 - import pytest 11 - 12 - 13 - @pytest.fixture 14 - def mock_session(): 15 - """Create a mock requests session.""" 16 - with patch("observe.sync.requests.Session") as mock: 17 - session = MagicMock() 18 - mock.return_value = session 19 - yield session 20 - 21 - 22 - def test_remote_client_init(): 23 - """Test RemoteClient initialization.""" 24 - from observe.sync import RemoteClient 25 - 26 - client = RemoteClient("https://server:5000/app/observer/ingest/abc123") 27 - 28 - assert client.remote_url == "https://server:5000/app/observer/ingest/abc123" 29 - 30 - 31 - def test_upload_segment_success(mock_session, tmp_path): 32 - """Test successful file upload.""" 33 - from observe.sync import RemoteClient 34 - 35 - # Create test files 36 - file1 = tmp_path / "audio.flac" 37 - file1.write_bytes(b"audio data") 38 - file2 = tmp_path / "video.webm" 39 - file2.write_bytes(b"video data") 40 - 41 - # Mock successful response 42 - mock_response = MagicMock() 43 - mock_response.status_code = 200 44 - mock_response.json.return_value = { 45 - "files": ["audio.flac", "video.webm"], 46 - "bytes": 20, 47 - } 48 - mock_session.post.return_value = mock_response 49 - 50 - client = RemoteClient("https://server/ingest/key") 51 - result = client.upload_segment("20250103", "120000_300", [file1, file2]) 52 - 53 - assert result.success is True 54 - assert result.duplicate is False 55 - mock_session.post.assert_called_once() 56 - 57 - # Check the call arguments 58 - call_args = mock_session.post.call_args 59 - assert call_args[0][0] == "https://server/ingest/key" 60 - # Verify required fields (host/platform are also sent but vary by machine) 61 - data = call_args[1]["data"] 62 - assert data["day"] == "20250103" 63 - assert data["segment"] == "120000_300" 64 - assert "host" in data 65 - assert "platform" in data 66 - 67 - 68 - def test_upload_segment_retry_on_failure(mock_session, tmp_path): 69 - """Test that upload retries on failure.""" 70 - from observe.sync import RemoteClient 71 - 72 - # Create test file 73 - file1 = tmp_path / "audio.flac" 74 - file1.write_bytes(b"audio data") 75 - 76 - # Mock failure then success 77 - mock_failure = MagicMock() 78 - mock_failure.status_code = 500 79 - mock_failure.text = "Server error" 80 - 81 - mock_success = MagicMock() 82 - mock_success.status_code = 200 83 - mock_success.json.return_value = {"files": ["audio.flac"], "bytes": 10} 84 - 85 - mock_session.post.side_effect = [mock_failure, mock_success] 86 - 87 - # Patch sleep to avoid delays 88 - with patch("observe.sync.time.sleep"): 89 - client = RemoteClient("https://server/ingest/key") 90 - result = client.upload_segment("20250103", "120000_300", [file1]) 91 - 92 - assert result.success is True 93 - assert mock_session.post.call_count == 2 94 - 95 - 96 - def test_upload_segment_all_retries_fail(mock_session, tmp_path): 97 - """Test that upload returns False after all retries fail.""" 98 - from observe.sync import RETRY_BACKOFF, RemoteClient 99 - 100 - # Create test file 101 - file1 = tmp_path / "audio.flac" 102 - file1.write_bytes(b"audio data") 103 - 104 - # Mock all failures 105 - mock_failure = MagicMock() 106 - mock_failure.status_code = 500 107 - mock_failure.text = "Server error" 108 - mock_session.post.return_value = mock_failure 109 - 110 - # Patch sleep to avoid delays 111 - with patch("observe.sync.time.sleep"): 112 - client = RemoteClient("https://server/ingest/key") 113 - result = client.upload_segment("20250103", "120000_300", [file1]) 114 - 115 - assert result.success is False 116 - assert mock_session.post.call_count == len(RETRY_BACKOFF) 117 - 118 - 119 - def test_upload_segment_skips_missing_files(mock_session, tmp_path): 120 - """Test that upload skips missing files.""" 121 - from observe.sync import RemoteClient 122 - 123 - # Create one existing file 124 - file1 = tmp_path / "exists.flac" 125 - file1.write_bytes(b"data") 126 - 127 - # Reference a missing file 128 - file2 = tmp_path / "missing.flac" 129 - 130 - # Mock successful response 131 - mock_response = MagicMock() 132 - mock_response.status_code = 200 133 - mock_response.json.return_value = {"files": ["exists.flac"], "bytes": 4} 134 - mock_session.post.return_value = mock_response 135 - 136 - client = RemoteClient("https://server/ingest/key") 137 - result = client.upload_segment("20250103", "120000_300", [file1, file2]) 138 - 139 - assert result.success is True 140 - 141 - 142 - def test_upload_segment_fails_if_all_missing(mock_session, tmp_path): 143 - """Test that upload fails if all files are missing.""" 144 - from observe.sync import RemoteClient 145 - 146 - # Reference missing files 147 - file1 = tmp_path / "missing1.flac" 148 - file2 = tmp_path / "missing2.flac" 149 - 150 - client = RemoteClient("https://server/ingest/key") 151 - result = client.upload_segment("20250103", "120000_300", [file1, file2]) 152 - 153 - assert result.success is False 154 - mock_session.post.assert_not_called() 155 - 156 - 157 - def test_upload_segment_empty_list(mock_session): 158 - """Test that upload fails with empty file list.""" 159 - from observe.sync import RemoteClient 160 - 161 - client = RemoteClient("https://server/ingest/key") 162 - result = client.upload_segment("20250103", "120000_300", []) 163 - 164 - assert result.success is False 165 - mock_session.post.assert_not_called() 166 - 167 - 168 - def test_upload_segment_duplicate_response(mock_session, tmp_path): 169 - """Test that duplicate server response is detected.""" 170 - from observe.sync import RemoteClient 171 - 172 - file1 = tmp_path / "audio.flac" 173 - file1.write_bytes(b"audio data") 174 - 175 - mock_response = MagicMock() 176 - mock_response.status_code = 200 177 - mock_response.json.return_value = { 178 - "status": "duplicate", 179 - "existing_segment": "120000_300", 180 - "message": "All files already received", 181 - } 182 - mock_session.post.return_value = mock_response 183 - 184 - client = RemoteClient("https://server/ingest/key") 185 - result = client.upload_segment("20250103", "120000_300", [file1]) 186 - 187 - assert result.success is True 188 - assert result.duplicate is True
+1 -1
docs/SOLCLI.md
··· 295 295 |-------|----------| 296 296 | Think (processing) | `import`, `dream`, `planner`, `indexer`, `supervisor`, `schedule`, `top`, `health`, `callosum`, `notify`, `heartbeat` | 297 297 | Service | `service` (+ aliases `up`, `down`, `start`) | 298 - | Observe (capture) | `transcribe`, `describe`, `sense`, `sync`, `transfer`, `observer` | 298 + | Observe (capture) | `transcribe`, `describe`, `sense`, `transfer`, `observer` | 299 299 | Talent (AI agents) | `agents`, `cortex`, `talent`, `call`, `engage` | 300 300 | Convey (web UI) | `convey`, `restart-convey`, `screenshot`, `maint` | 301 301 | Specialized | `config`, `streams`, `journal-stats`, `formatter`, `detect-created` |
-792
observe/sync.py
··· 1 - # SPDX-License-Identifier: AGPL-3.0-only 2 - # Copyright (c) 2026 sol pbc 3 - 4 - """Sync service for remote observer uploads. 5 - 6 - Listens for observe.observing events and uploads segments to a remote server. 7 - Processes one segment at a time: upload, confirm via sha256 match, cleanup. 8 - 9 - State is persisted in YYYYMMDD/health/sync.jsonl files for crash recovery. 10 - """ 11 - 12 - from __future__ import annotations 13 - 14 - import argparse 15 - import json 16 - import logging 17 - import platform 18 - import queue 19 - import signal 20 - import socket 21 - import threading 22 - import time 23 - from dataclasses import dataclass 24 - from datetime import datetime 25 - from pathlib import Path 26 - from typing import Any, NamedTuple 27 - from urllib.parse import urlparse 28 - 29 - import requests 30 - 31 - from think.callosum import CallosumConnection 32 - from think.utils import day_path, get_rev, now_ms, setup_cli 33 - 34 - from .utils import compute_file_sha256 35 - 36 - logger = logging.getLogger(__name__) 37 - 38 - # Host identification 39 - HOST = socket.gethostname() 40 - PLATFORM = platform.system().lower() 41 - 42 - # Retry configuration for RemoteClient 43 - MAX_RETRIES = 3 44 - RETRY_BACKOFF = [1, 5, 15] # seconds 45 - UPLOAD_TIMEOUT = 300 # 5 minutes for large files 46 - HEALTH_CHECK_TIMEOUT = 10 # seconds for startup health check 47 - 48 - 49 - class UploadResult(NamedTuple): 50 - """Result of an upload_segment() call.""" 51 - 52 - success: bool 53 - duplicate: bool = False 54 - 55 - 56 - def check_remote_health( 57 - remote_url: str, timeout: float = HEALTH_CHECK_TIMEOUT 58 - ) -> tuple[bool, str]: 59 - """Check if remote server is reachable and key is valid. 60 - 61 - Hits the segments endpoint to verify connectivity and authentication. 62 - Intended to be called at startup before launching sync service. 63 - 64 - Args: 65 - remote_url: Full URL to remote ingest endpoint (including key) 66 - e.g., "https://server:5000/app/observer/ingest/abc123..." 67 - timeout: Request timeout in seconds (default: 10) 68 - 69 - Returns: 70 - Tuple of (success, message): 71 - - (True, "Connected to host:port (key: prefix)") on success 72 - - (False, "error description") on failure 73 - """ 74 - # Parse URL for readable logging 75 - remote_url = remote_url.rstrip("/") 76 - try: 77 - parsed = urlparse(remote_url) 78 - host = parsed.netloc or parsed.hostname or "unknown" 79 - # Extract key from path: /app/observer/ingest/KEY -> KEY[:8] 80 - path_parts = parsed.path.split("/") 81 - key_prefix = path_parts[-1][:8] if path_parts else "unknown" 82 - except Exception: 83 - host = "unknown" 84 - key_prefix = "unknown" 85 - 86 - # Build segments endpoint URL with today's date 87 - today = datetime.now().strftime("%Y%m%d") 88 - segments_url = f"{remote_url}/segments/{today}" 89 - 90 - try: 91 - response = requests.get(segments_url, timeout=timeout) 92 - 93 - if response.status_code == 200: 94 - return (True, f"Connected to {host} (key: {key_prefix})") 95 - elif response.status_code == 401: 96 - return (False, "Invalid key (401) - check remote URL") 97 - elif response.status_code == 403: 98 - try: 99 - error = response.json().get("error", "forbidden") 100 - except Exception: 101 - error = "forbidden" 102 - return (False, f"Key rejected (403): {error}") 103 - else: 104 - return (False, f"Unexpected response ({response.status_code}) from {host}") 105 - 106 - except requests.exceptions.Timeout: 107 - return (False, f"Connection timeout after {timeout}s to {host}") 108 - except requests.exceptions.ConnectionError as e: 109 - # Extract cleaner error message 110 - err_str = str(e) 111 - if "Connection refused" in err_str: 112 - return (False, f"Connection refused: {host}") 113 - elif "Name or service not known" in err_str or "getaddrinfo failed" in err_str: 114 - return (False, f"Host not found: {host}") 115 - else: 116 - return (False, f"Connection error: {host}") 117 - except Exception as e: 118 - return (False, f"Health check failed: {e}") 119 - 120 - 121 - class RemoteClient: 122 - """Client for uploading segment files to a remote server.""" 123 - 124 - def __init__(self, remote_url: str): 125 - """Initialize remote client. 126 - 127 - Args: 128 - remote_url: Full URL to remote ingest endpoint (including key) 129 - e.g., "https://server:5000/app/observer/ingest/abc123..." 130 - """ 131 - self.remote_url = remote_url.rstrip("/") 132 - self.session = requests.Session() 133 - 134 - def upload_segment( 135 - self, 136 - day: str, 137 - segment: str, 138 - files: list[Path], 139 - meta: dict | None = None, 140 - ) -> UploadResult: 141 - """Upload segment files to remote server. 142 - 143 - Args: 144 - day: Day string (YYYYMMDD) 145 - segment: Segment key (HHMMSS_LEN) 146 - files: List of file paths to upload 147 - meta: Optional metadata dict (host, platform, facet, setting, etc.) 148 - to include in the segment. Will be JSON-encoded. If meta 149 - doesn't contain host/platform, they're sent as top-level 150 - fields and the server merges them into meta. 151 - 152 - Returns: 153 - UploadResult with success=True/False and duplicate=True if server 154 - reported duplicate 155 - """ 156 - if not files: 157 - logger.warning("No files to upload") 158 - return UploadResult(False) 159 - 160 - for attempt, delay in enumerate(RETRY_BACKOFF): 161 - # Open file handles and ensure they're closed 162 - file_handles = [] 163 - files_data = [] 164 - try: 165 - # Build files list for requests 166 - for path in files: 167 - if not path.exists(): 168 - logger.warning(f"File not found, skipping: {path}") 169 - continue 170 - fh = open(path, "rb") 171 - file_handles.append(fh) 172 - files_data.append( 173 - ("files", (path.name, fh, "application/octet-stream")) 174 - ) 175 - 176 - if not files_data: 177 - logger.error("No valid files to upload") 178 - return UploadResult(False) 179 - 180 - # Build request data 181 - data: dict[str, Any] = { 182 - "day": day, 183 - "segment": segment, 184 - } 185 - # Only send top-level host/platform if not already in meta 186 - # (avoids redundant data; server merges them into meta if missing) 187 - if not meta or "host" not in meta: 188 - data["host"] = HOST 189 - if not meta or "platform" not in meta: 190 - data["platform"] = PLATFORM 191 - if meta: 192 - data["meta"] = json.dumps(meta) 193 - 194 - response = self.session.post( 195 - self.remote_url, 196 - data=data, 197 - files=files_data, 198 - timeout=UPLOAD_TIMEOUT, 199 - ) 200 - 201 - if response.status_code == 200: 202 - resp_data = response.json() 203 - is_duplicate = resp_data.get("status") == "duplicate" 204 - if is_duplicate: 205 - logger.info(f"Server reported duplicate for {day}/{segment}") 206 - else: 207 - logger.info( 208 - f"Uploaded {len(resp_data.get('files', []))} files " 209 - f"({resp_data.get('bytes', 0)} bytes) for {day}/{segment}" 210 - ) 211 - return UploadResult(True, duplicate=is_duplicate) 212 - 213 - logger.warning(f"Upload failed: {response.status_code} {response.text}") 214 - 215 - except requests.RequestException as e: 216 - logger.warning(f"Upload attempt {attempt + 1} failed: {e}") 217 - 218 - finally: 219 - # Always close file handles 220 - for fh in file_handles: 221 - try: 222 - fh.close() 223 - except Exception: 224 - pass 225 - 226 - if attempt < len(RETRY_BACKOFF) - 1: 227 - logger.info(f"Retrying upload in {delay}s...") 228 - time.sleep(delay) 229 - 230 - logger.error(f"Upload failed after {MAX_RETRIES} attempts: {day}/{segment}") 231 - return UploadResult(False) 232 - 233 - 234 - # Confirmation polling configuration 235 - CONFIRM_POLL_INTERVAL = 5 # seconds between confirmation checks 236 - CONFIRM_MAX_ATTEMPTS = 12 # 5s * 12 = 60s max wait before retry 237 - 238 - 239 - @dataclass 240 - class SegmentInfo: 241 - """Info about a segment to sync.""" 242 - 243 - day: str 244 - segment: str 245 - files: list[dict] # [{name, sha256}, ...] 246 - meta: dict | None = None # Optional metadata (host, platform, facet, etc.) 247 - 248 - 249 - def get_sync_state_path(day: str) -> Path: 250 - """Get path to sync state file for a day.""" 251 - health_dir = day_path(day) / "health" 252 - health_dir.mkdir(parents=True, exist_ok=True) 253 - return health_dir / "sync.jsonl" 254 - 255 - 256 - def append_sync_record(day: str, record: dict) -> None: 257 - """Append a record to the sync state file.""" 258 - state_path = get_sync_state_path(day) 259 - with open(state_path, "a", encoding="utf-8") as f: 260 - f.write(json.dumps(record, ensure_ascii=False) + "\n") 261 - 262 - 263 - def load_sync_state(day: str) -> list[dict]: 264 - """Load sync state records for a day.""" 265 - state_path = get_sync_state_path(day) 266 - if not state_path.exists(): 267 - return [] 268 - 269 - records = [] 270 - try: 271 - with open(state_path, encoding="utf-8") as f: 272 - for line in f: 273 - line = line.strip() 274 - if line: 275 - records.append(json.loads(line)) 276 - except (json.JSONDecodeError, OSError) as e: 277 - logger.warning(f"Failed to load sync state {state_path}: {e}") 278 - return records 279 - 280 - 281 - def get_pending_segments(days_back: int = 7) -> list[SegmentInfo]: 282 - """Scan journal for pending segments that need sync. 283 - 284 - Finds segments with 'pending' status but no 'confirmed' status. 285 - 286 - Args: 287 - days_back: Number of days to scan back 288 - 289 - Returns: 290 - List of SegmentInfo for pending segments 291 - """ 292 - from datetime import datetime, timedelta 293 - 294 - pending = [] 295 - today = datetime.now() 296 - 297 - for i in range(days_back): 298 - day_date = today - timedelta(days=i) 299 - day = day_date.strftime("%Y%m%d") 300 - 301 - # Use day_path() for consistency with load_sync_state 302 - day_dir = day_path(day) 303 - if not day_dir.exists(): 304 - continue 305 - 306 - # Load state and find pending without confirmed 307 - records = load_sync_state(day) 308 - 309 - # Track status per segment 310 - segment_status: dict[str, dict] = {} 311 - for record in records: 312 - seg = record.get("segment", "") 313 - status = record.get("status", "") 314 - 315 - if seg not in segment_status: 316 - segment_status[seg] = {"pending": None, "confirmed": False} 317 - 318 - if status == "pending": 319 - segment_status[seg]["pending"] = record 320 - elif status == "confirmed": 321 - segment_status[seg]["confirmed"] = True 322 - 323 - # Collect pending segments 324 - for seg, info in segment_status.items(): 325 - if info["pending"] and not info["confirmed"]: 326 - pending_record = info["pending"] 327 - pending.append( 328 - SegmentInfo( 329 - day=day, 330 - segment=seg, 331 - files=pending_record.get("files", []), 332 - meta=pending_record.get("meta"), 333 - ) 334 - ) 335 - 336 - return pending 337 - 338 - 339 - class SyncService: 340 - """Service for syncing segments to a remote server.""" 341 - 342 - def __init__(self, remote_url: str, days_back: int = 7): 343 - """Initialize sync service. 344 - 345 - Args: 346 - remote_url: Full URL to remote ingest endpoint (including key) 347 - days_back: Number of days to scan back on startup 348 - """ 349 - self.remote_url = remote_url 350 - self.days_back = days_back 351 - 352 - self._client = RemoteClient(remote_url) 353 - self._callosum: CallosumConnection | None = None 354 - 355 - # Segment queue 356 - self._queue: queue.Queue[SegmentInfo] = queue.Queue() 357 - 358 - # Worker thread 359 - self._worker_thread: threading.Thread | None = None 360 - self._stop_event = threading.Event() 361 - self._draining = False 362 - 363 - # Status tracking 364 - self._current_segment: SegmentInfo | None = None 365 - self._current_state: str | None = None # "uploading" | "confirming" 366 - self._confirm_attempt = 0 367 - self._last_confirmed: str | None = None 368 - self._last_status_emit = 0.0 369 - self._lock = threading.Lock() 370 - 371 - def start(self) -> None: 372 - """Start the sync service.""" 373 - # Start Callosum connection 374 - self._callosum = CallosumConnection(defaults={"rev": get_rev()}) 375 - self._callosum.start(callback=self._handle_message) 376 - 377 - # Scan for pending segments 378 - pending = get_pending_segments(self.days_back) 379 - if pending: 380 - logger.info(f"Found {len(pending)} pending segment(s) from previous run") 381 - for seg_info in pending: 382 - self._queue.put(seg_info) 383 - 384 - # Start worker thread 385 - self._stop_event.clear() 386 - self._worker_thread = threading.Thread(target=self._sync_worker, daemon=False) 387 - self._worker_thread.start() 388 - 389 - logger.info(f"Sync service started: {self.remote_url[:50]}...") 390 - 391 - def stop(self) -> None: 392 - """Stop the sync service, draining queued segments first.""" 393 - # Disconnect callosum first so no new events arrive 394 - if self._callosum: 395 - self._callosum.stop() 396 - self._callosum = None 397 - 398 - # Enter drain mode before setting stop_event 399 - self._draining = True 400 - 401 - remaining = self._queue.qsize() 402 - with self._lock: 403 - if self._current_segment: 404 - remaining += 1 405 - if remaining > 0: 406 - print(f"Syncing {remaining} remaining segment(s)...", flush=True) 407 - 408 - # Signal worker to exit main loop (it will drain remaining items) 409 - self._stop_event.set() 410 - 411 - if self._worker_thread: 412 - self._worker_thread.join(timeout=90.0) 413 - 414 - if self._worker_thread and self._worker_thread.is_alive(): 415 - logger.warning("Sync worker did not finish within 90s timeout") 416 - 417 - logger.info("Sync service stopped") 418 - 419 - def _handle_message(self, message: dict[str, Any]) -> None: 420 - """Handle incoming Callosum messages.""" 421 - if self._draining or self._stop_event.is_set(): 422 - return 423 - 424 - tract = message.get("tract") 425 - event = message.get("event") 426 - 427 - if tract != "observe" or event != "observing": 428 - return 429 - 430 - day = message.get("day") 431 - segment = message.get("segment") 432 - files = message.get("files", []) 433 - 434 - if not day or not segment or not files: 435 - logger.warning(f"Invalid observing event: {message}") 436 - return 437 - 438 - logger.info(f"Received observing event: {day}/{segment} ({len(files)} files)") 439 - 440 - # Build metadata dict from message fields 441 - # Observers emit host/platform/stream as top-level fields, and may include a meta dict 442 - meta: dict[str, Any] = {} 443 - if message.get("host"): 444 - meta["host"] = message["host"] 445 - if message.get("platform"): 446 - meta["platform"] = message["platform"] 447 - if message.get("stream"): 448 - meta["stream"] = message["stream"] 449 - # Merge any explicit meta dict (its values take precedence) 450 - if message.get("meta"): 451 - meta.update(message["meta"]) 452 - 453 - # Compute sha256 for all files 454 - stream = meta.get("stream", "") if meta else "" 455 - segment_dir = ( 456 - day_path(day) / stream / segment if stream else day_path(day) / segment 457 - ) 458 - file_info = [] 459 - for filename in files: 460 - file_path = segment_dir / filename 461 - if not file_path.exists(): 462 - logger.warning(f"File not found: {file_path}") 463 - continue 464 - if file_path.stat().st_size == 0: 465 - logger.warning(f"Skipping 0-byte file: {file_path}") 466 - continue 467 - sha = compute_file_sha256(file_path) 468 - file_info.append({"name": filename, "sha256": sha}) 469 - 470 - if not file_info: 471 - logger.error(f"No valid files for segment {day}/{segment}") 472 - return 473 - 474 - # Write pending record (include meta for crash recovery) 475 - record: dict[str, Any] = { 476 - "ts": now_ms(), 477 - "segment": segment, 478 - "status": "pending", 479 - "files": file_info, 480 - } 481 - if meta: 482 - record["meta"] = meta 483 - append_sync_record(day, record) 484 - 485 - # Add to queue 486 - seg_info = SegmentInfo( 487 - day=day, segment=segment, files=file_info, meta=meta or None 488 - ) 489 - self._queue.put(seg_info) 490 - 491 - def _sync_worker(self) -> None: 492 - """Worker thread: upload segments one at a time.""" 493 - while not self._stop_event.is_set(): 494 - # Emit status periodically 495 - now = time.time() 496 - if now - self._last_status_emit >= 5: 497 - self._emit_status() 498 - self._last_status_emit = now 499 - 500 - # Try to get next segment 501 - try: 502 - seg_info = self._queue.get(timeout=1.0) 503 - except queue.Empty: 504 - continue 505 - 506 - # Process this segment 507 - self._process_segment(seg_info) 508 - 509 - # Drain remaining segments after stop 510 - if self._draining: 511 - while not self._queue.empty(): 512 - try: 513 - seg_info = self._queue.get_nowait() 514 - except queue.Empty: 515 - break 516 - logger.info(f"Draining: {self._queue.qsize() + 1} segment(s) remaining") 517 - self._process_segment(seg_info) 518 - 519 - def _process_segment(self, seg_info: SegmentInfo) -> None: 520 - """Process a single segment: upload, confirm, cleanup.""" 521 - day = seg_info.day 522 - segment = seg_info.segment 523 - stream = seg_info.meta.get("stream", "") if seg_info.meta else "" 524 - expected_sha256s = {f["name"]: f["sha256"] for f in seg_info.files} 525 - 526 - logger.info(f"Processing segment: {day}/{segment}") 527 - 528 - # Build segment directory path (includes stream level) 529 - seg_dir = ( 530 - day_path(day) / stream / segment if stream else day_path(day) / segment 531 - ) 532 - 533 - # Check if already confirmed on server before uploading 534 - # This handles crash recovery where we have a pending record but server already has it 535 - with self._lock: 536 - self._current_segment = seg_info 537 - self._current_state = "checking" 538 - self._confirm_attempt = 0 539 - 540 - if self._check_confirmation(day, segment, expected_sha256s): 541 - logger.info( 542 - f"Segment already confirmed on server: {day}/{segment}, skipping upload" 543 - ) 544 - # Write confirmed record and cleanup 545 - record = { 546 - "ts": now_ms(), 547 - "segment": segment, 548 - "status": "confirmed", 549 - } 550 - append_sync_record(day, record) 551 - 552 - file_paths = [seg_dir / f["name"] for f in seg_info.files] 553 - existing_files = [p for p in file_paths if p.exists()] 554 - self._cleanup_segment(seg_dir, existing_files) 555 - 556 - with self._lock: 557 - self._last_confirmed = f"{day}/{segment}" 558 - self._current_segment = None 559 - self._current_state = None 560 - return 561 - 562 - while self._draining or not self._stop_event.is_set(): 563 - # Upload 564 - with self._lock: 565 - self._current_segment = seg_info 566 - self._current_state = "uploading" 567 - self._confirm_attempt = 0 568 - 569 - segment_dir = seg_dir 570 - file_paths = [segment_dir / f["name"] for f in seg_info.files] 571 - 572 - # Filter to existing files 573 - existing_files = [p for p in file_paths if p.exists()] 574 - if not existing_files: 575 - logger.warning(f"No files found for segment {day}/{segment}, skipping") 576 - break 577 - 578 - result = self._client.upload_segment( 579 - day, segment, existing_files, meta=seg_info.meta 580 - ) 581 - if not result.success: 582 - logger.error(f"Upload failed for {day}/{segment}, will retry") 583 - time.sleep(CONFIRM_POLL_INTERVAL) 584 - continue 585 - 586 - if result.duplicate: 587 - # Server already has these files - mark confirmed immediately 588 - # without entering the confirmation polling loop 589 - record = { 590 - "ts": now_ms(), 591 - "segment": segment, 592 - "status": "confirmed", 593 - } 594 - append_sync_record(day, record) 595 - self._cleanup_segment(seg_dir, existing_files) 596 - with self._lock: 597 - self._last_confirmed = f"{day}/{segment}" 598 - logger.info(f"Duplicate upload confirmed, cleaned up: {day}/{segment}") 599 - break 600 - 601 - logger.info(f"Upload complete for {day}/{segment}, confirming...") 602 - 603 - # Confirm via sha256 match 604 - with self._lock: 605 - self._current_state = "confirming" 606 - self._confirm_attempt = 0 607 - 608 - confirmed = False 609 - for attempt in range(CONFIRM_MAX_ATTEMPTS): 610 - if not self._draining and self._stop_event.is_set(): 611 - return 612 - 613 - with self._lock: 614 - self._confirm_attempt = attempt + 1 615 - 616 - if self._check_confirmation(day, segment, expected_sha256s): 617 - confirmed = True 618 - break 619 - 620 - time.sleep(CONFIRM_POLL_INTERVAL) 621 - 622 - if confirmed: 623 - # Write confirmed record 624 - record = { 625 - "ts": now_ms(), 626 - "segment": segment, 627 - "status": "confirmed", 628 - } 629 - append_sync_record(day, record) 630 - 631 - # Cleanup local files 632 - self._cleanup_segment(seg_dir, existing_files) 633 - 634 - with self._lock: 635 - self._last_confirmed = f"{day}/{segment}" 636 - 637 - logger.info(f"Segment confirmed and cleaned up: {day}/{segment}") 638 - break 639 - else: 640 - logger.warning( 641 - f"Confirmation timeout for {day}/{segment}, retrying upload" 642 - ) 643 - # Loop back to upload 644 - 645 - with self._lock: 646 - self._current_segment = None 647 - self._current_state = None 648 - self._confirm_attempt = 0 649 - 650 - def _check_confirmation( 651 - self, day: str, segment: str, expected: dict[str, str] 652 - ) -> bool: 653 - """Check if segment is confirmed on server via sha256 match. 654 - 655 - Args: 656 - day: Day string 657 - segment: Segment key 658 - expected: Dict of {filename: sha256} expected values 659 - 660 - Returns: 661 - True if all files confirmed with matching sha256 662 - """ 663 - # Build segments endpoint URL 664 - # remote_url is like: https://server/app/observer/ingest/KEY 665 - # segments endpoint is: https://server/app/observer/ingest/KEY/segments/DAY 666 - segments_url = f"{self.remote_url}/segments/{day}" 667 - 668 - try: 669 - response = self._client.session.get(segments_url, timeout=30) 670 - if response.status_code != 200: 671 - logger.debug(f"Segments check failed: {response.status_code}") 672 - return False 673 - 674 - data = response.json() 675 - except Exception as e: 676 - logger.debug(f"Segments check error: {e}") 677 - return False 678 - 679 - # Find our segment in the response 680 - for seg_data in data: 681 - if seg_data.get("key") == segment: 682 - # Check all files have matching sha256 683 - server_files = { 684 - f["name"]: f.get("sha256", "") for f in seg_data.get("files", []) 685 - } 686 - 687 - for name, expected_sha in expected.items(): 688 - server_sha = server_files.get(name, "") 689 - if server_sha != expected_sha: 690 - logger.debug( 691 - f"SHA256 mismatch for {name}: " 692 - f"expected {expected_sha[:8]}..., got {server_sha[:8]}..." 693 - ) 694 - return False 695 - 696 - return True 697 - 698 - return False 699 - 700 - def _cleanup_segment(self, segment_dir: Path, file_paths: list[Path]) -> None: 701 - """Delete local segment files after confirmation.""" 702 - for path in file_paths: 703 - try: 704 - if path.exists(): 705 - path.unlink() 706 - logger.debug(f"Deleted: {path}") 707 - except OSError as e: 708 - logger.warning(f"Failed to delete {path}: {e}") 709 - 710 - # Try to remove segment directory if empty 711 - try: 712 - segment_dir.rmdir() 713 - logger.debug(f"Removed empty segment directory: {segment_dir}") 714 - except OSError: 715 - pass # Directory not empty or other error 716 - 717 - def _emit_status(self) -> None: 718 - """Emit sync.status event.""" 719 - if not self._callosum: 720 - return 721 - 722 - with self._lock: 723 - status = { 724 - "queue_size": self._queue.qsize(), 725 - "host": HOST, 726 - "platform": PLATFORM, 727 - } 728 - 729 - if self._current_segment: 730 - status["segment"] = ( 731 - f"{self._current_segment.day}/{self._current_segment.segment}" 732 - ) 733 - status["state"] = self._current_state 734 - if self._current_state == "confirming": 735 - status["confirm_attempt"] = self._confirm_attempt 736 - 737 - if self._last_confirmed: 738 - status["last_confirmed"] = self._last_confirmed 739 - 740 - self._callosum.emit("sync", "status", **status) 741 - 742 - 743 - def main(): 744 - """CLI entry point.""" 745 - parser = argparse.ArgumentParser( 746 - description="Sync service for remote observer uploads" 747 - ) 748 - parser.add_argument( 749 - "--remote", 750 - type=str, 751 - required=True, 752 - help="Remote server URL (e.g., https://server:5000/app/observer/ingest/KEY)", 753 - ) 754 - parser.add_argument( 755 - "--days-back", 756 - type=int, 757 - default=7, 758 - help="Number of days to scan for pending segments on startup (default: 7)", 759 - ) 760 - args = setup_cli(parser) 761 - 762 - service = SyncService( 763 - remote_url=args.remote, 764 - days_back=args.days_back, 765 - ) 766 - 767 - shutdown_event = threading.Event() 768 - 769 - def shutdown_handler(signum, frame): 770 - logger.info(f"Received signal {signum}, shutting down...") 771 - shutdown_event.set() 772 - 773 - signal.signal(signal.SIGTERM, shutdown_handler) 774 - signal.signal(signal.SIGINT, shutdown_handler) 775 - 776 - logger.info("Starting sync service...") 777 - try: 778 - service.start() 779 - 780 - # Main loop - wait for shutdown signal 781 - while not shutdown_event.is_set(): 782 - shutdown_event.wait(timeout=1.0) 783 - 784 - except KeyboardInterrupt: 785 - pass 786 - finally: 787 - logger.info("Shutting down...") 788 - service.stop() 789 - 790 - 791 - if __name__ == "__main__": 792 - main()
-2
sol.py
··· 54 54 "transcribe": "observe.transcribe", 55 55 "describe": "observe.describe", 56 56 "sense": "observe.sense", 57 - "sync": "observe.sync", 58 57 "transfer": "observe.transfer", 59 58 "observer": "observe.observer_cli", 60 59 # AI agents (talent package) ··· 107 106 "transcribe", 108 107 "describe", 109 108 "sense", 110 - "sync", 111 109 "transfer", 112 110 "observer", 113 111 ],
-46
tests/test_supervisor.py
··· 110 110 assert stderr == subprocess.PIPE 111 111 112 112 113 - def test_start_sync(tmp_path, mock_callosum, monkeypatch): 114 - """Test that start_sync() launches sol sync with remote URL.""" 115 - mod = importlib.import_module("think.supervisor") 116 - 117 - started = [] 118 - 119 - class DummyProc: 120 - def __init__(self): 121 - self.stdout = io.StringIO() 122 - self.stderr = io.StringIO() 123 - self.pid = 12345 124 - 125 - def terminate(self): 126 - pass 127 - 128 - def wait(self, timeout=None): 129 - pass 130 - 131 - def fake_popen( 132 - cmd, 133 - stdout=None, 134 - stderr=None, 135 - text=False, 136 - bufsize=-1, 137 - start_new_session=False, 138 - env=None, 139 - ): 140 - proc = DummyProc() 141 - started.append((cmd, stdout, stderr)) 142 - return proc 143 - 144 - monkeypatch.setattr(mod.subprocess, "Popen", fake_popen) 145 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path)) 146 - 147 - # Test start_sync() 148 - remote_url = "https://server:5000/app/observer/ingest/abc123" 149 - sync_proc = mod.start_sync(remote_url) 150 - assert sync_proc is not None 151 - 152 - # Verify the command includes --remote with the URL 153 - sync_cmds = [cmd for cmd, _, _ in started if "sync" in cmd] 154 - assert len(sync_cmds) == 1 155 - cmd = sync_cmds[0] 156 - assert cmd == ["sol", "sync", "-v", "--remote", remote_url] 157 - 158 - 159 113 def test_parse_args_remote_flag(): 160 114 """Test that parse_args includes --remote flag.""" 161 115 mod = importlib.reload(importlib.import_module("think.supervisor"))
-1018
tests/test_sync.py
··· 1 - # SPDX-License-Identifier: AGPL-3.0-only 2 - # Copyright (c) 2026 sol pbc 3 - 4 - """Tests for observe/sync.py - sync service for remote uploads.""" 5 - 6 - import signal 7 - import threading 8 - import time 9 - from unittest.mock import MagicMock, patch 10 - 11 - import pytest 12 - import requests 13 - 14 - 15 - @pytest.fixture 16 - def sync_journal(tmp_path): 17 - """Create a temporary journal structure for sync tests. 18 - 19 - Returns a dict with 'path' and 'day' keys. 20 - """ 21 - from datetime import datetime 22 - 23 - journal = tmp_path / "journal" 24 - journal.mkdir() 25 - 26 - # Use today's date so get_pending_segments finds it within days_back 27 - day = datetime.now().strftime("%Y%m%d") 28 - day_dir = journal / day 29 - day_dir.mkdir() 30 - 31 - # Create segment with files under default stream 32 - segment = "120000_300" 33 - stream_dir = day_dir / "default" 34 - stream_dir.mkdir() 35 - segment_dir = stream_dir / segment 36 - segment_dir.mkdir() 37 - 38 - audio_file = segment_dir / "audio.flac" 39 - audio_file.write_bytes(b"audio data for testing") 40 - 41 - video_file = segment_dir / "screen.webm" 42 - video_file.write_bytes(b"video data for testing") 43 - 44 - # Create health directory 45 - health_dir = day_dir / "health" 46 - health_dir.mkdir() 47 - 48 - return {"path": journal, "day": day} 49 - 50 - 51 - def test_compute_file_sha256(sync_journal): 52 - """Test SHA256 computation.""" 53 - from observe.utils import compute_file_sha256 54 - 55 - journal = sync_journal["path"] 56 - day = sync_journal["day"] 57 - test_file = journal / day / "default" / "120000_300" / "audio.flac" 58 - sha = compute_file_sha256(test_file) 59 - 60 - # Just verify it's a valid SHA256 hex string 61 - assert len(sha) == 64 62 - assert all(c in "0123456789abcdef" for c in sha) 63 - 64 - 65 - def test_get_sync_state_path(sync_journal, monkeypatch): 66 - """Test sync state path generation.""" 67 - from observe.sync import get_sync_state_path 68 - 69 - journal = sync_journal["path"] 70 - day = sync_journal["day"] 71 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 72 - 73 - path = get_sync_state_path(day) 74 - assert path == journal / day / "health" / "sync.jsonl" 75 - 76 - 77 - def test_append_and_load_sync_state(sync_journal, monkeypatch): 78 - """Test appending and loading sync state records.""" 79 - from observe.sync import append_sync_record, load_sync_state 80 - 81 - journal = sync_journal["path"] 82 - day = sync_journal["day"] 83 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 84 - 85 - # Initially empty 86 - records = load_sync_state(day) 87 - assert records == [] 88 - 89 - # Append a pending record 90 - record1 = { 91 - "ts": 1234567890000, 92 - "segment": "120000_300", 93 - "status": "pending", 94 - "files": [{"name": "audio.flac", "sha256": "abc123"}], 95 - } 96 - append_sync_record(day, record1) 97 - 98 - # Append a confirmed record 99 - record2 = { 100 - "ts": 1234567891000, 101 - "segment": "120000_300", 102 - "status": "confirmed", 103 - } 104 - append_sync_record(day, record2) 105 - 106 - # Load and verify 107 - records = load_sync_state(day) 108 - assert len(records) == 2 109 - assert records[0]["status"] == "pending" 110 - assert records[1]["status"] == "confirmed" 111 - 112 - 113 - def test_get_pending_segments(sync_journal, monkeypatch): 114 - """Test scanning for pending segments.""" 115 - from observe.sync import append_sync_record, get_pending_segments 116 - 117 - journal = sync_journal["path"] 118 - day = sync_journal["day"] 119 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 120 - 121 - # Add pending segment 122 - append_sync_record( 123 - day, 124 - { 125 - "ts": 1234567890000, 126 - "segment": "120000_300", 127 - "status": "pending", 128 - "files": [{"name": "audio.flac", "sha256": "abc123"}], 129 - }, 130 - ) 131 - 132 - # Add another pending segment 133 - segment2_dir = journal / day / "default" / "130000_300" 134 - segment2_dir.mkdir(parents=True) 135 - append_sync_record( 136 - day, 137 - { 138 - "ts": 1234567890001, 139 - "segment": "130000_300", 140 - "status": "pending", 141 - "files": [{"name": "audio.flac", "sha256": "def456"}], 142 - }, 143 - ) 144 - 145 - # Add a confirmed segment (should not be returned) 146 - append_sync_record( 147 - day, 148 - { 149 - "ts": 1234567890002, 150 - "segment": "140000_300", 151 - "status": "pending", 152 - "files": [{"name": "audio.flac", "sha256": "ghi789"}], 153 - }, 154 - ) 155 - append_sync_record( 156 - day, 157 - { 158 - "ts": 1234567890003, 159 - "segment": "140000_300", 160 - "status": "confirmed", 161 - }, 162 - ) 163 - 164 - # Get pending 165 - pending = get_pending_segments(days_back=7) 166 - 167 - assert len(pending) == 2 168 - segments = {p.segment for p in pending} 169 - assert "120000_300" in segments 170 - assert "130000_300" in segments 171 - assert "140000_300" not in segments # Already confirmed 172 - 173 - 174 - def test_get_pending_segments_empty(sync_journal, monkeypatch): 175 - """Test scanning when no pending segments exist.""" 176 - from observe.sync import get_pending_segments 177 - 178 - journal = sync_journal["path"] 179 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 180 - 181 - pending = get_pending_segments(days_back=7) 182 - assert pending == [] 183 - 184 - 185 - class TestSyncService: 186 - """Tests for SyncService class.""" 187 - 188 - @pytest.fixture 189 - def mock_remote_client(self): 190 - """Create a mock RemoteClient.""" 191 - with patch("observe.sync.RemoteClient") as mock: 192 - client = MagicMock() 193 - client.session = MagicMock() 194 - mock.return_value = client 195 - yield client 196 - 197 - @pytest.fixture 198 - def mock_callosum(self): 199 - """Create a mock CallosumConnection.""" 200 - with patch("observe.sync.CallosumConnection") as mock: 201 - conn = MagicMock() 202 - mock.return_value = conn 203 - yield conn 204 - 205 - def test_sync_service_init(self, sync_journal, monkeypatch): 206 - """Test SyncService initialization.""" 207 - from observe.sync import SyncService 208 - 209 - journal = sync_journal["path"] 210 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 211 - 212 - service = SyncService( 213 - remote_url="https://server/ingest/key", 214 - days_back=7, 215 - ) 216 - 217 - assert service.remote_url == "https://server/ingest/key" 218 - assert service.days_back == 7 219 - 220 - def test_check_confirmation_success( 221 - self, sync_journal, monkeypatch, mock_remote_client, mock_callosum 222 - ): 223 - """Test successful sha256 confirmation check.""" 224 - from observe.sync import SyncService 225 - 226 - journal = sync_journal["path"] 227 - day = sync_journal["day"] 228 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 229 - 230 - service = SyncService("https://server/ingest/key") 231 - service._client = mock_remote_client 232 - 233 - # Mock segments endpoint response 234 - mock_response = MagicMock() 235 - mock_response.status_code = 200 236 - mock_response.json.return_value = [ 237 - { 238 - "key": "120000_300", 239 - "files": [ 240 - {"name": "audio.flac", "sha256": "abc123", "size": 100}, 241 - {"name": "screen.webm", "sha256": "def456", "size": 200}, 242 - ], 243 - } 244 - ] 245 - mock_remote_client.session.get.return_value = mock_response 246 - 247 - # Check with matching sha256s 248 - result = service._check_confirmation( 249 - day, 250 - "120000_300", 251 - {"audio.flac": "abc123", "screen.webm": "def456"}, 252 - ) 253 - 254 - assert result is True 255 - mock_remote_client.session.get.assert_called_once() 256 - 257 - def test_check_confirmation_mismatch( 258 - self, sync_journal, monkeypatch, mock_remote_client, mock_callosum 259 - ): 260 - """Test sha256 mismatch returns False.""" 261 - from observe.sync import SyncService 262 - 263 - journal = sync_journal["path"] 264 - day = sync_journal["day"] 265 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 266 - 267 - service = SyncService("https://server/ingest/key") 268 - service._client = mock_remote_client 269 - 270 - # Mock response with wrong sha256 271 - mock_response = MagicMock() 272 - mock_response.status_code = 200 273 - mock_response.json.return_value = [ 274 - { 275 - "key": "120000_300", 276 - "files": [ 277 - {"name": "audio.flac", "sha256": "wrong_hash", "size": 100}, 278 - ], 279 - } 280 - ] 281 - mock_remote_client.session.get.return_value = mock_response 282 - 283 - result = service._check_confirmation( 284 - day, 285 - "120000_300", 286 - {"audio.flac": "abc123"}, 287 - ) 288 - 289 - assert result is False 290 - 291 - def test_check_confirmation_segment_not_found( 292 - self, sync_journal, monkeypatch, mock_remote_client, mock_callosum 293 - ): 294 - """Test segment not in response returns False.""" 295 - from observe.sync import SyncService 296 - 297 - journal = sync_journal["path"] 298 - day = sync_journal["day"] 299 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 300 - 301 - service = SyncService("https://server/ingest/key") 302 - service._client = mock_remote_client 303 - 304 - # Mock empty response 305 - mock_response = MagicMock() 306 - mock_response.status_code = 200 307 - mock_response.json.return_value = [] 308 - mock_remote_client.session.get.return_value = mock_response 309 - 310 - result = service._check_confirmation( 311 - day, 312 - "120000_300", 313 - {"audio.flac": "abc123"}, 314 - ) 315 - 316 - assert result is False 317 - 318 - def test_cleanup_segment(self, sync_journal, monkeypatch): 319 - """Test segment cleanup deletes files.""" 320 - from observe.sync import SyncService 321 - 322 - journal = sync_journal["path"] 323 - day = sync_journal["day"] 324 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 325 - 326 - service = SyncService("https://server/ingest/key") 327 - 328 - segment_dir = journal / day / "default" / "120000_300" 329 - audio_file = segment_dir / "audio.flac" 330 - video_file = segment_dir / "screen.webm" 331 - 332 - # Verify files exist 333 - assert audio_file.exists() 334 - assert video_file.exists() 335 - 336 - # Cleanup 337 - service._cleanup_segment(segment_dir, [audio_file, video_file]) 338 - 339 - # Files should be deleted 340 - assert not audio_file.exists() 341 - assert not video_file.exists() 342 - # Directory should be removed if empty 343 - assert not segment_dir.exists() 344 - 345 - def test_handle_observing_message( 346 - self, sync_journal, monkeypatch, mock_remote_client, mock_callosum 347 - ): 348 - """Test handling observe.observing message.""" 349 - from observe.sync import SyncService, load_sync_state 350 - 351 - journal = sync_journal["path"] 352 - day = sync_journal["day"] 353 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 354 - 355 - service = SyncService("https://server/ingest/key") 356 - service._callosum = mock_callosum 357 - 358 - # Simulate observing message with metadata 359 - message = { 360 - "tract": "observe", 361 - "event": "observing", 362 - "day": day, 363 - "segment": "120000_300", 364 - "files": ["audio.flac", "screen.webm"], 365 - "host": "testhost", 366 - "platform": "linux", 367 - "stream": "default", 368 - "meta": {"facet": "work"}, 369 - } 370 - 371 - service._handle_message(message) 372 - 373 - # Check pending record was written with metadata 374 - records = load_sync_state(day) 375 - assert len(records) == 1 376 - assert records[0]["status"] == "pending" 377 - assert records[0]["segment"] == "120000_300" 378 - assert len(records[0]["files"]) == 2 379 - # Verify metadata was extracted and merged 380 - assert records[0]["meta"]["host"] == "testhost" 381 - assert records[0]["meta"]["platform"] == "linux" 382 - assert records[0]["meta"]["facet"] == "work" 383 - 384 - # Check segment was queued with metadata 385 - assert service._queue.qsize() == 1 386 - seg_info = service._queue.get_nowait() 387 - assert seg_info.meta["host"] == "testhost" 388 - assert seg_info.meta["facet"] == "work" 389 - 390 - 391 - def test_sync_service_startup_with_pending(sync_journal, monkeypatch): 392 - """Test that startup loads pending segments into the queue with metadata.""" 393 - from observe.sync import SyncService, append_sync_record 394 - 395 - journal = sync_journal["path"] 396 - day = sync_journal["day"] 397 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 398 - 399 - # Add pending segment with metadata 400 - append_sync_record( 401 - day, 402 - { 403 - "ts": 1234567890000, 404 - "segment": "120000_300", 405 - "status": "pending", 406 - "files": [{"name": "audio.flac", "sha256": "abc123"}], 407 - "meta": {"host": "remote-host", "platform": "darwin"}, 408 - }, 409 - ) 410 - 411 - with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"): 412 - service = SyncService("https://server/ingest/key") 413 - # Replace worker with no-op so thread exits immediately 414 - service._sync_worker = lambda: None 415 - service.start() 416 - 417 - # Pending segment should have been queued with metadata 418 - assert service._queue.qsize() == 1 419 - seg_info = service._queue.get_nowait() 420 - assert seg_info.segment == "120000_300" 421 - assert seg_info.day == day 422 - assert seg_info.meta["host"] == "remote-host" 423 - assert seg_info.meta["platform"] == "darwin" 424 - 425 - service.stop() 426 - 427 - 428 - def test_process_segment_skips_upload_if_already_confirmed(sync_journal, monkeypatch): 429 - """Test that segment already on server is skipped without upload.""" 430 - from observe.sync import SegmentInfo, SyncService, UploadResult 431 - 432 - journal = sync_journal["path"] 433 - day = sync_journal["day"] 434 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 435 - 436 - # Create SegmentInfo 437 - seg_info = SegmentInfo( 438 - day=day, 439 - segment="120000_300", 440 - files=[ 441 - {"name": "audio.flac", "sha256": "abc123"}, 442 - {"name": "screen.webm", "sha256": "def456"}, 443 - ], 444 - meta={"stream": "default"}, 445 - ) 446 - 447 - with patch("observe.sync.CallosumConnection") as mock_callosum_class: 448 - mock_callosum = MagicMock() 449 - mock_callosum_class.return_value = mock_callosum 450 - 451 - with patch("observe.sync.RemoteClient") as mock_client_class: 452 - mock_client = MagicMock() 453 - mock_session = MagicMock() 454 - 455 - # Simulate server already has the segment with matching SHA256 456 - server_response = MagicMock() 457 - server_response.status_code = 200 458 - server_response.json.return_value = [ 459 - { 460 - "key": "120000_300", 461 - "files": [ 462 - {"name": "audio.flac", "sha256": "abc123"}, 463 - {"name": "screen.webm", "sha256": "def456"}, 464 - ], 465 - } 466 - ] 467 - mock_session.get.return_value = server_response 468 - mock_client.session = mock_session 469 - mock_client.upload_segment = MagicMock(return_value=UploadResult(True)) 470 - mock_client_class.return_value = mock_client 471 - 472 - service = SyncService("https://server/ingest/key") 473 - 474 - # Call _process_segment directly (internal method) 475 - service._process_segment(seg_info) 476 - 477 - # Upload should NOT have been called (already confirmed) 478 - mock_client.upload_segment.assert_not_called() 479 - 480 - 481 - def test_process_segment_uploads_if_not_on_server(sync_journal, monkeypatch): 482 - """Test that segment not on server is uploaded.""" 483 - from observe.sync import SegmentInfo, SyncService, UploadResult 484 - 485 - journal = sync_journal["path"] 486 - day = sync_journal["day"] 487 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 488 - 489 - seg_info = SegmentInfo( 490 - day=day, 491 - segment="120000_300", 492 - files=[ 493 - {"name": "audio.flac", "sha256": "abc123"}, 494 - ], 495 - meta={"stream": "default"}, 496 - ) 497 - 498 - with patch("observe.sync.CallosumConnection") as mock_callosum_class: 499 - mock_callosum = MagicMock() 500 - mock_callosum_class.return_value = mock_callosum 501 - 502 - with patch("observe.sync.RemoteClient") as mock_client_class: 503 - mock_client = MagicMock() 504 - mock_session = MagicMock() 505 - 506 - # First call: server doesn't have segment (pre-check) 507 - # Second call: server has segment (post-upload confirm) 508 - responses = [ 509 - MagicMock(status_code=200, json=MagicMock(return_value=[])), 510 - MagicMock( 511 - status_code=200, 512 - json=MagicMock( 513 - return_value=[ 514 - { 515 - "key": "120000_300", 516 - "files": [{"name": "audio.flac", "sha256": "abc123"}], 517 - } 518 - ] 519 - ), 520 - ), 521 - ] 522 - mock_session.get.side_effect = responses 523 - mock_client.session = mock_session 524 - mock_client.upload_segment = MagicMock(return_value=UploadResult(True)) 525 - mock_client_class.return_value = mock_client 526 - 527 - service = SyncService("https://server/ingest/key") 528 - service._process_segment(seg_info) 529 - 530 - # Upload SHOULD have been called 531 - mock_client.upload_segment.assert_called_once() 532 - 533 - 534 - def test_process_segment_passes_metadata_to_upload(sync_journal, monkeypatch): 535 - """Test that metadata is passed through to upload_segment call.""" 536 - from observe.sync import SegmentInfo, SyncService, UploadResult 537 - 538 - journal = sync_journal["path"] 539 - day = sync_journal["day"] 540 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 541 - 542 - # Create SegmentInfo with metadata 543 - seg_info = SegmentInfo( 544 - day=day, 545 - segment="120000_300", 546 - files=[ 547 - {"name": "audio.flac", "sha256": "abc123"}, 548 - ], 549 - meta={ 550 - "host": "laptop", 551 - "platform": "linux", 552 - "facet": "meetings", 553 - "stream": "default", 554 - }, 555 - ) 556 - 557 - with patch("observe.sync.CallosumConnection") as mock_callosum_class: 558 - mock_callosum = MagicMock() 559 - mock_callosum_class.return_value = mock_callosum 560 - 561 - with patch("observe.sync.RemoteClient") as mock_client_class: 562 - mock_client = MagicMock() 563 - mock_session = MagicMock() 564 - 565 - # First call: server doesn't have segment (pre-check) 566 - # Second call: server has segment (post-upload confirm) 567 - responses = [ 568 - MagicMock(status_code=200, json=MagicMock(return_value=[])), 569 - MagicMock( 570 - status_code=200, 571 - json=MagicMock( 572 - return_value=[ 573 - { 574 - "key": "120000_300", 575 - "files": [{"name": "audio.flac", "sha256": "abc123"}], 576 - } 577 - ] 578 - ), 579 - ), 580 - ] 581 - mock_session.get.side_effect = responses 582 - mock_client.session = mock_session 583 - mock_client.upload_segment = MagicMock(return_value=UploadResult(True)) 584 - mock_client_class.return_value = mock_client 585 - 586 - service = SyncService("https://server/ingest/key") 587 - service._process_segment(seg_info) 588 - 589 - # Verify upload was called with metadata 590 - mock_client.upload_segment.assert_called_once() 591 - call_kwargs = mock_client.upload_segment.call_args.kwargs 592 - assert call_kwargs["meta"] == { 593 - "host": "laptop", 594 - "platform": "linux", 595 - "facet": "meetings", 596 - "stream": "default", 597 - } 598 - 599 - 600 - def test_handle_message_skips_zero_byte_files(sync_journal, monkeypatch): 601 - """Test that 0-byte files are skipped during message handling.""" 602 - from observe.sync import SyncService, load_sync_state 603 - 604 - journal = sync_journal["path"] 605 - day = sync_journal["day"] 606 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 607 - 608 - # Create segment directory with mixed files 609 - seg_dir = journal / day / "default" / "120000_300" 610 - seg_dir.mkdir(parents=True, exist_ok=True) 611 - (seg_dir / "audio.flac").write_bytes(b"real audio data") 612 - (seg_dir / "screen.webm").write_bytes(b"") # 0-byte 613 - 614 - with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"): 615 - service = SyncService("https://server/ingest/key") 616 - 617 - message = { 618 - "tract": "observe", 619 - "event": "observing", 620 - "day": day, 621 - "segment": "120000_300", 622 - "files": ["audio.flac", "screen.webm"], 623 - "stream": "default", 624 - } 625 - service._handle_message(message) 626 - 627 - # Only valid file should be queued 628 - assert service._queue.qsize() == 1 629 - seg_info = service._queue.get_nowait() 630 - assert len(seg_info.files) == 1 631 - assert seg_info.files[0]["name"] == "audio.flac" 632 - 633 - # Pending record should only have 1 file 634 - records = load_sync_state(day) 635 - assert len(records) == 1 636 - assert len(records[0]["files"]) == 1 637 - 638 - 639 - def test_handle_message_skips_all_zero_byte_files(sync_journal, monkeypatch): 640 - """Test that segment is not queued when all files are 0-byte.""" 641 - from observe.sync import SyncService, load_sync_state 642 - 643 - journal = sync_journal["path"] 644 - day = sync_journal["day"] 645 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 646 - 647 - # Create segment directory with only 0-byte files 648 - seg_dir = journal / day / "default" / "120000_300" 649 - seg_dir.mkdir(parents=True, exist_ok=True) 650 - (seg_dir / "audio.flac").write_bytes(b"") 651 - (seg_dir / "screen.webm").write_bytes(b"") 652 - 653 - with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"): 654 - service = SyncService("https://server/ingest/key") 655 - 656 - message = { 657 - "tract": "observe", 658 - "event": "observing", 659 - "day": day, 660 - "segment": "120000_300", 661 - "files": ["audio.flac", "screen.webm"], 662 - "stream": "default", 663 - } 664 - service._handle_message(message) 665 - 666 - # No segment should be queued 667 - assert service._queue.qsize() == 0 668 - 669 - # No pending record should be written 670 - records = load_sync_state(day) 671 - assert len(records) == 0 672 - 673 - 674 - def test_process_segment_duplicate_skips_confirmation(sync_journal, monkeypatch): 675 - """Test that duplicate upload skips confirmation polling.""" 676 - from observe.sync import SegmentInfo, SyncService, UploadResult, load_sync_state 677 - 678 - journal = sync_journal["path"] 679 - day = sync_journal["day"] 680 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 681 - 682 - seg_info = SegmentInfo( 683 - day=day, 684 - segment="120000_300", 685 - files=[ 686 - {"name": "audio.flac", "sha256": "abc123"}, 687 - ], 688 - meta={"stream": "default"}, 689 - ) 690 - 691 - # Create the segment file so cleanup has something to work with 692 - seg_dir = journal / day / "default" / "120000_300" 693 - seg_dir.mkdir(parents=True, exist_ok=True) 694 - (seg_dir / "audio.flac").write_bytes(b"audio data") 695 - 696 - with patch("observe.sync.CallosumConnection") as mock_callosum_class: 697 - mock_callosum = MagicMock() 698 - mock_callosum_class.return_value = mock_callosum 699 - 700 - with patch("observe.sync.RemoteClient") as mock_client_class: 701 - mock_client = MagicMock() 702 - mock_session = MagicMock() 703 - 704 - # Pre-check: server doesn't have segment yet 705 - mock_session.get.return_value = MagicMock( 706 - status_code=200, json=MagicMock(return_value=[]) 707 - ) 708 - mock_client.session = mock_session 709 - # Upload returns duplicate 710 - mock_client.upload_segment = MagicMock( 711 - return_value=UploadResult(True, duplicate=True) 712 - ) 713 - mock_client_class.return_value = mock_client 714 - 715 - service = SyncService("https://server/ingest/key") 716 - service._process_segment(seg_info) 717 - 718 - # Upload should have been called 719 - mock_client.upload_segment.assert_called_once() 720 - 721 - # Confirmation polling should NOT have happened 722 - # (only 1 GET call for pre-check, no additional confirmation GETs) 723 - assert mock_session.get.call_count == 1 724 - 725 - # Confirmed record should have been written 726 - records = load_sync_state(day) 727 - confirmed = [r for r in records if r.get("status") == "confirmed"] 728 - assert len(confirmed) == 1 729 - assert confirmed[0]["segment"] == "120000_300" 730 - 731 - 732 - def test_sync_service_stop_drains_queue(sync_journal, monkeypatch): 733 - """stop() should drain queued segments before returning.""" 734 - from observe.sync import SegmentInfo, SyncService 735 - 736 - journal = sync_journal["path"] 737 - day = sync_journal["day"] 738 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 739 - 740 - with ( 741 - patch("observe.sync.RemoteClient"), 742 - patch("observe.sync.CallosumConnection"), 743 - ): 744 - service = SyncService("https://server/ingest/key") 745 - # Prevent start() from running the real worker 746 - real_worker = service._sync_worker 747 - service._sync_worker = lambda: None 748 - service.start() 749 - 750 - assert service._worker_thread is not None 751 - service._worker_thread.join(timeout=1.0) 752 - assert not service._worker_thread.is_alive() 753 - 754 - service._queue.put( 755 - SegmentInfo( 756 - day=day, segment="120000_301", files=[{"name": "audio.flac"}], meta={} 757 - ) 758 - ) 759 - service._queue.put( 760 - SegmentInfo( 761 - day=day, segment="120000_302", files=[{"name": "audio.flac"}], meta={} 762 - ) 763 - ) 764 - 765 - service._sync_worker = real_worker 766 - service._stop_event.clear() 767 - 768 - with patch.object(service, "_process_segment") as mock_process: 769 - mock_process.return_value = None 770 - 771 - service._worker_thread = threading.Thread( 772 - target=service._sync_worker, daemon=False 773 - ) 774 - service._worker_thread.start() 775 - 776 - service.stop() 777 - 778 - assert mock_process.call_count == 2 779 - service._worker_thread.join(timeout=1.0) 780 - 781 - 782 - def test_sync_service_stop_disconnects_callosum_first(sync_journal, monkeypatch): 783 - """stop() should stop callosum before setting the stop event.""" 784 - from observe.sync import SyncService 785 - 786 - journal = sync_journal["path"] 787 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 788 - 789 - mock_callosum = MagicMock() 790 - shutdown_order: list[str] = [] 791 - service = SyncService("https://server/ingest/key") 792 - service._callosum = mock_callosum 793 - service._worker_thread = None 794 - service._stop_event = MagicMock() 795 - 796 - service._callosum.stop.side_effect = lambda: shutdown_order.append("callosum") 797 - service._stop_event.set.side_effect = lambda: shutdown_order.append("stop_event") 798 - 799 - service.stop() 800 - 801 - assert shutdown_order == ["callosum", "stop_event"] 802 - 803 - 804 - def test_sync_service_worker_thread_not_daemon(sync_journal, monkeypatch): 805 - """Worker thread should be non-daemon.""" 806 - from observe.sync import SyncService 807 - 808 - journal = sync_journal["path"] 809 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 810 - 811 - with ( 812 - patch("observe.sync.get_pending_segments", return_value=[]), 813 - patch("observe.sync.RemoteClient"), 814 - patch("observe.sync.CallosumConnection"), 815 - ): 816 - service = SyncService("https://server/ingest/key") 817 - service.start() 818 - 819 - assert service._worker_thread is not None 820 - assert service._worker_thread.daemon is False 821 - 822 - service.stop() 823 - 824 - 825 - def test_sync_service_drain_completes_current_segment(sync_journal, monkeypatch): 826 - """Current segment should finish when stop_event is set and draining is active.""" 827 - from observe.sync import SegmentInfo, SyncService, UploadResult 828 - 829 - journal = sync_journal["path"] 830 - day = sync_journal["day"] 831 - monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 832 - 833 - service = SyncService("https://server/ingest/key") 834 - service._client.upload_segment = MagicMock(return_value=UploadResult(True)) 835 - service._check_confirmation = MagicMock(side_effect=[False, True]) 836 - 837 - segment_info = SegmentInfo( 838 - day=day, 839 - segment="120000_300", 840 - files=[{"name": "audio.flac", "sha256": "abc123"}], 841 - meta={"stream": "default"}, 842 - ) 843 - service._queue.put(segment_info) 844 - 845 - service._draining = True 846 - service._stop_event.set() 847 - 848 - def upload_side_effect(*_args, **_kwargs): 849 - service._stop_event.set() 850 - return UploadResult(True) 851 - 852 - service._client.upload_segment.side_effect = upload_side_effect 853 - 854 - service._process_segment(service._queue.get_nowait()) 855 - 856 - assert service._client.upload_segment.call_count == 1 857 - assert not (journal / day / "default" / "120000_300" / "audio.flac").exists() 858 - 859 - 860 - def test_main_sigterm_triggers_stop(monkeypatch): 861 - """main() should stop the service on SIGTERM.""" 862 - import argparse 863 - 864 - import observe.sync 865 - 866 - handlers: dict[int, object] = {} 867 - 868 - def capture_signal(signum, handler): 869 - handlers[signum] = handler 870 - return handler 871 - 872 - args = argparse.Namespace(remote="https://server/ingest/key", days_back=7) 873 - 874 - with ( 875 - patch("observe.sync.setup_cli", return_value=args), 876 - patch("observe.sync.SyncService") as mock_service, 877 - patch("observe.sync.signal.signal", side_effect=capture_signal), 878 - ): 879 - service_instance = MagicMock() 880 - mock_service.return_value = service_instance 881 - 882 - thread = threading.Thread(target=observe.sync.main) 883 - thread.start() 884 - 885 - deadline = time.time() + 1.0 886 - while signal.SIGTERM not in handlers and time.time() < deadline: 887 - time.sleep(0.01) 888 - assert signal.SIGTERM in handlers 889 - assert signal.SIGINT in handlers 890 - 891 - handlers[signal.SIGTERM](signal.SIGTERM, None) 892 - thread.join(timeout=1.0) 893 - 894 - assert thread.is_alive() is False 895 - service_instance.start.assert_called_once() 896 - service_instance.stop.assert_called_once() 897 - 898 - 899 - class TestCheckRemoteHealth: 900 - """Tests for check_remote_health() function.""" 901 - 902 - def test_health_check_success(self): 903 - """Test successful health check returns True with connection info.""" 904 - from observe.sync import check_remote_health 905 - 906 - with patch("observe.sync.requests.get") as mock_get: 907 - mock_response = MagicMock() 908 - mock_response.status_code = 200 909 - mock_get.return_value = mock_response 910 - 911 - success, message = check_remote_health( 912 - "http://server.local:8000/app/observer/ingest/abc12345xyz" 913 - ) 914 - 915 - assert success is True 916 - assert "server.local:8000" in message 917 - assert "abc12345" in message # Key prefix 918 - 919 - def test_health_check_invalid_key(self): 920 - """Test 401 response returns False with appropriate message.""" 921 - from observe.sync import check_remote_health 922 - 923 - with patch("observe.sync.requests.get") as mock_get: 924 - mock_response = MagicMock() 925 - mock_response.status_code = 401 926 - mock_get.return_value = mock_response 927 - 928 - success, message = check_remote_health( 929 - "http://server.local:8000/app/observer/ingest/badkey" 930 - ) 931 - 932 - assert success is False 933 - assert "401" in message or "Invalid key" in message 934 - 935 - def test_health_check_revoked_key(self): 936 - """Test 403 response returns False with error details.""" 937 - from observe.sync import check_remote_health 938 - 939 - with patch("observe.sync.requests.get") as mock_get: 940 - mock_response = MagicMock() 941 - mock_response.status_code = 403 942 - mock_response.text = '{"error": "Remote revoked"}' 943 - mock_response.json.return_value = {"error": "Remote revoked"} 944 - mock_get.return_value = mock_response 945 - 946 - success, message = check_remote_health( 947 - "http://server.local:8000/app/observer/ingest/revokedkey" 948 - ) 949 - 950 - assert success is False 951 - assert "403" in message or "revoked" in message.lower() 952 - 953 - def test_health_check_403_non_json_body(self): 954 - """Test 403 with non-JSON body doesn't crash.""" 955 - from observe.sync import check_remote_health 956 - 957 - with patch("observe.sync.requests.get") as mock_get: 958 - mock_response = MagicMock() 959 - mock_response.status_code = 403 960 - mock_response.text = "Forbidden" 961 - mock_response.json.side_effect = requests.exceptions.JSONDecodeError( 962 - "", "", 0 963 - ) 964 - mock_get.return_value = mock_response 965 - 966 - success, message = check_remote_health( 967 - "http://server.local:8000/app/observer/ingest/badkey" 968 - ) 969 - 970 - assert success is False 971 - assert "403" in message 972 - 973 - def test_health_check_connection_refused(self): 974 - """Test connection refused returns False with clear message.""" 975 - from observe.sync import check_remote_health 976 - 977 - with patch("observe.sync.requests.get") as mock_get: 978 - mock_get.side_effect = requests.exceptions.ConnectionError( 979 - "Connection refused" 980 - ) 981 - 982 - success, message = check_remote_health( 983 - "http://server.local:8000/app/observer/ingest/key123" 984 - ) 985 - 986 - assert success is False 987 - assert "refused" in message.lower() or "connection" in message.lower() 988 - 989 - def test_health_check_timeout(self): 990 - """Test timeout returns False with timeout message.""" 991 - from observe.sync import check_remote_health 992 - 993 - with patch("observe.sync.requests.get") as mock_get: 994 - mock_get.side_effect = requests.exceptions.Timeout("timed out") 995 - 996 - success, message = check_remote_health( 997 - "http://server.local:8000/app/observer/ingest/key123", 998 - timeout=5.0, 999 - ) 1000 - 1001 - assert success is False 1002 - assert "timeout" in message.lower() 1003 - 1004 - def test_health_check_host_not_found(self): 1005 - """Test DNS failure returns False with host not found message.""" 1006 - from observe.sync import check_remote_health 1007 - 1008 - with patch("observe.sync.requests.get") as mock_get: 1009 - mock_get.side_effect = requests.exceptions.ConnectionError( 1010 - "Name or service not known" 1011 - ) 1012 - 1013 - success, message = check_remote_health( 1014 - "http://nonexistent.invalid/app/observer/ingest/key123" 1015 - ) 1016 - 1017 - assert success is False 1018 - assert "not found" in message.lower() or "connection" in message.lower()
+3 -28
think/supervisor.py
··· 19 19 20 20 from desktop_notifier import DesktopNotifier, Urgency 21 21 22 - from observe.sync import check_remote_health 23 22 from think import routines, scheduler 24 23 from think.callosum import CallosumConnection, CallosumServer 25 24 from think.runner import DailyLogWriter ··· 790 789 return _launch_process("sense", ["sol", "sense", "-v"], restart=True) 791 790 792 791 793 - def start_sync(remote_url: str) -> ManagedProcess: 794 - """Launch sol sync with output logging. 795 - 796 - Args: 797 - remote_url: Remote ingest URL for sync service 798 - """ 799 - managed = _launch_process( 800 - "sync", 801 - ["sol", "sync", "-v", "--remote", remote_url], 802 - restart=True, 803 - ) 804 - # Sync shutdown can block while draining pending segments. 805 - # Give it extra time so the supervisor does not cut it off early. 806 - managed.shutdown_timeout = 90 807 - return managed 808 - 809 - 810 792 def start_callosum_in_process() -> CallosumServer: 811 793 """Start Callosum message bus server in-process. 812 794 ··· 1358 1340 parser.add_argument( 1359 1341 "--remote", 1360 1342 type=str, 1361 - help="Remote mode: sync to server URL instead of local processing", 1343 + help="Remote mode: URL for segment transfer (not yet implemented)", 1362 1344 ) 1363 1345 return parser 1364 1346 ··· 1486 1468 1487 1469 # Now start other services (their startup events will be captured) 1488 1470 if _is_remote_mode: 1489 - # Remote mode: verify remote server is reachable before starting sync 1490 - logging.info("Remote mode: checking server connectivity...") 1491 - success, message = check_remote_health(args.remote) 1492 - if not success: 1493 - logging.error(f"Remote health check failed: {message}") 1494 - stop_callosum_in_process() 1495 - parser.error(f"Remote server not available: {message}") 1496 - logging.info(f"Remote server verified: {message}") 1497 - procs.append(start_sync(args.remote)) 1471 + # Remote mode: transfer send will be added here 1472 + pass 1498 1473 else: 1499 1474 # Local mode: convey first, then sense for file processing 1500 1475 if not args.no_convey: