linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 757 lines 27 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""
Standalone Linux desktop observer — screen + audio capture.

Continuously captures audio and manages screencast recording based on activity.
Creates segments (``config.segment_interval`` seconds long, nominally 5 minutes)
in a local cache directory. The sync service handles all uploads — the observer
only writes locally.

Key architectural change from monorepo version:
- Capture writes completed segments to local cache only
- No ObserverClient usage in boundary handling — no network calls in capture loop
- Sync service picks up completed segments and uploads asynchronously

On-disk layout: ``<captures_dir>/YYYYMMDD/<stream>/HHMMSS.incomplete/`` while a
segment is open, renamed to ``HHMMSS_<duration-seconds>/`` when finalized.

State machine:
    SCREENCAST: Screen is active, recording video
    IDLE: Screen is inactive
"""

import asyncio
import datetime
import logging
import os
import platform
import signal
import socket
import time
from pathlib import Path

import numpy as np
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType

from .activity import (
    is_power_save_active,
    is_screen_locked,
    probe_activity_services,
)
from .audio_mute import is_sink_muted
from .audio_recorder import AudioRecorder
from .chat_bridge import run_chat_bridge
from .config import Config
from .recovery import write_segment_metadata
from .screencast import Screencaster, StreamInfo
from .sync import SyncService
from .upload import UploadClient

logger = logging.getLogger(__name__)

# Host identification
HOST = socket.gethostname()
PLATFORM = platform.system().lower()

# Constants
RMS_THRESHOLD = 0.01  # a chunk whose RMS exceeds this counts as one "hit"
MIN_HITS_FOR_SAVE = 3  # hits needed within a segment before audio is written
CHUNK_DURATION = 5  # seconds between main-loop ticks

# Capture modes
MODE_IDLE = "idle"
MODE_SCREENCAST = "screencast"

# Audio detection retry
DETECT_RETRIES = 3
DETECT_RETRY_DELAY = 5  # seconds


def _get_timestamp_parts(timestamp: float | None = None) -> tuple[str, str]:
    """Return ``(YYYYMMDD, HHMMSS)`` strings for *timestamp* in local time.

    Uses the current time when *timestamp* is None.
    """
    if timestamp is None:
        timestamp = time.time()
    dt = datetime.datetime.fromtimestamp(timestamp)
    return dt.strftime("%Y%m%d"), dt.strftime("%H%M%S")


def _empty_audio_buffer() -> np.ndarray:
    """Return an empty ``(0, 2)`` float32 buffer (column 0 = mic, column 1 = sys)."""
    return np.empty((0, 2), dtype=np.float32)


class Observer:
    """Unified audio and screencast observer with local cache + sync."""

    def __init__(self, config: Config):
        self.config = config
        self.interval = config.segment_interval
        self.audio_recorder = AudioRecorder()
        self.screencaster = Screencaster(config.restore_token_path)
        self.bus: MessageBus | None = None
        self.running = True
        self.stream = config.stream

        self._client: UploadClient | None = None
        self._sync: SyncService | None = None

        # State tracking. start_at (wall clock) names the segment directory;
        # start_at_mono (monotonic) measures elapsed time and duration.
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()
        # NOTE(review): not read in this module; kept in case other modules use it.
        self._start_mono = time.monotonic()
        self.threshold_hits = 0
        self.accumulated_audio_buffer = _empty_audio_buffer()

        # Mode tracking
        self.current_mode = MODE_IDLE

        # Segment directory (HHMMSS.incomplete/)
        self.segment_dir: Path | None = None

        # Multi-file screencast tracking
        self.current_streams: list[StreamInfo] = []

        # Activity status cache (updated each loop)
        self.cached_is_active = False
        self.cached_screen_locked = False
        self.cached_is_muted = False
        self.cached_power_save = False

        # Mute state at segment start (determines save format)
        self.segment_is_muted = False

        # Pause state; _pause_until is a monotonic deadline, 0.0 = indefinite
        self._paused = False
        self._pause_until = 0.0

        # D-Bus service interface
        self._dbus_service = None
        self._tray = None

    async def setup(self) -> bool:
        """Initialize audio devices, DBus connection, and sync service.

        Also exports the observer's D-Bus service and, when available, starts
        the system tray.

        Returns:
            True on success; False if no audio devices are detected after
            DETECT_RETRIES attempts or the screencast portal is unavailable.
        """
        # Detect audio devices with retry (devices may still be initializing)
        detected = False
        for attempt in range(DETECT_RETRIES):
            if self.audio_recorder.detect():
                detected = True
                break
            if attempt < DETECT_RETRIES - 1:
                logger.info(
                    "Audio detection attempt %d/%d failed, retrying in %ds",
                    attempt + 1,
                    DETECT_RETRIES,
                    DETECT_RETRY_DELAY,
                )
                await asyncio.sleep(DETECT_RETRY_DELAY)
        if not detected:
            logger.error("Failed to detect audio devices")
            return False

        self.audio_recorder.start_recording()
        logger.info("Audio recording started")

        # Connect to DBus for activity detection
        self.bus = await MessageBus(bus_type=BusType.SESSION).connect()
        logger.info("DBus connection established")

        # Probe which activity signals are available (logging only)
        await probe_activity_services(self.bus)

        # Verify portal is available (exit if not)
        if not await self.screencaster.connect():
            logger.error("Screencast portal not available")
            return False
        logger.info("Screencast portal connected")

        # Initialize upload client and sync service
        self._client = UploadClient(self.config)
        if self.config.server_url:
            self._client.ensure_registered(self.config)
        self._sync = SyncService(self.config, self._client)

        from .dbus_service import BUS_NAME, OBJECT_PATH, ObserverService

        self._dbus_service = ObserverService(self)
        self.bus.export(OBJECT_PATH, self._dbus_service)
        await self.bus.request_name(BUS_NAME)
        self._sync._dbus_service = self._dbus_service
        logger.info("D-Bus service exported as %s", BUS_NAME)

        # Initialize system tray (graceful: skip if no StatusNotifierWatcher)
        try:
            from .tray import TrayApp

            tray = TrayApp(self, self.bus)
            started = await tray.start()
            if started:
                self._tray = tray
                logger.info("System tray active")
            else:
                logger.info("System tray unavailable (no StatusNotifierWatcher)")
        except Exception as e:
            logger.info("System tray disabled: %s", e)

        logger.info("Sync service initialized")

        return True

    async def check_activity_status(self) -> str:
        """Query lock/power-save/mute state, cache it, and return the capture mode.

        Returns MODE_IDLE when the screen is locked or power saving is active,
        otherwise MODE_SCREENCAST.
        """
        screen_locked = await is_screen_locked(self.bus)
        power_save = await is_power_save_active(self.bus)
        sink_muted = await is_sink_muted()

        # Cache values for status events
        self.cached_screen_locked = screen_locked
        self.cached_is_muted = sink_muted
        self.cached_power_save = power_save

        screen_active = not (screen_locked or power_save)
        mode = MODE_SCREENCAST if screen_active else MODE_IDLE

        # Cache legacy is_active for audio threshold logic
        has_audio_activity = self.threshold_hits >= MIN_HITS_FOR_SAVE
        self.cached_is_active = screen_active or has_audio_activity

        return mode

    def compute_rms(self, audio_buffer: np.ndarray) -> float:
        """Compute per-channel RMS and return maximum (stereo: mic=left, sys=right)."""
        if audio_buffer.size == 0:
            return 0.0
        rms_left = float(np.sqrt(np.mean(audio_buffer[:, 0] ** 2)))
        rms_right = float(np.sqrt(np.mean(audio_buffer[:, 1] ** 2)))
        return max(rms_left, rms_right)

    def _save_audio_segment(self, segment_dir: Path, is_muted: bool) -> list[str]:
        """Write the accumulated audio buffer into *segment_dir*.

        When *is_muted* is true the two channels are written as separate mono
        files (``mic_audio.flac`` / ``sys_audio.flac``); otherwise one stereo
        ``audio.flac`` is written.

        Returns:
            The file names written, or an empty list if there was no audio.
        """
        if self.accumulated_audio_buffer.size == 0:
            logger.warning("No audio buffer to save")
            return []

        if is_muted:
            # Split mode: save mic and sys as separate mono files
            mic_data = self.accumulated_audio_buffer[:, 0]
            sys_data = self.accumulated_audio_buffer[:, 1]

            mic_bytes = self.audio_recorder.create_mono_flac_bytes(mic_data)
            sys_bytes = self.audio_recorder.create_mono_flac_bytes(sys_data)

            (segment_dir / "mic_audio.flac").write_bytes(mic_bytes)
            (segment_dir / "sys_audio.flac").write_bytes(sys_bytes)

            logger.info("Saved split audio (muted): %s", segment_dir)
            return ["mic_audio.flac", "sys_audio.flac"]

        # Normal mode: save combined stereo file
        flac_bytes = self.audio_recorder.create_flac_bytes(
            self.accumulated_audio_buffer
        )
        (segment_dir / "audio.flac").write_bytes(flac_bytes)

        logger.info("Saved audio to %s/audio.flac", segment_dir)
        return ["audio.flac"]

    def _start_segment(self) -> Path:
        """Create ``YYYYMMDD/<stream>/HHMMSS.incomplete/`` and make it current.

        Resets both segment start clocks and writes recovery metadata into the
        new directory.
        """
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()

        date_part, time_part = _get_timestamp_parts(self.start_at)
        captures_dir = self.config.captures_dir

        segment_dir = captures_dir / date_part / self.stream / f"{time_part}.incomplete"
        segment_dir.mkdir(parents=True, exist_ok=True)
        self.segment_dir = segment_dir

        # Write metadata for recovery
        write_segment_metadata(segment_dir, self.start_at)

        return segment_dir

    def _finalize_segment(self) -> str | None:
        """Rename ``HHMMSS.incomplete/`` to ``HHMMSS_<duration>/``.

        Returns:
            The new directory name (the segment key), or None when there is no
            open segment, the segment contains no files (it is removed), or
            the rename fails.
        """
        if not self.segment_dir or not self.segment_dir.exists():
            return None

        # Recovery metadata is only meaningful while the segment is open.
        try:
            (self.segment_dir / ".metadata").unlink(missing_ok=True)
        except OSError:
            pass

        if not any(entry.is_file() for entry in self.segment_dir.iterdir()):
            # Empty segment, remove it
            try:
                os.rmdir(str(self.segment_dir))
            except OSError:
                pass
            return None

        _, time_part = _get_timestamp_parts(self.start_at)
        # Measure on the monotonic clock (as the boundary check does) so a
        # wall-clock step cannot produce a negative or inflated duration.
        duration = max(0, int(time.monotonic() - self.start_at_mono))
        segment_key = f"{time_part}_{duration}"
        final_dir = self.segment_dir.parent / segment_key

        try:
            os.rename(str(self.segment_dir), str(final_dir))
        except OSError as e:
            logger.error("Failed to finalize segment: %s", e)
            return None
        logger.info("Segment finalized: %s", segment_key)
        return segment_key

    def _reset_audio_state(self) -> None:
        """Discard accumulated audio and reset the threshold-hit counter."""
        self.accumulated_audio_buffer = _empty_audio_buffer()
        self.threshold_hits = 0

    def _flush_segment(self) -> str | None:
        """Close the open segment: save audio if warranted, finalize, trigger sync.

        Audio is written only when at least MIN_HITS_FOR_SAVE chunks exceeded
        RMS_THRESHOLD; audio state is reset either way. The screencaster is not
        touched — callers stop it first so video files are closed before the
        directory is renamed.

        Returns:
            The finalized segment key, or None.
        """
        hits = self.threshold_hits
        if hits >= MIN_HITS_FOR_SAVE:
            if self.segment_dir:
                audio_files = self._save_audio_segment(
                    self.segment_dir, self.segment_is_muted
                )
                if audio_files:
                    logger.info(
                        "Saved %d audio file(s) (%d hits)", len(audio_files), hits
                    )
        else:
            logger.debug(
                "Skipping audio save (only %d/%d hits)", hits, MIN_HITS_FOR_SAVE
            )

        self._reset_audio_state()

        # Finalize segment (rename .incomplete -> HHMMSS_DDD/)
        segment_key = self._finalize_segment()
        self.segment_dir = None

        # Trigger sync to upload the completed segment
        if segment_key and self._sync:
            self._sync.trigger()
        return segment_key

    async def handle_boundary(self, new_mode: str):
        """Handle window boundary rollover.

        Closes the current segment, writes audio, finalizes to local cache,
        and triggers sync, then opens the next segment in *new_mode*. No
        network calls in the capture loop.

        Raises:
            RuntimeError: propagated from initialize_screencast() when the
                next screencast cannot be started.
        """
        # Stop screencast first (closes file handles)
        if self.current_mode == MODE_SCREENCAST:
            logger.info("Stopping previous screencast")
            await self.screencaster.stop()
            self.current_streams = []

        self._flush_segment()

        # The new segment records in the mute state current right now.
        self.segment_is_muted = self.cached_is_muted

        old_mode = self.current_mode
        self.current_mode = new_mode

        # Start new capture based on mode
        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            await self.initialize_screencast()
        else:
            self._start_segment()

        logger.info("Mode transition: %s -> %s", old_mode, new_mode)

    async def initialize_screencast(self) -> bool:
        """Start a new screencast recording.

        Creates a segment directory and starts recording into it at 1 fps with
        the cursor drawn. The segment directory is created before the
        screencast is started, so it exists even if starting fails.

        Returns:
            True on success.

        Raises:
            RuntimeError: if the screencaster fails to start or returns no
                streams.
        """
        segment_dir = self._start_segment()

        try:
            streams = await self.screencaster.start(
                str(segment_dir), framerate=1, draw_cursor=True
            )
        except RuntimeError as e:
            logger.error("Failed to start screencast: %s", e)
            raise

        if not streams:
            logger.error("No streams returned from screencast start")
            raise RuntimeError("No streams available")

        self.current_streams = streams

        logger.info("Started screencast with %d stream(s)", len(streams))
        for stream in streams:
            logger.info(" %s (%s): %s", stream.position, stream.connector, stream.file_path)

        return True

    def emit_status(self):
        """Emit observe.status event with current state (fire-and-forget)."""
        if not self._client:
            return

        elapsed = int(time.monotonic() - self.start_at_mono)

        # Screencast info
        if self.current_mode == MODE_SCREENCAST and self.current_streams:
            streams_info = [
                {
                    "position": stream.position,
                    "connector": stream.connector,
                    "file": stream.file_path,
                }
                for stream in self.current_streams
            ]
            screencast_info = {
                "recording": True,
                "streams": streams_info,
                "window_elapsed_seconds": elapsed,
            }
        else:
            screencast_info = {"recording": False}

        # Audio info
        audio_info = {
            "threshold_hits": self.threshold_hits,
            "will_save": self.threshold_hits >= MIN_HITS_FOR_SAVE,
        }

        # Activity info
        activity_info = {
            "active": self.cached_is_active,
            "screen_locked": self.cached_screen_locked,
            "sink_muted": self.cached_is_muted,
            "power_save": self.cached_power_save,
        }

        self._client.relay_event(
            "observe",
            "status",
            mode=self.current_mode,
            screencast=screencast_info,
            audio=audio_info,
            activity=activity_info,
            host=HOST,
            platform=PLATFORM,
            stream=self.stream,
        )

    def _refresh_tray(self):
        """Refresh the SNI tray UI. Safe when tray is unavailable; disables on failure."""
        if self._tray is None:
            return
        try:
            self._tray.update()
        except Exception:
            logger.warning("Tray update failed, disabling tray", exc_info=True)
            self._tray = None

    def _capture_status(self) -> str:
        """Return the D-Bus status string for the current (unpaused) capture mode."""
        return "recording" if self.current_mode == MODE_SCREENCAST else "idle"

    def pause(self, duration_seconds: int):
        """Pause capture. duration_seconds=0 means indefinite."""
        self._paused = True
        if duration_seconds > 0:
            self._pause_until = time.monotonic() + duration_seconds
        else:
            self._pause_until = 0.0
        if self._dbus_service:
            self._dbus_service.StatusChanged("paused")
        logger.info("Paused for %ss", duration_seconds)
        self._refresh_tray()

    def resume(self):
        """Resume capture from pause."""
        self._paused = False
        self._pause_until = 0.0
        if self._dbus_service:
            self._dbus_service.StatusChanged(self._capture_status())
        logger.info("Resumed")
        self._refresh_tray()

    async def _stop_background_tasks(
        self,
        sync_task: asyncio.Task | None,
        bridge_task: asyncio.Task | None,
        bridge_stop_event: asyncio.Event,
    ) -> None:
        """Stop the sync service and chat bridge tasks and wait for them to exit."""
        if sync_task:
            if self._sync:
                self._sync.stop()
            sync_task.cancel()
            try:
                await sync_task
            except asyncio.CancelledError:
                pass
        bridge_stop_event.set()
        if bridge_task:
            bridge_task.cancel()
            try:
                await bridge_task
            except (asyncio.CancelledError, Exception):
                pass

    def _check_auto_resume(self) -> None:
        """Leave a timed pause once its monotonic deadline has passed."""
        if (
            self._paused
            and self._pause_until > 0
            and time.monotonic() >= self._pause_until
        ):
            self._paused = False
            self._pause_until = 0.0
            if self._dbus_service:
                self._dbus_service.StatusChanged(self._capture_status())
            logger.info("Auto-resumed from timed pause")
            self._refresh_tray()

    async def _tick_paused(self) -> None:
        """One loop step while paused: close any open segment and drop audio."""
        if self.segment_dir:
            if self.current_mode == MODE_SCREENCAST:
                await self.screencaster.stop()
                self.current_streams = []
            self._flush_segment()
        # Discard whatever the recorder buffered during this step.
        self.audio_recorder.get_buffers()
        self.emit_status()
        self._refresh_tray()

    async def _tick_reopen_segment(self) -> None:
        """One loop step after a pause ended: open a fresh segment."""
        try:
            new_mode = await self.check_activity_status()
        except Exception:
            new_mode = self.current_mode
        self.segment_is_muted = self.cached_is_muted
        self.current_mode = new_mode
        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            try:
                await self.initialize_screencast()
            except RuntimeError:
                # initialize_screencast() already created the segment
                # directory; keep capturing audio into it rather than
                # creating a second one and orphaning the first.
                if self.segment_dir is None:
                    self._start_segment()
        else:
            self._start_segment()
        self.emit_status()

    def _accumulate_audio_chunk(self) -> None:
        """Append the recorder's latest buffers and count an RMS hit if loud enough."""
        audio_chunk = self.audio_recorder.get_buffers()
        if audio_chunk.size == 0:
            logger.debug("No audio data in chunk")
            return

        self.accumulated_audio_buffer = np.vstack(
            (self.accumulated_audio_buffer, audio_chunk)
        )
        rms = self.compute_rms(audio_chunk)
        if rms > RMS_THRESHOLD:
            self.threshold_hits += 1
            logger.debug("RMS %.4f > threshold (hit %d)", rms, self.threshold_hits)
        else:
            logger.debug("RMS %.4f below threshold", rms)

    async def _tick_capture(self) -> None:
        """One loop step of normal capture: detect changes, gather audio, roll segments."""
        # Check activity status and determine new mode
        try:
            new_mode = await self.check_activity_status()
        except Exception as e:
            logger.warning("Activity check failed: %s — keeping current mode", e)
            new_mode = self.current_mode

        # Check for GStreamer failure mid-recording
        if (
            self.current_mode == MODE_SCREENCAST
            and not self.screencaster.is_healthy()
        ):
            logger.warning("Screencast recording failed, stopping gracefully")
            await self.screencaster.stop()
            self.current_streams = []
            self.current_mode = MODE_IDLE

        mode_changed = new_mode != self.current_mode
        if mode_changed:
            logger.info("Mode changing: %s -> %s", self.current_mode, new_mode)

        # Only screencast start/stop forces a segment boundary
        screencast_transition = mode_changed and (
            self.current_mode == MODE_SCREENCAST or new_mode == MODE_SCREENCAST
        )

        # A mute change forces a boundary because it changes the save format
        mute_transition = self.cached_is_muted != self.segment_is_muted
        if mute_transition:
            logger.info(
                "Mute state changed: %s -> %s",
                "muted" if self.segment_is_muted else "unmuted",
                "muted" if self.cached_is_muted else "unmuted",
            )

        self._accumulate_audio_chunk()

        # Check for window boundary (monotonic to avoid DST/clock jumps)
        elapsed = time.monotonic() - self.start_at_mono
        is_boundary = (
            elapsed >= self.interval or screencast_transition or mute_transition
        )

        if is_boundary:
            logger.info(
                "Boundary: elapsed=%.1fs screencast_change=%s mute_change=%s hits=%d/%d",
                elapsed,
                screencast_transition,
                mute_transition,
                self.threshold_hits,
                MIN_HITS_FOR_SAVE,
            )
            await self.handle_boundary(new_mode)
            if mode_changed and self._dbus_service:
                self._dbus_service.StatusChanged(self._capture_status())
                self._refresh_tray()

        self.emit_status()
        self._refresh_tray()

    async def main_loop(self):
        """Run the main observer loop with background sync.

        Ticks every CHUNK_DURATION seconds until ``self.running`` is cleared.
        If the initial screencast cannot be started, the background tasks are
        stopped and the method returns without entering the loop.
        """
        logger.info("Starting observer loop (interval=%ss)", self.interval)

        # Start sync service and optional chat bridge as background tasks
        bridge_stop_event = asyncio.Event()
        bridge_task = None
        sync_task = None
        if self._sync:
            sync_task = asyncio.create_task(self._sync.run())
        if self.config.chat_bridge_enabled:
            bridge_task = asyncio.create_task(
                run_chat_bridge(self.config, bridge_stop_event)
            )

        # Determine initial mode (default to screencast if check fails)
        try:
            new_mode = await self.check_activity_status()
        except Exception as e:
            logger.warning(
                "Initial activity check failed: %s — defaulting to screencast", e
            )
            new_mode = MODE_SCREENCAST
        self.segment_is_muted = self.cached_is_muted
        self.current_mode = new_mode

        # Start initial capture based on mode
        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            try:
                await self.initialize_screencast()
            except RuntimeError:
                self.running = False
                await self._stop_background_tasks(
                    sync_task, bridge_task, bridge_stop_event
                )
                return
        else:
            self._start_segment()

        logger.info("Initial mode: %s", self.current_mode)

        try:
            while self.running:
                await asyncio.sleep(CHUNK_DURATION)

                self._check_auto_resume()

                if self._paused:
                    await self._tick_paused()
                    continue

                # segment_dir is None only after a pause closed the segment
                if self.segment_dir is None:
                    await self._tick_reopen_segment()
                    continue

                await self._tick_capture()
        finally:
            logger.info("Observer loop stopped, cleaning up...")
            try:
                await self.shutdown()
            finally:
                # Always stop background tasks, even if shutdown() failed.
                await self._stop_background_tasks(
                    sync_task, bridge_task, bridge_stop_event
                )

    async def shutdown(self):
        """Clean shutdown of observer.

        Stops the screencast, saves pending audio if the hit threshold was met,
        finalizes the open segment locally (no sync is triggered here), then
        stops the audio recorder and the upload client.
        """
        # Stop screencast first (closes file handles)
        if self.current_mode == MODE_SCREENCAST:
            logger.info("Stopping screencast for shutdown")
            await self.screencaster.stop()
            await asyncio.sleep(0.5)

        # Save final audio if threshold met
        if self.threshold_hits >= MIN_HITS_FOR_SAVE and self.segment_dir:
            audio_files = self._save_audio_segment(
                self.segment_dir, self.segment_is_muted
            )
            if audio_files:
                logger.info("Saved final audio: %d file(s)", len(audio_files))

        # Finalize segment locally
        segment_key = self._finalize_segment()
        self.segment_dir = None

        if segment_key:
            logger.info("Finalized segment locally: %s (shutdown)", segment_key)

        # Stop audio recorder
        self.audio_recorder.stop_recording()
        logger.info("Audio recording stopped")

        if self._client:
            self._client.stop()
            self._client = None
            logger.info("Client stopped")


async def async_run(config: Config) -> int:
    """Async entry point for the observer.

    Returns:
        0 on normal exit, 1 on setup failure or an error escaping the main
        loop, 75 (EX_TEMPFAIL) when the desktop session is not ready yet.
    """
    from .session_env import check_session_ready

    # Pre-flight: check session prerequisites
    not_ready = check_session_ready()
    if not_ready:
        logger.warning("Session not ready: %s", not_ready)
        return 75  # EXIT_TEMPFAIL

    observer = Observer(config)

    loop = asyncio.get_running_loop()

    def signal_handler():
        # Only request a stop; main_loop notices on its next tick.
        logger.info("Received shutdown signal")
        observer.running = False

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, signal_handler)

    if not await observer.setup():
        logger.error("Observer setup failed")
        return 1

    try:
        await observer.main_loop()
    except RuntimeError as e:
        logger.error("Observer runtime error: %s", e)
        return 1
    except Exception as e:
        logger.error("Observer error: %s", e, exc_info=True)
        return 1

    return 0