personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add ObserverClient, convert linux and tmux observers to HTTP upload

Phase 3 of observer decoupling: replace direct journal writes and
Callosum events with HTTP upload through a shared ObserverClient.

Both observers now upload segments to the Convey ingest server and
relay status events through the event endpoint. CallosumConnection,
update_stream(), and write_segment_stream() removed from both
observers — the server handles stream identity and event emission.

Includes formatting fixes for pre-existing issues in remote_cli.py
and gemini.py.

+680 -185
+349
apps/remote/tests/test_observer_client.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for ObserverClient with mocked HTTP calls.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + from unittest.mock import MagicMock, patch 10 + 11 + import pytest 12 + import requests 13 + 14 + 15 + @pytest.fixture 16 + def mock_session(): 17 + with patch("observe.remote_client.requests.Session") as mock: 18 + session = MagicMock() 19 + mock.return_value = session 20 + yield session 21 + 22 + 23 + @pytest.fixture 24 + def mock_config(): 25 + with patch("observe.remote_client.get_config") as mock: 26 + mock.return_value = {} 27 + yield mock 28 + 29 + 30 + @pytest.fixture 31 + def mock_journal(tmp_path): 32 + with patch("observe.remote_client.get_journal") as mock: 33 + mock.return_value = str(tmp_path) 34 + yield tmp_path 35 + 36 + 37 + def test_observer_client_init(mock_session, mock_config): 38 + from observe.remote_client import DEFAULT_URL, ObserverClient 39 + 40 + client = ObserverClient("main-stream") 41 + 42 + assert client._url == DEFAULT_URL 43 + assert client._key is None 44 + assert client._name == "main-stream" 45 + assert client._stream == "main-stream" 46 + assert client._auto_register is True 47 + 48 + 49 + def test_observer_client_init_with_config(mock_session, mock_config): 50 + from observe.remote_client import ObserverClient 51 + 52 + mock_config.return_value = { 53 + "observe": { 54 + "remote": { 55 + "url": "https://example.test/", 56 + "key": "abc123", 57 + "name": "named-observer", 58 + "auto_register": False, 59 + } 60 + } 61 + } 62 + 63 + client = ObserverClient("main-stream") 64 + 65 + assert client._url == "https://example.test" 66 + assert client._key == "abc123" 67 + assert client._name == "named-observer" 68 + assert client._auto_register is False 69 + 70 + 71 + def test_auto_registration(mock_session, mock_config, mock_journal, tmp_path): 72 + from observe.remote_client import ObserverClient 73 + 74 + file1 = tmp_path / "audio.flac" 75 + file1.write_bytes(b"audio") 76 + 77 + create_response = MagicMock() 78 + create_response.status_code = 200 79 + create_response.json.return_value = {"key": "registered-key"} 80 + 81 + upload_response = MagicMock() 82 + upload_response.status_code = 200 83 + upload_response.json.return_value = {"files": ["audio.flac"], "bytes": 5} 84 + 85 + mock_session.post.side_effect = [create_response, upload_response] 86 + 87 + client = ObserverClient("main-stream") 88 + result = client.upload_segment("20250103", "120000_300", [file1]) 89 + 90 + assert result.success is True 91 + assert client._key == "registered-key" 92 + assert mock_session.post.call_args_list[0][0][0].endswith("/app/remote/api/create") 93 + config = json.loads((mock_journal / "config" / "journal.json").read_text()) 94 + assert config["observe"]["remote"]["key"] == "registered-key" 95 + 96 + 97 + def test_existing_key_skips_registration(mock_session, mock_config, tmp_path): 98 + from observe.remote_client import ObserverClient 99 + 100 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 101 + 102 + file1 = tmp_path / "audio.flac" 103 + file1.write_bytes(b"audio") 104 + 105 + upload_response = MagicMock() 106 + upload_response.status_code = 200 107 + upload_response.json.return_value = {"files": ["audio.flac"], "bytes": 5} 108 + mock_session.post.return_value = upload_response 109 + 110 + client = ObserverClient("main-stream") 111 + result = client.upload_segment("20250103", "120000_300", [file1]) 112 + 113 + assert result.success is True 114 + assert mock_session.post.call_count == 1 115 + assert mock_session.post.call_args[0][0].endswith("/app/remote/ingest/testkey123") 116 + 117 + 118 + def test_registration_retry(mock_session, mock_config, mock_journal, tmp_path): 119 + from observe.remote_client import ObserverClient 120 + 121 + file1 = tmp_path / "audio.flac" 122 + file1.write_bytes(b"audio") 123 + 124 + create_response = MagicMock() 125 + create_response.status_code = 200 126 + create_response.json.return_value = {"key": "registered-key"} 127 + 128 + upload_response = MagicMock() 129 + upload_response.status_code = 200 130 + upload_response.json.return_value = {"files": ["audio.flac"], "bytes": 5} 131 + 132 + mock_session.post.side_effect = [ 133 + requests.ConnectionError("no route"), 134 + create_response, 135 + upload_response, 136 + ] 137 + 138 + with patch("observe.remote_client.time.sleep"): 139 + client = ObserverClient("main-stream") 140 + result = client.upload_segment("20250103", "120000_300", [file1]) 141 + 142 + assert result.success is True 143 + assert mock_session.post.call_count == 3 144 + 145 + 146 + def test_registration_403(mock_session, mock_config): 147 + from observe.remote_client import ObserverClient 148 + 149 + response = MagicMock() 150 + response.status_code = 403 151 + mock_session.post.return_value = response 152 + 153 + client = ObserverClient("main-stream") 154 + client._ensure_registered() 155 + 156 + assert client._revoked is True 157 + assert client._key is None 158 + 159 + 160 + def test_upload_segment_success(mock_session, mock_config, tmp_path): 161 + from observe.remote_client import ObserverClient 162 + 163 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 164 + 165 + file1 = tmp_path / "audio.flac" 166 + file1.write_bytes(b"audio data") 167 + 168 + mock_response = MagicMock() 169 + mock_response.status_code = 200 170 + mock_response.json.return_value = {"files": ["audio.flac"], "bytes": 10} 171 + mock_session.post.return_value = mock_response 172 + 173 + client = ObserverClient("main-stream") 174 + result = client.upload_segment("20250103", "120000_300", [file1]) 175 + 176 + assert result.success is True 177 + assert result.duplicate is False 178 + 179 + 180 + def test_upload_segment_retry(mock_session, mock_config, tmp_path): 181 + from observe.remote_client import ObserverClient 182 + 183 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 184 + 185 + file1 = tmp_path / "audio.flac" 186 + file1.write_bytes(b"audio data") 187 + 188 + failure = MagicMock() 189 + failure.status_code = 500 190 + failure.text = "Server error" 191 + 192 + success = MagicMock() 193 + success.status_code = 200 194 + success.json.return_value = {"files": ["audio.flac"], "bytes": 10} 195 + 196 + mock_session.post.side_effect = [failure, success] 197 + 198 + with patch("observe.remote_client.time.sleep"): 199 + client = ObserverClient("main-stream") 200 + result = client.upload_segment("20250103", "120000_300", [file1]) 201 + 202 + assert result.success is True 203 + assert mock_session.post.call_count == 2 204 + 205 + 206 + def test_upload_segment_403(mock_session, mock_config, tmp_path): 207 + from observe.remote_client import ObserverClient 208 + 209 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 210 + 211 + file1 = tmp_path / "audio.flac" 212 + file1.write_bytes(b"audio data") 213 + 214 + response = MagicMock() 215 + response.status_code = 403 216 + response.text = "Forbidden" 217 + mock_session.post.return_value = response 218 + 219 + client = ObserverClient("main-stream") 220 + result = client.upload_segment("20250103", "120000_300", [file1]) 221 + 222 + assert result.success is False 223 + assert client._revoked is True 224 + 225 + 226 + def test_upload_segment_all_retries_fail(mock_session, mock_config, tmp_path): 227 + from observe.remote_client import ObserverClient 228 + 229 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 230 + 231 + file1 = tmp_path / "audio.flac" 232 + file1.write_bytes(b"audio data") 233 + 234 + failure = MagicMock() 235 + failure.status_code = 500 236 + failure.text = "Server error" 237 + mock_session.post.return_value = failure 238 + 239 + with patch("observe.remote_client.time.sleep"): 240 + client = ObserverClient("main-stream") 241 + result = client.upload_segment("20250103", "120000_300", [file1]) 242 + 243 + assert result.success is False 244 + assert mock_session.post.call_count == 3 245 + 246 + 247 + def test_relay_event_success(mock_session, mock_config): 248 + from observe.remote_client import ObserverClient 249 + 250 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 251 + 252 + response = MagicMock() 253 + response.status_code = 200 254 + mock_session.post.return_value = response 255 + 256 + client = ObserverClient("main-stream") 257 + result = client.relay_event("observe", "status", mode="idle") 258 + 259 + assert result is True 260 + assert mock_session.post.call_args[1]["json"] == { 261 + "tract": "observe", 262 + "event": "status", 263 + "mode": "idle", 264 + } 265 + 266 + 267 + def test_relay_event_403(mock_session, mock_config): 268 + from observe.remote_client import ObserverClient 269 + 270 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 271 + 272 + response = MagicMock() 273 + response.status_code = 403 274 + response.text = "Forbidden" 275 + mock_session.post.return_value = response 276 + 277 + client = ObserverClient("main-stream") 278 + result = client.relay_event("observe", "status", mode="idle") 279 + 280 + assert result is False 281 + assert client._revoked is True 282 + 283 + 284 + def test_key_persistence(mock_session, mock_config, mock_journal): 285 + from observe.remote_client import ObserverClient 286 + 287 + client = ObserverClient("main-stream") 288 + client._persist_key("persisted-key") 289 + 290 + config = json.loads((mock_journal / "config" / "journal.json").read_text()) 291 + assert config == {"observe": {"remote": {"key": "persisted-key"}}} 292 + 293 + 294 + def test_key_persistence_preserves_existing(mock_session, mock_config, mock_journal): 295 + from observe.remote_client import ObserverClient 296 + 297 + config_dir = mock_journal / "config" 298 + config_dir.mkdir() 299 + config_path = config_dir / "journal.json" 300 + config_path.write_text( 301 + json.dumps( 302 + {"identity": {"name": "Jer"}, "observe": {"tmux": {"enabled": True}}} 303 + ) 304 + ) 305 + 306 + client = ObserverClient("main-stream") 307 + client._persist_key("persisted-key") 308 + 309 + config = json.loads(config_path.read_text()) 310 + assert config["identity"]["name"] == "Jer" 311 + assert config["observe"]["tmux"]["enabled"] is True 312 + assert config["observe"]["remote"]["key"] == "persisted-key" 313 + 314 + 315 + def test_cleanup_draft(tmp_path): 316 + from observe.remote_client import cleanup_draft 317 + 318 + draft_dir = tmp_path / "draft" 319 + draft_dir.mkdir() 320 + (draft_dir / "a.txt").write_text("a") 321 + (draft_dir / "b.txt").write_text("b") 322 + 323 + cleanup_draft(str(draft_dir)) 324 + 325 + assert not draft_dir.exists() 326 + 327 + 328 + def test_upload_duplicate_response(mock_session, mock_config, tmp_path): 329 + from observe.remote_client import ObserverClient 330 + 331 + mock_config.return_value = {"observe": {"remote": {"key": "testkey123"}}} 332 + 333 + file1 = tmp_path / "audio.flac" 334 + file1.write_bytes(b"audio data") 335 + 336 + response = MagicMock() 337 + response.status_code = 200 338 + response.json.return_value = { 339 + "status": "duplicate", 340 + "existing_segment": "120000_300", 341 + "message": "All files already received", 342 + } 343 + mock_session.post.return_value = response 344 + 345 + client = ObserverClient("main-stream") 346 + result = client.upload_segment("20250103", "120000_300", [file1]) 347 + 348 + assert result.success is True 349 + assert result.duplicate is True
+50 -125
observe/linux/observer.py
··· 39 39 from observe.hear import AudioRecorder 40 40 from observe.linux.audio import is_sink_muted 41 41 from observe.linux.screencast import Screencaster, StreamInfo 42 + from observe.remote_client import ObserverClient, cleanup_draft 42 43 from observe.utils import create_draft_folder, get_timestamp_parts 43 - from think.callosum import CallosumConnection 44 - from think.streams import stream_name, update_stream, write_segment_stream 45 - from think.utils import day_path, get_journal, get_rev, setup_cli 44 + from think.streams import stream_name 45 + from think.utils import setup_cli 46 46 47 47 logger = logging.getLogger(__name__) 48 48 ··· 78 78 self.running = True 79 79 self.stream = stream_name(host=HOST) 80 80 81 - # Callosum connection for events 82 - self._callosum: CallosumConnection | None = None 83 - self._journal_path: Path | None = None 81 + self._client: ObserverClient | None = None 84 82 85 83 # State tracking 86 84 self.start_at = time.time() # Wall-clock for filenames ··· 140 138 return False 141 139 logger.info("Screencast portal connected") 142 140 143 - # Start Callosum connection for events 144 - self._callosum = CallosumConnection(defaults={"rev": get_rev()}) 145 - self._callosum.start() 146 - self._journal_path = Path(get_journal()) 147 - logger.info("Callosum connection started") 141 + self._client = ObserverClient(self.stream) 142 + logger.info("Remote client initialized") 148 143 149 144 return True 150 145 ··· 247 242 """ 248 243 Handle window boundary rollover. 249 244 250 - Closes the current draft folder, renames it to final segment name, 251 - and emits the observing event. 245 + Closes the current draft folder, uploads segment files, and starts 246 + the next segment. 252 247 253 248 Args: 254 249 new_mode: The mode for the new segment ··· 256 251 # Get timestamp parts for this window and calculate duration 257 252 date_part, time_part = get_timestamp_parts(self.start_at) 258 253 duration = int(time.time() - self.start_at) 259 - day_dir = day_path(date_part) 260 254 261 255 # Stop screencast first (closes file handles) 262 256 stopped_streams: list[StreamInfo] = [] ··· 294 288 files = audio_files + screen_files 295 289 segment_key = f"{time_part}_{duration}" 296 290 297 - # Rename draft folder to final segment name (atomic handoff) 291 + # Upload segment files from draft directory 298 292 if self.draft_dir and files: 299 - final_segment_dir = str(day_dir / self.stream / segment_key) 300 - try: 301 - os.rename(self.draft_dir, final_segment_dir) 302 - logger.info( 303 - f"Segment finalized: {self.draft_dir} -> {final_segment_dir}" 293 + draft_path = Path(self.draft_dir) 294 + draft_files = [ 295 + draft_path / f 296 + for f in os.listdir(self.draft_dir) 297 + if (draft_path / f).is_file() 298 + ] 299 + if draft_files and self._client: 300 + meta = {"host": HOST, "platform": PLATFORM, "stream": self.stream} 301 + result = self._client.upload_segment( 302 + date_part, segment_key, draft_files, meta 304 303 ) 305 - except OSError as e: 306 - logger.error(f"Failed to rename draft folder: {e}") 307 - # Files stay in draft folder, won't be processed 308 - files = [] 309 - 310 - # Write stream identity for this segment 311 - if files: 312 - try: 313 - result = update_stream( 314 - self.stream, 315 - date_part, 316 - segment_key, 317 - type="observer", 318 - host=HOST, 319 - platform=PLATFORM, 320 - ) 321 - write_segment_stream( 322 - final_segment_dir, 323 - self.stream, 324 - result["prev_day"], 325 - result["prev_segment"], 326 - result["seq"], 304 + if result.success: 305 + logger.info( 306 + f"Segment uploaded: {segment_key} ({len(draft_files)} files)" 327 307 ) 328 - except Exception as e: 329 - logger.warning(f"Failed to write stream identity: {e}") 308 + else: 309 + logger.error(f"Segment upload failed: {segment_key}") 310 + cleanup_draft(self.draft_dir) 330 311 elif self.draft_dir and not files: 331 - # No files to save, remove empty draft folder 332 - try: 333 - os.rmdir(self.draft_dir) 334 - logger.debug(f"Removed empty draft folder: {self.draft_dir}") 335 - except OSError: 336 - pass # May have other files, ignore 312 + cleanup_draft(self.draft_dir) 337 313 338 314 self.draft_dir = None 339 315 ··· 356 332 357 333 logger.info(f"Mode transition: {old_mode} -> {new_mode}") 358 334 359 - # Emit observing event with what we saved this boundary 360 - if files and self._callosum: 361 - self._callosum.emit( 362 - "observe", 363 - "observing", 364 - day=date_part, 365 - segment=segment_key, 366 - files=files, 367 - host=HOST, 368 - platform=PLATFORM, 369 - stream=self.stream, 370 - ) 371 - logger.info(f"Segment observing: {segment_key} ({len(files)} files)") 372 - 373 335 def _create_draft_folder(self) -> str: 374 336 """Create a draft folder for the current segment.""" 375 337 self.draft_dir = create_draft_folder(self.start_at, self.stream) ··· 413 375 414 376 def emit_status(self): 415 377 """Emit observe.status event with current state.""" 416 - if not self._callosum: 378 + if not self._client: 417 379 return 418 380 419 - journal_path = str(self._journal_path) if self._journal_path else "" 420 381 elapsed = int(time.monotonic() - self.start_at_mono) 421 382 422 383 # Calculate screencast info 423 384 if self.current_mode == MODE_SCREENCAST and self.current_streams: 424 385 streams_info = [] 425 386 for stream in self.current_streams: 426 - try: 427 - rel_file = os.path.relpath(stream.file_path, journal_path) 428 - except ValueError: 429 - rel_file = stream.file_path 430 - 431 387 streams_info.append( 432 388 { 433 389 "position": stream.position, 434 390 "connector": stream.connector, 435 - "file": rel_file, 391 + "file": stream.file_path, 436 392 } 437 393 ) 438 394 ··· 465 421 else: 466 422 reported_mode = MODE_IDLE 467 423 468 - # Emit status 469 - self._callosum.emit( 424 + self._client.relay_event( 470 425 "observe", 471 426 "status", 472 427 mode=reported_mode, ··· 588 543 # Get timestamp parts for final save 589 544 date_part, time_part = get_timestamp_parts(self.start_at) 590 545 duration = int(time.time() - self.start_at) 591 - day_dir = day_path(date_part) 592 546 593 547 # Stop screencast first (closes file handles) 594 548 stopped_streams: list[StreamInfo] = [] ··· 613 567 segment_key = f"{time_part}_{duration}" 614 568 615 569 if self.draft_dir and files: 616 - final_segment_dir = str(day_dir / self.stream / segment_key) 617 - try: 618 - os.rename(self.draft_dir, final_segment_dir) 619 - logger.info(f"Final segment: {self.draft_dir} -> {final_segment_dir}") 620 - 621 - # Write stream identity for this segment 622 - try: 623 - result = update_stream( 624 - self.stream, 625 - date_part, 626 - segment_key, 627 - type="observer", 628 - host=HOST, 629 - platform=PLATFORM, 630 - ) 631 - write_segment_stream( 632 - final_segment_dir, 633 - self.stream, 634 - result["prev_day"], 635 - result["prev_segment"], 636 - result["seq"], 637 - ) 638 - except Exception as e: 639 - logger.warning(f"Failed to write stream identity: {e}") 640 - 641 - # Emit final observing event 642 - if self._callosum: 643 - self._callosum.emit( 644 - "observe", 645 - "observing", 646 - day=date_part, 647 - segment=segment_key, 648 - files=files, 649 - host=HOST, 650 - platform=PLATFORM, 651 - stream=self.stream, 652 - ) 570 + draft_path = Path(self.draft_dir) 571 + draft_files = [ 572 + draft_path / f 573 + for f in os.listdir(self.draft_dir) 574 + if (draft_path / f).is_file() 575 + ] 576 + if draft_files and self._client: 577 + meta = {"host": HOST, "platform": PLATFORM, "stream": self.stream} 578 + result = self._client.upload_segment( 579 + date_part, segment_key, draft_files, meta 580 + ) 581 + if result.success: 653 582 logger.info( 654 - f"Segment observing: {segment_key} ({len(files)} files)" 583 + f"Final segment uploaded: {segment_key} ({len(draft_files)} files)" 655 584 ) 656 - except OSError as e: 657 - logger.error(f"Failed to rename final draft folder: {e}") 585 + else: 586 + logger.error(f"Final segment upload failed: {segment_key}") 587 + cleanup_draft(self.draft_dir) 658 588 elif self.draft_dir: 659 - # No files, remove empty draft folder 660 - try: 661 - os.rmdir(self.draft_dir) 662 - except OSError: 663 - pass 589 + cleanup_draft(self.draft_dir) 664 590 665 591 self.draft_dir = None 666 592 ··· 668 594 self.audio_recorder.stop_recording() 669 595 logger.info("Audio recording stopped") 670 596 671 - # Stop Callosum connection 672 - if self._callosum: 673 - self._callosum.stop() 674 - self._callosum = None 675 - logger.info("Callosum connection stopped") 597 + if self._client: 598 + self._client.stop() 599 + self._client = None 600 + logger.info("Remote client stopped") 676 601 677 602 678 603 def _recover_session_env() -> None:
+9 -4
observe/remote_cli.py
··· 137 137 params={"name": name, "key_prefix": key[:8]}, 138 138 ) 139 139 140 - print(f"Remote observer created:") 140 + print("Remote observer created:") 141 141 print(f" Name: {name}") 142 142 print(f" Prefix: {key[:8]}") 143 143 print(f" Key: {key}") ··· 257 257 if hist_dir.exists(): 258 258 day_files = sorted(hist_dir.glob("*.jsonl"), reverse=True)[:7] 259 259 if day_files: 260 - print(f"\n Recent days:") 260 + print("\n Recent days:") 261 261 for df in day_files: 262 262 day = df.stem 263 263 records = load_history(key_prefix, day) ··· 278 278 connected = sum(1 for r in remotes if _status_label(r) == "connected") 279 279 disconnected = sum(1 for r in remotes if _status_label(r) == "disconnected") 280 280 revoked = sum(1 for r in remotes if _status_label(r) == "revoked") 281 - total_segments = sum(r.get("stats", {}).get("segments_received", 0) for r in remotes) 281 + total_segments = sum( 282 + r.get("stats", {}).get("segments_received", 0) for r in remotes 283 + ) 282 284 total_bytes = sum(r.get("stats", {}).get("bytes_received", 0) for r in remotes) 283 285 284 286 print(f"Remote observers: {len(remotes)} total") ··· 325 327 # status 326 328 p_status = sub.add_parser("status", help="Show remote status details") 327 329 p_status.add_argument( 328 - "identifier", nargs="?", default=None, help="Remote name or key prefix (omit for overview)" 330 + "identifier", 331 + nargs="?", 332 + default=None, 333 + help="Remote name or key prefix (omit for overview)", 329 334 ) 330 335 331 336 args = setup_cli(parser)
+236
observe/remote_client.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import json 7 + import logging 8 + import os 9 + import platform 10 + import socket 11 + import time 12 + from pathlib import Path 13 + from typing import Any, NamedTuple 14 + 15 + import requests 16 + 17 + from think.utils import get_config, get_journal 18 + 19 + logger = logging.getLogger(__name__) 20 + HOST = socket.gethostname() 21 + PLATFORM = platform.system().lower() 22 + DEFAULT_URL = "http://localhost:5173" 23 + RETRY_BACKOFF = [1, 5, 15] 24 + MAX_RETRIES = 3 25 + UPLOAD_TIMEOUT = 300 26 + EVENT_TIMEOUT = 30 27 + 28 + 29 + class UploadResult(NamedTuple): 30 + success: bool 31 + duplicate: bool = False 32 + 33 + 34 + def cleanup_draft(draft_dir: str) -> None: 35 + """Remove all files in a draft directory and delete the directory.""" 36 + try: 37 + for name in os.listdir(draft_dir): 38 + fp = os.path.join(draft_dir, name) 39 + if os.path.isfile(fp): 40 + os.remove(fp) 41 + os.rmdir(draft_dir) 42 + except OSError: 43 + pass 44 + 45 + 46 + class ObserverClient: 47 + """HTTP client for uploading observer segments to the remote ingest server.""" 48 + 49 + def __init__( 50 + self, 51 + stream: str, 52 + host: str = HOST, 53 + platform_name: str = PLATFORM, 54 + ): 55 + config = get_config() 56 + remote_cfg = config.get("observe", {}).get("remote", {}) 57 + self._url = remote_cfg.get("url", DEFAULT_URL).rstrip("/") 58 + self._key = remote_cfg.get("key") 59 + self._auto_register = remote_cfg.get("auto_register", True) 60 + self._name = remote_cfg.get("name") or stream 61 + self._stream = stream 62 + self._host = host 63 + self._platform = platform_name 64 + self._revoked = False 65 + self._session = requests.Session() 66 + 67 + def _persist_key(self, key: str) -> None: 68 + journal = get_journal() 69 + config_path = Path(journal) / "config" / "journal.json" 70 + config_path.parent.mkdir(parents=True, exist_ok=True) 71 + 72 + config: dict[str, Any] = {} 73 + if config_path.exists(): 74 + try: 75 + with open(config_path, encoding="utf-8") as f: 76 + config = json.load(f) 77 + except (json.JSONDecodeError, OSError) as e: 78 + logger.error( 79 + f"Cannot read {config_path}: {e} — skipping key persistence" 80 + ) 81 + return 82 + 83 + config.setdefault("observe", {}).setdefault("remote", {})["key"] = key 84 + 85 + with open(config_path, "w", encoding="utf-8") as f: 86 + json.dump(config, f, indent=2) 87 + f.write("\n") 88 + 89 + logger.info(f"Persisted remote key to {config_path}") 90 + 91 + def _ensure_registered(self) -> None: 92 + if self._key: 93 + return 94 + if not self._auto_register: 95 + logger.error( 96 + "No remote key configured and auto_register disabled. " 97 + "Set observe.remote.key in journal config or enable auto_register." 98 + ) 99 + return 100 + 101 + url = f"{self._url}/app/remote/api/create" 102 + for attempt, delay in enumerate(RETRY_BACKOFF): 103 + try: 104 + resp = self._session.post( 105 + url, 106 + json={"name": self._name}, 107 + timeout=EVENT_TIMEOUT, 108 + ) 109 + if resp.status_code == 200: 110 + data = resp.json() 111 + self._key = data["key"] 112 + self._persist_key(self._key) 113 + logger.info( 114 + f"Auto-registered as '{self._name}' (key: {self._key[:8]}...)" 115 + ) 116 + return 117 + elif resp.status_code == 403: 118 + self._revoked = True 119 + logger.error("Registration rejected (403)") 120 + return 121 + else: 122 + logger.warning( 123 + f"Registration attempt {attempt + 1} failed: {resp.status_code}" 124 + ) 125 + except requests.RequestException as e: 126 + logger.warning(f"Registration attempt {attempt + 1} failed: {e}") 127 + if attempt < len(RETRY_BACKOFF) - 1: 128 + time.sleep(delay) 129 + logger.error(f"Registration failed after {MAX_RETRIES} attempts") 130 + 131 + def upload_segment( 132 + self, 133 + day: str, 134 + segment: str, 135 + files: list[Path], 136 + meta: dict[str, Any] | None = None, 137 + ) -> UploadResult: 138 + if self._revoked: 139 + logger.warning("Client revoked, skipping upload") 140 + return UploadResult(False) 141 + 142 + self._ensure_registered() 143 + if not self._key: 144 + return UploadResult(False) 145 + 146 + url = f"{self._url}/app/remote/ingest/{self._key}" 147 + for attempt, delay in enumerate(RETRY_BACKOFF): 148 + file_handles = [] 149 + files_data = [] 150 + try: 151 + for path in files: 152 + if not path.exists(): 153 + logger.warning(f"File not found, skipping: {path}") 154 + continue 155 + fh = open(path, "rb") 156 + file_handles.append(fh) 157 + files_data.append( 158 + ("files", (path.name, fh, "application/octet-stream")) 159 + ) 160 + 161 + if not files_data: 162 + logger.error("No valid files to upload") 163 + return UploadResult(False) 164 + 165 + data: dict[str, Any] = { 166 + "day": day, 167 + "segment": segment, 168 + } 169 + if not meta or "host" not in meta: 170 + data["host"] = self._host 171 + if not meta or "platform" not in meta: 172 + data["platform"] = self._platform 173 + if meta: 174 + data["meta"] = json.dumps(meta) 175 + 176 + response = self._session.post( 177 + url, 178 + data=data, 179 + files=files_data, 180 + timeout=UPLOAD_TIMEOUT, 181 + ) 182 + 183 + if response.status_code == 200: 184 + resp_data = response.json() 185 + is_duplicate = resp_data.get("status") == "duplicate" 186 + return UploadResult(True, duplicate=is_duplicate) 187 + if response.status_code == 403: 188 + self._revoked = True 189 + logger.error("Upload rejected (403)") 190 + return UploadResult(False) 191 + 192 + logger.warning( 193 + f"Upload attempt {attempt + 1} failed: " 194 + f"{response.status_code} {response.text}" 195 + ) 196 + except requests.RequestException as e: 197 + logger.warning(f"Upload attempt {attempt + 1} failed: {e}") 198 + finally: 199 + for fh in file_handles: 200 + try: 201 + fh.close() 202 + except Exception: 203 + pass 204 + 205 + if attempt < len(RETRY_BACKOFF) - 1: 206 + time.sleep(delay) 207 + 208 + logger.error(f"Upload failed after {MAX_RETRIES} attempts: {day}/{segment}") 209 + return UploadResult(False) 210 + 211 + def relay_event(self, tract: str, event: str, **fields: Any) -> bool: 212 + if self._revoked: 213 + return False 214 + 215 + self._ensure_registered() 216 + if not self._key: 217 + return False 218 + 219 + url = f"{self._url}/app/remote/ingest/{self._key}/event" 220 + payload = {"tract": tract, "event": event, **fields} 221 + try: 222 + resp = self._session.post(url, json=payload, timeout=EVENT_TIMEOUT) 223 + if resp.status_code == 200: 224 + return True 225 + if resp.status_code == 403: 226 + self._revoked = True 227 + logger.error("Event relay rejected (403)") 228 + return False 229 + logger.warning(f"Event relay failed: {resp.status_code} {resp.text}") 230 + return False 231 + except requests.RequestException as e: 232 + logger.debug(f"Event relay failed: {e}") 233 + return False 234 + 235 + def stop(self) -> None: 236 + self._session.close()
+33 -55
observe/tmux/observer.py
··· 6 6 Standalone tmux terminal capture observer. 7 7 8 8 Continuously polls all active tmux sessions and captures terminal content, 9 - creating 5-minute segments with draft-to-final atomic rename. 9 + creating 5-minute segments uploaded via HTTP to the ingest server. 10 10 11 11 Always-on: no idle detection, no screen activity checks. Just captures 12 12 whatever tmux sessions exist on the configurable interval. ··· 23 23 import time 24 24 from pathlib import Path 25 25 26 + from observe.remote_client import ObserverClient, cleanup_draft 26 27 from observe.tmux.capture import TmuxCapture, write_captures_jsonl 27 28 from observe.utils import create_draft_folder, get_timestamp_parts 28 - from think.callosum import CallosumConnection 29 - from think.streams import stream_name, update_stream, write_segment_stream 30 - from think.utils import day_path, get_config, get_rev, setup_cli 29 + from think.streams import stream_name 30 + from think.utils import get_config, setup_cli 31 31 32 32 logger = logging.getLogger(__name__) 33 33 ··· 41 41 self.tmux_capture = TmuxCapture() 42 42 self.running = True 43 43 self.stream = stream_name(host=HOST, qualifier="tmux") 44 - self._callosum: CallosumConnection | None = None 44 + self._client: ObserverClient | None = None 45 45 self.start_at = time.time() 46 46 self.start_at_mono = time.monotonic() 47 47 self.draft_dir: str | None = None ··· 66 66 return enabled, capture_interval 67 67 68 68 def setup(self) -> bool: 69 - """Initialize config, tmux availability, and Callosum.""" 69 + """Initialize config, tmux availability, and remote client.""" 70 70 enabled, self.capture_interval = self._load_config() 71 71 if not enabled: 72 72 logger.info("Tmux capture disabled in config") ··· 76 76 logger.error("Tmux not available") 77 77 return False 78 78 79 - self._callosum = CallosumConnection(defaults={"rev": get_rev()}) 80 - self._callosum.start() 81 - logger.info("Callosum connection started") 79 + self._client = ObserverClient(self.stream) 80 + logger.info("Remote client initialized") 82 81 return True 83 82 84 83 def capture(self): ··· 128 127 pass 129 128 130 129 def finalize_segment(self) -> list[str]: 131 - """Write captures to disk, finalize the segment, and emit observing.""" 130 + """Write captures to disk, upload the segment, and clean up draft state.""" 132 131 if not self.captures or not self.draft_dir: 133 132 self._remove_empty_draft() 134 133 self._reset_capture_state() ··· 142 141 143 142 date_part, time_part = get_timestamp_parts(self.start_at) 144 143 duration = int(time.time() - self.start_at) 145 - day_dir = day_path(date_part) 146 144 segment_key = f"{time_part}_{duration}" 147 - final_segment_dir = str(day_dir / self.stream / segment_key) 148 145 149 - try: 150 - os.rename(self.draft_dir, final_segment_dir) 151 - logger.info(f"Segment finalized: {self.draft_dir} -> {final_segment_dir}") 152 - except OSError as e: 153 - logger.error(f"Failed to rename draft folder: {e}") 154 - tmux_files = [] 155 - 156 - if tmux_files: 157 - try: 158 - result = update_stream( 159 - self.stream, 160 - date_part, 161 - segment_key, 162 - type="observer", 163 - host=HOST, 164 - platform=PLATFORM, 165 - ) 166 - write_segment_stream( 167 - final_segment_dir, 168 - self.stream, 169 - result["prev_day"], 170 - result["prev_segment"], 171 - result["seq"], 172 - ) 173 - except Exception as e: 174 - logger.warning(f"Failed to write stream identity: {e}") 175 - 176 - if self._callosum: 177 - self._callosum.emit( 178 - "observe", 179 - "observing", 180 - day=date_part, 181 - segment=segment_key, 182 - files=tmux_files, 183 - host=HOST, 184 - platform=PLATFORM, 185 - stream=self.stream, 146 + # Upload from draft directory 147 + draft_path = Path(self.draft_dir) 148 + draft_files = [ 149 + draft_path / f 150 + for f in os.listdir(self.draft_dir) 151 + if (draft_path / f).is_file() 152 + ] 153 + if draft_files and self._client: 154 + meta = {"host": HOST, "platform": PLATFORM, "stream": self.stream} 155 + result = self._client.upload_segment( 156 + date_part, segment_key, draft_files, meta 157 + ) 158 + if result.success: 159 + logger.info( 160 + f"Segment uploaded: {segment_key} ({len(draft_files)} files)" 186 161 ) 162 + else: 163 + logger.error(f"Segment upload failed: {segment_key}") 164 + cleanup_draft(self.draft_dir) 187 165 188 166 self._reset_capture_state() 189 167 return tmux_files ··· 196 174 197 175 def emit_status(self): 198 176 """Emit observe.status with current tmux capture state.""" 199 - if not self._callosum: 177 + if not self._client: 200 178 return 201 179 202 180 elapsed = int(time.monotonic() - self.start_at_mono) ··· 206 184 "sessions": sorted(self.sessions_seen), 207 185 "window_elapsed_seconds": elapsed, 208 186 } 209 - self._callosum.emit( 187 + self._client.relay_event( 210 188 "observe", 211 189 "status", 212 190 mode="tmux", ··· 234 212 await self.shutdown() 235 213 236 214 async def shutdown(self): 237 - """Finalize the current segment and stop Callosum.""" 215 + """Finalize the current segment and stop the remote client.""" 238 216 self.finalize_segment() 239 217 self.draft_dir = None 240 - if self._callosum: 241 - self._callosum.stop() 242 - self._callosum = None 218 + if self._client: 219 + self._client.stop() 220 + self._client = None 243 221 244 222 245 223 async def async_main(args):
+3 -1
observe/transcribe/gemini.py
··· 383 383 ) 384 384 385 385 transcribe_time = time.perf_counter() - t0 386 - logger.debug("Gemini raw response (%d chars):\n%s", len(response_text), response_text[:2000]) 386 + logger.debug( 387 + "Gemini raw response (%d chars):\n%s", len(response_text), response_text[:2000] 388 + ) 387 389 388 390 # Parse JSON response 389 391 try: