personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add unified ObserverBackend for local/remote observer modes

Consolidates local (Callosum) and remote (HTTP upload) logic into a single
ObserverBackend class that both platform observers use. This eliminates
duplicate conditional code and enables --remote support on macOS.

Key changes:
- New ObserverBackend class in observe/remote.py handles mode switching
- Both Linux and macOS observers now use backend.emit() and segment_complete()
- JOURNAL_PATH optional in remote mode (uses /tmp/solstone-observer for staging)
- Added --remote CLI argument to macOS observer
- Temp staging directory persists after crashes for diagnostics

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+263 -157
+35 -100
observe/linux/observer.py
··· 37 37 from observe.hear import AudioRecorder 38 38 from observe.linux.audio import is_sink_muted 39 39 from observe.linux.screencast import Screencaster, StreamInfo 40 - from observe.remote import HOST, PLATFORM, RemoteClient 40 + from observe.remote import ObserverBackend, staging_day_path 41 41 from observe.tmux.capture import TmuxCapture, write_captures_jsonl 42 - from think.callosum import CallosumConnection 43 - from think.utils import day_path, setup_cli 42 + from think.utils import setup_cli 44 43 45 44 logger = logging.getLogger(__name__) 46 45 ··· 62 61 63 62 def __init__(self, interval: int = 300, remote_url: str | None = None): 64 63 self.interval = interval 65 - self.remote_url = remote_url 66 64 self.audio_recorder = AudioRecorder() 67 65 self.screencaster = Screencaster() 68 66 self.tmux_capture = TmuxCapture() 69 67 self.bus: MessageBus | None = None 70 68 self.running = True 71 - self.callosum: CallosumConnection | None = None 72 - self.remote_client: RemoteClient | None = None 69 + 70 + # Unified backend for local/remote modes 71 + self.backend = ObserverBackend(remote_url) 73 72 74 73 # State tracking 75 74 self.start_at = time.time() # Wall-clock for filenames ··· 128 127 else: 129 128 logger.info("Tmux not available (will only use screencast)") 130 129 131 - # Start Callosum connection for status events (or remote client) 132 - if self.remote_url: 133 - self.remote_client = RemoteClient(self.remote_url) 134 - self.remote_client.start() 135 - logger.info(f"Remote client started: {self.remote_url[:50]}...") 136 - else: 137 - self.callosum = CallosumConnection() 138 - self.callosum.start() 139 - logger.info("Callosum connection started") 130 + # Start unified backend (handles local/remote modes) 131 + self.backend.start() 140 132 141 133 return True 142 134 ··· 275 267 # Get timestamp parts for this window and calculate duration 276 268 date_part, time_part = self.get_timestamp_parts(self.start_at) 277 269 duration = int(time.time() - self.start_at) 278 - day_dir = day_path(date_part) 270 + day_dir = staging_day_path(self.backend.staging_path, date_part) 279 271 280 272 # Stop screencast first (closes file handles) 281 273 stopped_streams: list[StreamInfo] = [] ··· 371 363 372 364 # Emit observing event with what we saved this boundary 373 365 if files: 374 - if self.remote_client: 375 - # Remote mode: upload files to remote server 376 - segment_dir = day_dir / segment_key 377 - file_paths = [segment_dir / f for f in files] 378 - if self.remote_client.upload_and_cleanup( 379 - date_part, segment_key, file_paths 380 - ): 381 - logger.info(f"Segment uploaded: {segment_key} ({len(files)} files)") 382 - else: 383 - logger.error( 384 - f"Segment upload failed: {segment_key} - files kept locally" 385 - ) 386 - elif self.callosum: 387 - # Local mode: emit to local Callosum 388 - # Files are now simple names (e.g., "audio.flac" not "143022_300_audio.flac") 389 - self.callosum.emit( 390 - "observe", 391 - "observing", 392 - day=date_part, 393 - segment=segment_key, 394 - files=files, 395 - host=HOST, 396 - platform=PLATFORM, 397 - ) 398 - logger.info(f"Segment observing: {segment_key} ({len(files)} files)") 366 + segment_dir = day_dir / segment_key 367 + file_paths = [segment_dir / f for f in files] 368 + self.backend.segment_complete(date_part, segment_key, file_paths) 399 369 400 370 def _create_draft_folder(self) -> str: 401 371 """ ··· 405 375 Path to the draft folder (YYYYMMDD/HHMMSS_draft/) 406 376 """ 407 377 date_part, time_part = self.get_timestamp_parts(self.start_at) 408 - day_dir = day_path(date_part) 378 + day_dir = staging_day_path(self.backend.staging_path, date_part) 409 379 410 380 # Create draft folder: YYYYMMDD/HHMMSS_draft/ 411 381 draft_name = f"{time_part}_draft" ··· 477 447 478 448 def emit_status(self): 479 449 """Emit observe.status event with current state.""" 480 - journal_path = os.getenv("JOURNAL_PATH", "") 450 + staging_path = str(self.backend.staging_path) 481 451 elapsed = int(time.monotonic() - self.start_at_mono) 482 452 483 453 # Calculate screencast info ··· 485 455 streams_info = [] 486 456 for stream in self.current_streams: 487 457 try: 488 - rel_file = ( 489 - os.path.relpath(stream.file_path, journal_path) 490 - if journal_path 491 - else stream.file_path 492 - ) 458 + rel_file = os.path.relpath(stream.file_path, staging_path) 493 459 except ValueError: 494 460 rel_file = stream.file_path 495 461 ··· 536 502 "tmux_active": self.cached_tmux_active, 537 503 } 538 504 539 - # Emit to remote or local Callosum 540 - if self.remote_client: 541 - self.remote_client.emit( 542 - "observe", 543 - "status", 544 - mode=self.current_mode, 545 - screencast=screencast_info, 546 - tmux=tmux_info, 547 - audio=audio_info, 548 - activity=activity_info, 549 - host=HOST, 550 - platform=PLATFORM, 551 - ) 552 - elif self.callosum: 553 - self.callosum.emit( 554 - "observe", 555 - "status", 556 - mode=self.current_mode, 557 - screencast=screencast_info, 558 - tmux=tmux_info, 559 - audio=audio_info, 560 - activity=activity_info, 561 - host=HOST, 562 - platform=PLATFORM, 563 - ) 505 + # Emit status via unified backend 506 + self.backend.emit( 507 + "observe", 508 + "status", 509 + mode=self.current_mode, 510 + screencast=screencast_info, 511 + tmux=tmux_info, 512 + audio=audio_info, 513 + activity=activity_info, 514 + ) 564 515 565 516 async def main_loop(self): 566 517 """Run the main observer loop.""" ··· 673 624 # Get timestamp parts for final save 674 625 date_part, time_part = self.get_timestamp_parts(self.start_at) 675 626 duration = int(time.time() - self.start_at) 676 - day_dir = day_path(date_part) 627 + day_dir = staging_day_path(self.backend.staging_path, date_part) 677 628 678 629 # Stop screencast first (closes file handles) 679 630 stopped_streams: list[StreamInfo] = [] ··· 710 661 os.rename(self.draft_dir, final_segment_dir) 711 662 logger.info(f"Final segment: {self.draft_dir} -> {final_segment_dir}") 712 663 713 - # Emit final observing event 714 - if self.remote_client: 715 - segment_dir = day_dir / segment_key 716 - file_paths = [segment_dir / f for f in files] 717 - self.remote_client.upload_and_cleanup( 718 - date_part, segment_key, file_paths 719 - ) 720 - elif self.callosum: 721 - self.callosum.emit( 722 - "observe", 723 - "observing", 724 - day=date_part, 725 - segment=segment_key, 726 - files=files, 727 - host=HOST, 728 - platform=PLATFORM, 729 - ) 664 + # Emit final observing event via backend 665 + segment_dir = day_dir / segment_key 666 + file_paths = [segment_dir / f for f in files] 667 + self.backend.segment_complete(date_part, segment_key, file_paths) 730 668 except OSError as e: 731 669 logger.error(f"Failed to rename final draft folder: {e}") 732 670 elif self.draft_dir: ··· 742 680 self.audio_recorder.stop_recording() 743 681 logger.info("Audio recording stopped") 744 682 745 - # Stop Callosum or remote client 746 - if self.remote_client: 747 - self.remote_client.stop() 748 - logger.info("Remote client stopped") 749 - elif self.callosum: 750 - self.callosum.stop() 751 - logger.info("Callosum connection stopped") 683 + # Stop unified backend 684 + self.backend.stop() 685 + logger.info("Backend stopped") 752 686 753 687 754 688 async def async_main(args): ··· 804 738 ) 805 739 args = setup_cli(parser) 806 740 807 - # Verify journal path exists 741 + # Verify journal path exists (only required for local mode) 808 742 journal = os.getenv("JOURNAL_PATH") 809 - if not journal or not os.path.exists(journal): 743 + if not args.remote and (not journal or not os.path.exists(journal)): 810 744 logger.error(f"JOURNAL_PATH not set or does not exist: {journal}") 745 + logger.error("Set JOURNAL_PATH or use --remote for remote mode") 811 746 sys.exit(1) 812 747 813 748 # Log remote mode if enabled
+49 -51
observe/macos/observer.py
··· 30 30 is_screen_locked, 31 31 ) 32 32 from observe.macos.screencapture import AudioInfo, DisplayInfo, ScreenCaptureKitManager 33 - from observe.remote import HOST, PLATFORM 34 - from think.callosum import CallosumConnection 35 - from think.utils import day_path, setup_cli 33 + from observe.remote import ObserverBackend, staging_day_path 34 + from think.utils import setup_cli 36 35 37 36 logger = logging.getLogger(__name__) 38 37 ··· 47 46 class MacOSObserver: 48 47 """macOS audio and screencast observer using ScreenCaptureKit.""" 49 48 50 - def __init__(self, interval: int = 300, sck_cli_path: str = "sck-cli"): 49 + def __init__( 50 + self, 51 + interval: int = 300, 52 + sck_cli_path: str = "sck-cli", 53 + remote_url: str | None = None, 54 + ): 51 55 """ 52 56 Initialize the macOS observer. 53 57 54 58 Args: 55 59 interval: Window duration in seconds (default: 300 = 5 minutes) 56 60 sck_cli_path: Path to sck-cli executable 61 + remote_url: Remote server URL for uploading segments (optional) 57 62 """ 58 63 self.interval = interval 59 64 self.screencapture = ScreenCaptureKitManager(sck_cli_path=sck_cli_path) 60 65 self.running = True 61 - self.callosum: CallosumConnection | None = None 66 + 67 + # Unified backend for local/remote modes 68 + self.backend = ObserverBackend(remote_url) 62 69 63 70 # State tracking 64 71 self.start_at = time.time() # Wall-clock for filenames ··· 83 90 self.segment_is_muted = False 84 91 85 92 async def setup(self): 86 - """Initialize ScreenCaptureKit and Callosum connection.""" 93 + """Initialize ScreenCaptureKit and backend connection.""" 87 94 # Verify sck-cli is available 88 95 sck_path = shutil.which(self.screencapture.sck_cli_path) 89 96 if not sck_path: ··· 91 98 return False 92 99 logger.info(f"Found sck-cli at: {sck_path}") 93 100 94 - # Start Callosum connection for status events 95 - self.callosum = CallosumConnection() 96 - self.callosum.start() 97 - logger.info("Callosum connection started") 101 + # Start unified backend (handles local/remote modes) 102 + self.backend.start() 98 103 99 104 return True 100 105 ··· 226 231 # Get timestamp parts for this window and calculate duration 227 232 date_part, time_part = self.get_timestamp_parts(self.start_at) 228 233 duration = int(time.time() - self.start_at) 229 - day_dir = day_path(date_part) 234 + day_dir = staging_day_path(self.backend.staging_path, date_part) 230 235 segment_key = f"{time_part}_{duration}" 231 236 232 237 saved_files: list[str] = [] ··· 320 325 if is_active and not self.cached_screen_locked: 321 326 self.initialize_capture() 322 327 323 - # Emit observing event with saved files 324 - if saved_files and self.callosum: 325 - self.callosum.emit( 326 - "observe", 327 - "observing", 328 - day=date_part, 329 - segment=segment_key, 330 - files=saved_files, 331 - host=HOST, 332 - platform=PLATFORM, 328 + # Emit observing event with saved files via backend 329 + if saved_files: 330 + segment_dir = ( 331 + staging_day_path(self.backend.staging_path, date_part) / segment_key 333 332 ) 334 - logger.info(f"Segment observing: {segment_key} ({len(saved_files)} files)") 333 + file_paths = [segment_dir / f for f in saved_files] 334 + self.backend.segment_complete(date_part, segment_key, file_paths) 335 335 336 336 def _create_draft_folder(self) -> str: 337 337 """ ··· 341 341 Path to the draft folder (YYYYMMDD/HHMMSS_draft/) 342 342 """ 343 343 date_part, time_part = self.get_timestamp_parts(self.start_at) 344 - day_dir = day_path(date_part) 344 + day_dir = staging_day_path(self.backend.staging_path, date_part) 345 345 346 346 # Create draft folder: YYYYMMDD/HHMMSS_draft/ 347 347 draft_name = f"{time_part}_draft" ··· 401 401 - audio: always empty (macOS checks threshold at boundary, not real-time) 402 402 - activity: system activity status 403 403 """ 404 - if not self.callosum: 405 - return 406 - 407 - journal_path = os.getenv("JOURNAL_PATH", "") 404 + staging_path = str(self.backend.staging_path) 408 405 409 406 # Determine mode (macOS is binary: screencast or idle) 410 407 mode = "screencast" if self.capture_running else "idle" ··· 415 412 streams_info = [] 416 413 for display in self.current_displays: 417 414 try: 418 - rel_file = ( 419 - os.path.relpath(display.file_path, journal_path) 420 - if journal_path 421 - else display.file_path 422 - ) 415 + rel_file = os.path.relpath(display.file_path, staging_path) 423 416 except ValueError: 424 417 rel_file = display.file_path 425 418 ··· 457 450 "sink_muted": self.cached_is_muted, 458 451 } 459 452 460 - self.callosum.emit( 453 + # Emit status via unified backend 454 + self.backend.emit( 461 455 "observe", 462 456 "status", 463 457 mode=mode, ··· 465 459 tmux=tmux_info, 466 460 audio=audio_info, 467 461 activity=activity_info, 468 - host=HOST, 469 - platform=PLATFORM, 470 462 ) 471 463 472 464 async def main_loop(self): ··· 544 536 # Finalize segment (rename files and folder) 545 537 date_part, segment_key, saved_files = self._finalize_segment() 546 538 547 - # Emit observing event for final segment 548 - if saved_files and self.callosum: 549 - self.callosum.emit( 550 - "observe", 551 - "observing", 552 - day=date_part, 553 - segment=segment_key, 554 - files=saved_files, 555 - host=HOST, 556 - platform=PLATFORM, 539 + # Emit observing event for final segment via backend 540 + if saved_files: 541 + segment_dir = ( 542 + staging_day_path(self.backend.staging_path, date_part) / segment_key 557 543 ) 544 + file_paths = [segment_dir / f for f in saved_files] 545 + self.backend.segment_complete(date_part, segment_key, file_paths) 558 546 559 - # Stop Callosum connection 560 - if self.callosum: 561 - self.callosum.stop() 562 - logger.info("Callosum connection stopped") 547 + # Stop backend 548 + self.backend.stop() 549 + logger.info("Backend stopped") 563 550 564 551 logger.info("Shutdown complete") 565 552 ··· 569 556 observer = MacOSObserver( 570 557 interval=args.interval, 571 558 sck_cli_path=args.sck_cli_path, 559 + remote_url=getattr(args, "remote", None), 572 560 ) 573 561 574 562 # Setup signal handlers ··· 613 601 default="sck-cli", 614 602 help="Path to sck-cli executable (default: sck-cli from PATH).", 615 603 ) 604 + parser.add_argument( 605 + "--remote", 606 + type=str, 607 + help="Remote server URL for uploading segments (e.g., https://server:5000/app/remote/ingest/KEY)", 608 + ) 616 609 args = setup_cli(parser) 617 610 618 - # Verify journal path exists 611 + # Verify journal path exists (only required for local mode) 619 612 journal = os.getenv("JOURNAL_PATH") 620 - if not journal or not os.path.exists(journal): 613 + if not args.remote and (not journal or not os.path.exists(journal)): 621 614 logger.error(f"JOURNAL_PATH not set or does not exist: {journal}") 615 + logger.error("Set JOURNAL_PATH or use --remote for remote mode") 622 616 sys.exit(1) 617 + 618 + # Log remote mode if enabled 619 + if args.remote: 620 + logger.info(f"Remote mode enabled: {args.remote[:50]}...") 623 621 624 622 # Run async main 625 623 try:
+179 -6
observe/remote.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - """Remote client for uploading observer data to a remote server. 4 + """Remote observer support and unified backend for local/remote modes. 5 5 6 - This module provides functionality for: 7 - - Uploading segment files to a remote server 8 - - Sending events to the remote Callosum relay 9 - - Retry logic for failed uploads 6 + This module provides: 7 + - ObserverBackend: Unified interface for local (Callosum) and remote (HTTP) modes 8 + - RemoteClient: HTTP client for uploading segments and relaying events 9 + - Staging path management for remote observers without JOURNAL_PATH 10 10 """ 11 11 12 12 from __future__ import annotations 13 13 14 14 import logging 15 + import os 15 16 import platform 16 17 import queue 17 18 import socket 19 + import tempfile 18 20 import threading 19 21 import time 20 22 from pathlib import Path 21 23 22 24 import requests 25 + 26 + from think.callosum import CallosumConnection 23 27 24 28 logger = logging.getLogger(__name__) 25 29 ··· 61 65 self._stop_event.clear() 62 66 self._event_thread = threading.Thread(target=self._event_loop, daemon=True) 63 67 self._event_thread.start() 64 - logger.info(f"Remote client started: {self.remote_url[:50]}...") 65 68 66 69 def stop(self) -> None: 67 70 """Stop background event sender thread.""" ··· 230 233 logger.warning(f"Failed to delete {path}: {e}") 231 234 return True 232 235 return False 236 + 237 + 238 + class ObserverBackend: 239 + """Unified backend for observer segment completion and status events. 240 + 241 + Transparently handles local (Callosum) vs remote (HTTP upload) modes. 242 + Observers interact with this single interface regardless of mode. 243 + 244 + In remote mode without JOURNAL_PATH, creates a temp staging directory 245 + that persists after crashes for diagnostics. 246 + """ 247 + 248 + def __init__(self, remote_url: str | None = None): 249 + """Initialize the observer backend. 250 + 251 + Args: 252 + remote_url: Remote ingest URL (if None, uses local Callosum) 253 + """ 254 + self.remote_url = remote_url 255 + self._remote_client: RemoteClient | None = None 256 + self._callosum: CallosumConnection | None = None 257 + self._staging_path: Path | None = None 258 + 259 + @property 260 + def is_remote(self) -> bool: 261 + """True if operating in remote mode.""" 262 + return self.remote_url is not None 263 + 264 + @property 265 + def staging_path(self) -> Path: 266 + """Get the staging path for segment files. 267 + 268 + In local mode: uses JOURNAL_PATH (must be set). 269 + In remote mode: uses JOURNAL_PATH if set, otherwise creates temp directory. 270 + 271 + The temp directory persists after crashes for diagnostics. 272 + """ 273 + if self._staging_path is not None: 274 + return self._staging_path 275 + 276 + journal = os.getenv("JOURNAL_PATH") 277 + 278 + if journal and os.path.exists(journal): 279 + self._staging_path = Path(journal) 280 + elif self.is_remote: 281 + # Remote mode without JOURNAL_PATH - create temp staging directory 282 + # Use fixed subdirectory name so it persists and is findable 283 + staging = Path(tempfile.gettempdir()) / "solstone-observer" 284 + staging.mkdir(parents=True, exist_ok=True) 285 + self._staging_path = staging 286 + logger.info(f"Using temp staging directory: {staging}") 287 + else: 288 + raise RuntimeError( 289 + "JOURNAL_PATH not set (required for local mode). " 290 + "Set JOURNAL_PATH or use --remote for remote mode." 291 + ) 292 + 293 + return self._staging_path 294 + 295 + def start(self) -> None: 296 + """Start the backend (connects to Callosum or starts remote client).""" 297 + if self.is_remote: 298 + self._remote_client = RemoteClient(self.remote_url) 299 + self._remote_client.start() 300 + logger.info(f"Remote backend started: {self.remote_url[:50]}...") 301 + else: 302 + self._callosum = CallosumConnection() 303 + self._callosum.start() 304 + logger.info("Local backend started (Callosum)") 305 + 306 + def stop(self) -> None: 307 + """Stop the backend.""" 308 + if self._remote_client: 309 + self._remote_client.stop() 310 + self._remote_client = None 311 + if self._callosum: 312 + self._callosum.stop() 313 + self._callosum = None 314 + 315 + def emit(self, tract: str, event: str, **fields) -> bool: 316 + """Emit an event (status, observing, etc). 317 + 318 + Automatically adds host and platform fields. 319 + 320 + Args: 321 + tract: Event tract (e.g., "observe") 322 + event: Event name (e.g., "status") 323 + **fields: Additional event fields 324 + 325 + Returns: 326 + True if emitted successfully 327 + """ 328 + # Always include host/platform 329 + fields.setdefault("host", HOST) 330 + fields.setdefault("platform", PLATFORM) 331 + 332 + if self._remote_client: 333 + return self._remote_client.emit(tract, event, **fields) 334 + elif self._callosum: 335 + self._callosum.emit(tract, event, **fields) 336 + return True 337 + return False 338 + 339 + def segment_complete( 340 + self, 341 + day: str, 342 + segment: str, 343 + file_paths: list[Path], 344 + ) -> bool: 345 + """Handle segment completion - upload or emit locally. 346 + 347 + In remote mode: uploads files and deletes local copies on success. 348 + In local mode: emits observe.observing event. 349 + 350 + Args: 351 + day: Day string (YYYYMMDD) 352 + segment: Segment key (HHMMSS_LEN) 353 + file_paths: List of file paths in the segment 354 + 355 + Returns: 356 + True if successful 357 + """ 358 + if not file_paths: 359 + return True 360 + 361 + if self._remote_client: 362 + success = self._remote_client.upload_and_cleanup(day, segment, file_paths) 363 + if success: 364 + logger.info(f"Segment uploaded: {segment} ({len(file_paths)} files)") 365 + # Try to clean up empty segment directory 366 + segment_dir = file_paths[0].parent 367 + try: 368 + segment_dir.rmdir() 369 + except OSError: 370 + pass # Directory not empty or other error 371 + else: 372 + logger.error(f"Segment upload failed: {segment} - files kept locally") 373 + return success 374 + 375 + elif self._callosum: 376 + # Local mode: emit observe.observing with file names 377 + file_names = [f.name for f in file_paths] 378 + self._callosum.emit( 379 + "observe", 380 + "observing", 381 + day=day, 382 + segment=segment, 383 + files=file_names, 384 + host=HOST, 385 + platform=PLATFORM, 386 + ) 387 + logger.info(f"Segment observing: {segment} ({len(file_names)} files)") 388 + return True 389 + 390 + return False 391 + 392 + 393 + def staging_day_path(base: Path, day: str) -> Path: 394 + """Create and return day directory under staging base. 395 + 396 + Args: 397 + base: Staging base path (from ObserverBackend.staging_path) 398 + day: Day string (YYYYMMDD) 399 + 400 + Returns: 401 + Path to day directory (created if needed) 402 + """ 403 + day_dir = base / day 404 + day_dir.mkdir(parents=True, exist_ok=True) 405 + return day_dir