linux observer
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""
5Standalone Linux desktop observer — screen + audio capture.
6
7Continuously captures audio and manages screencast recording based on activity.
8Creates 5-minute segments in a local cache directory. The sync service handles
9all uploads — the observer only writes locally.
10
11Key architectural change from monorepo version:
12- Capture writes completed segments to local cache only
13- No ObserverClient usage in boundary handling — no network calls in capture loop
14- Sync service picks up completed segments and uploads asynchronously
15
16State machine:
17 SCREENCAST: Screen is active, recording video
18 IDLE: Screen is inactive
19"""
20
21import asyncio
22import datetime
23import logging
24import os
25import platform
26import signal
27import socket
28import time
29from pathlib import Path
30
31import numpy as np
32from dbus_next.aio import MessageBus
33from dbus_next.constants import BusType
34
35from .activity import (
36 is_power_save_active,
37 is_screen_locked,
38 probe_activity_services,
39)
40from .audio_mute import is_sink_muted
41from .audio_recorder import AudioRecorder
42from .chat_bridge import run_chat_bridge
43from .config import Config
44from .recovery import write_segment_metadata
45from .screencast import Screencaster, StreamInfo
46from .sync import SyncService
47from .upload import UploadClient
48
logger = logging.getLogger(__name__)

# Identity of this machine; attached to every observe.status event.
HOST = socket.gethostname()
PLATFORM = platform.system().lower()

# Audio activity detection: every CHUNK_DURATION seconds the newest audio
# chunk is measured, and a chunk whose louder channel has an RMS above
# RMS_THRESHOLD counts as one "hit". A segment's audio is only written to
# disk once it has collected at least MIN_HITS_FOR_SAVE hits.
CHUNK_DURATION = 5  # seconds
RMS_THRESHOLD = 0.01
MIN_HITS_FOR_SAVE = 3

# Capture modes used by the main loop and reported in status events.
MODE_SCREENCAST = "screencast"
MODE_IDLE = "idle"

# Audio device detection is retried at startup because devices may still be
# initializing.
DETECT_RETRY_DELAY = 5  # seconds
DETECT_RETRIES = 3
67
68
69def _get_timestamp_parts(timestamp: float | None = None) -> tuple[str, str]:
70 """Get date and time parts from timestamp."""
71 if timestamp is None:
72 timestamp = time.time()
73 dt = datetime.datetime.fromtimestamp(timestamp)
74 return dt.strftime("%Y%m%d"), dt.strftime("%H%M%S")
75
76
class Observer:
    """Unified audio and screencast observer with local cache + sync.

    Audio is recorded continuously; a screencast is recorded while the screen
    is active (neither locked nor in power save). Output is grouped into
    segments under ``captures_dir/YYYYMMDD/<stream>/``: each segment is
    written into ``HHMMSS.incomplete/`` and renamed to ``HHMMSS_<seconds>/``
    when it is closed, after which the sync service is triggered to upload it.

    A segment is closed when the segment interval elapses, when capture
    switches between screencast and idle, when the sink mute state changes,
    when capture is paused, and on shutdown.
    """

    def __init__(self, config: Config):
        self.config = config
        self.interval = config.segment_interval
        self.audio_recorder = AudioRecorder()
        self.screencaster = Screencaster(config.restore_token_path)
        self.bus: MessageBus | None = None
        # Cleared (e.g. by async_run's signal handler) to stop main_loop().
        self.running = True
        self.stream = config.stream

        self._client: UploadClient | None = None
        self._sync: SyncService | None = None

        # Segment timing: the wall-clock start names the segment directory,
        # the monotonic start measures elapsed time immune to clock changes.
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()
        # NOTE(review): not read in this module; possibly used by the D-Bus
        # service or tray (both receive `self`) — confirm before removing.
        self._start_mono = time.monotonic()

        # Audio state for the current segment: number of chunks whose RMS
        # exceeded RMS_THRESHOLD, and the stereo samples collected so far
        # (column 0 = mic, column 1 = system audio).
        self.threshold_hits = 0
        self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)

        # Mode tracking
        self.current_mode = MODE_IDLE

        # Segment directory (HHMMSS.incomplete/); None between segments.
        self.segment_dir: Path | None = None

        # Multi-file screencast tracking
        self.current_streams: list[StreamInfo] = []

        # Activity status cache (updated by check_activity_status)
        self.cached_is_active = False
        self.cached_screen_locked = False
        self.cached_is_muted = False
        self.cached_power_save = False

        # Mute state at segment start (determines save format)
        self.segment_is_muted = False

        # Pause state; _pause_until is a time.monotonic() deadline, or 0.0
        # for an indefinite pause.
        self._paused = False
        self._pause_until = 0.0

        # D-Bus service interface and optional system tray
        self._dbus_service = None
        self._tray = None

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------

    async def setup(self) -> bool:
        """Initialize audio devices, DBus connection, and sync service.

        Returns:
            ``True`` when the observer is ready for :meth:`main_loop`;
            ``False`` if no audio device could be detected or the screencast
            portal is unavailable.

        On failure (``False`` or an exception) the audio recorder and upload
        client started here are stopped again: :meth:`shutdown` is only
        reached through :meth:`main_loop`, which is not run after a failed
        setup.
        """
        if not await self._detect_audio_devices():
            logger.error("Failed to detect audio devices")
            return False

        self.audio_recorder.start_recording()
        logger.info("Audio recording started")

        try:
            ready = await self._connect_services()
        except Exception:
            self._abort_setup()
            raise
        if not ready:
            self._abort_setup()
        return ready

    async def _detect_audio_devices(self) -> bool:
        """Try to detect audio devices, retrying since they may still be initializing."""
        for attempt in range(1, DETECT_RETRIES + 1):
            if self.audio_recorder.detect():
                return True
            if attempt < DETECT_RETRIES:
                logger.info(
                    "Audio detection attempt %d/%d failed, retrying in %ds",
                    attempt,
                    DETECT_RETRIES,
                    DETECT_RETRY_DELAY,
                )
                await asyncio.sleep(DETECT_RETRY_DELAY)
        return False

    async def _connect_services(self) -> bool:
        """Connect DBus, the screencast portal, sync, the D-Bus service and tray.

        Returns ``False`` only when the screencast portal is unavailable.
        """
        # Connect to DBus for activity detection
        self.bus = await MessageBus(bus_type=BusType.SESSION).connect()
        logger.info("DBus connection established")

        # Probe which activity signals are available (logging only)
        await probe_activity_services(self.bus)

        # Verify portal is available (exit if not)
        if not await self.screencaster.connect():
            logger.error("Screencast portal not available")
            return False
        logger.info("Screencast portal connected")

        # Initialize upload client and sync service
        self._client = UploadClient(self.config)
        if self.config.server_url:
            self._client.ensure_registered(self.config)
        self._sync = SyncService(self.config, self._client)
        logger.info("Sync service initialized")

        from .dbus_service import BUS_NAME, OBJECT_PATH, ObserverService

        self._dbus_service = ObserverService(self)
        self.bus.export(OBJECT_PATH, self._dbus_service)
        await self.bus.request_name(BUS_NAME)
        self._sync._dbus_service = self._dbus_service
        logger.info("D-Bus service exported as %s", BUS_NAME)

        await self._start_tray()
        return True

    async def _start_tray(self) -> None:
        """Start the system tray; skipped gracefully if no StatusNotifierWatcher."""
        try:
            from .tray import TrayApp

            tray = TrayApp(self, self.bus)
            started = await tray.start()
        except Exception as e:
            logger.info("System tray disabled: %s", e)
            return
        if started:
            self._tray = tray
            logger.info("System tray active")
        else:
            logger.info("System tray unavailable (no StatusNotifierWatcher)")

    def _abort_setup(self) -> None:
        """Stop the resources a failed setup() had already started."""
        self.audio_recorder.stop_recording()
        if self._client:
            self._client.stop()
            self._client = None

    # ------------------------------------------------------------------
    # Activity and audio helpers
    # ------------------------------------------------------------------

    async def check_activity_status(self) -> str:
        """Refresh the cached activity flags and return the capture mode to use.

        Returns:
            MODE_SCREENCAST when the screen is neither locked nor in power
            save, otherwise MODE_IDLE.
        """
        screen_locked = await is_screen_locked(self.bus)
        power_save = await is_power_save_active(self.bus)
        sink_muted = await is_sink_muted()

        # Cache values for status events
        self.cached_screen_locked = screen_locked
        self.cached_is_muted = sink_muted
        self.cached_power_save = power_save

        screen_active = not (screen_locked or power_save)

        # Legacy "active" flag: screen activity, or enough audio hits so far.
        has_audio_activity = self.threshold_hits >= MIN_HITS_FOR_SAVE
        self.cached_is_active = screen_active or has_audio_activity

        return MODE_SCREENCAST if screen_active else MODE_IDLE

    def compute_rms(self, audio_buffer: np.ndarray) -> float:
        """Return the larger per-channel RMS of a stereo buffer.

        Args:
            audio_buffer: ``(frames, 2)`` samples; column 0 is the mic,
                column 1 system audio.

        Returns:
            ``max(rms_mic, rms_sys)``, or ``0.0`` for an empty buffer.
        """
        if audio_buffer.size == 0:
            return 0.0
        rms_left = float(np.sqrt(np.mean(audio_buffer[:, 0] ** 2)))
        rms_right = float(np.sqrt(np.mean(audio_buffer[:, 1] ** 2)))
        return max(rms_left, rms_right)

    def _reset_audio_state(self) -> None:
        """Discard the accumulated audio buffer and the threshold-hit count."""
        self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)
        self.threshold_hits = 0

    def _accumulate_audio_chunk(self) -> None:
        """Append the recorder's latest audio to the segment buffer and score it."""
        audio_chunk = self.audio_recorder.get_buffers()
        if audio_chunk.size == 0:
            logger.debug("No audio data in chunk")
            return

        self.accumulated_audio_buffer = np.vstack(
            (self.accumulated_audio_buffer, audio_chunk)
        )
        rms = self.compute_rms(audio_chunk)
        if rms > RMS_THRESHOLD:
            self.threshold_hits += 1
            logger.debug(
                "RMS %.4f > threshold (hit %d)", rms, self.threshold_hits
            )
        else:
            logger.debug("RMS %.4f below threshold", rms)

    def _save_audio_segment(self, segment_dir: Path, is_muted: bool) -> list[str]:
        """Write the accumulated audio buffer into ``segment_dir`` as FLAC.

        Args:
            segment_dir: Directory of the segment being closed.
            is_muted: Sink mute state at segment start. When muted, mic and
                system audio are written as two mono files; otherwise one
                stereo ``audio.flac`` is written.

        Returns:
            Names of the files written (empty if there was no audio).
        """
        if self.accumulated_audio_buffer.size == 0:
            logger.warning("No audio buffer to save")
            return []

        if is_muted:
            # Split mode: save mic and sys as separate mono files
            mic_data = self.accumulated_audio_buffer[:, 0]
            sys_data = self.accumulated_audio_buffer[:, 1]

            mic_bytes = self.audio_recorder.create_mono_flac_bytes(mic_data)
            sys_bytes = self.audio_recorder.create_mono_flac_bytes(sys_data)

            (segment_dir / "mic_audio.flac").write_bytes(mic_bytes)
            (segment_dir / "sys_audio.flac").write_bytes(sys_bytes)

            logger.info("Saved split audio (muted): %s", segment_dir)
            return ["mic_audio.flac", "sys_audio.flac"]

        # Normal mode: save combined stereo file
        flac_bytes = self.audio_recorder.create_flac_bytes(
            self.accumulated_audio_buffer
        )
        (segment_dir / "audio.flac").write_bytes(flac_bytes)

        logger.info("Saved audio to %s/audio.flac", segment_dir)
        return ["audio.flac"]

    # ------------------------------------------------------------------
    # Segment lifecycle
    # ------------------------------------------------------------------

    def _start_segment(self) -> Path:
        """Open a new ``YYYYMMDD/<stream>/HHMMSS.incomplete/`` segment directory.

        Resets the segment start times and writes recovery metadata.
        """
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()

        date_part, time_part = _get_timestamp_parts(self.start_at)
        segment_dir = (
            self.config.captures_dir / date_part / self.stream / f"{time_part}.incomplete"
        )
        segment_dir.mkdir(parents=True, exist_ok=True)
        self.segment_dir = segment_dir

        # Write metadata for recovery
        write_segment_metadata(segment_dir, self.start_at)

        return segment_dir

    def _finalize_segment(self) -> str | None:
        """Rename the ``.incomplete`` segment directory to ``HHMMSS_<seconds>/``.

        The recovery ``.metadata`` file is removed first. A segment that then
        contains no files is deleted instead of finalized.

        Returns:
            The segment key (final directory name), or ``None`` if there was
            no segment, it was empty, or the rename failed.
        """
        segment_dir = self.segment_dir
        if not segment_dir or not segment_dir.exists():
            return None

        # Remove .metadata before finalizing (best effort).
        try:
            (segment_dir / ".metadata").unlink(missing_ok=True)
        except OSError:
            pass

        if not any(entry.is_file() for entry in segment_dir.iterdir()):
            # Empty segment: remove it (best effort).
            try:
                segment_dir.rmdir()
            except OSError:
                pass
            return None

        _, time_part = _get_timestamp_parts(self.start_at)
        # Use the monotonic clock, like the boundary check in the main loop,
        # so a wall-clock jump cannot produce a wrong or negative duration.
        duration = max(0, int(time.monotonic() - self.start_at_mono))
        segment_key = f"{time_part}_{duration}"
        final_dir = segment_dir.parent / segment_key

        try:
            segment_dir.rename(final_dir)
        except OSError as e:
            logger.error("Failed to finalize segment: %s", e)
            return None
        logger.info("Segment finalized: %s", segment_key)
        return segment_key

    async def _close_segment(self) -> str | None:
        """Stop capturing into the current segment, flush audio and finalize it.

        Stops the screencast when in screencast mode, saves the audio if it
        reached MIN_HITS_FOR_SAVE hits, resets the audio state, finalizes the
        directory and triggers the sync service for a finalized segment.

        Returns:
            The finalized segment key, or ``None``.
        """
        # Stop screencast first (closes file handles)
        if self.current_mode == MODE_SCREENCAST:
            logger.info("Stopping previous screencast")
            await self.screencaster.stop()
            self.current_streams = []

        if self.threshold_hits >= MIN_HITS_FOR_SAVE and self.segment_dir:
            audio_files = self._save_audio_segment(
                self.segment_dir, self.segment_is_muted
            )
            if audio_files:
                logger.info(
                    "Saved %d audio file(s) (%d hits)",
                    len(audio_files),
                    self.threshold_hits,
                )
        else:
            logger.debug(
                "Skipping audio save (only %d/%d hits)",
                self.threshold_hits,
                MIN_HITS_FOR_SAVE,
            )

        self._reset_audio_state()

        # Finalize segment (rename .incomplete -> HHMMSS_DDD/)
        segment_key = self._finalize_segment()
        self.segment_dir = None

        # Trigger sync to upload the completed segment
        if segment_key and self._sync:
            self._sync.trigger()
        return segment_key

    async def handle_boundary(self, new_mode: str):
        """Handle window boundary rollover.

        Closes the current segment (audio written, finalized to the local
        cache, sync triggered), then opens the next segment in ``new_mode``.

        Raises:
            RuntimeError: Propagated from :meth:`initialize_screencast` when
                the new screencast cannot be started.
        """
        await self._close_segment()

        # The new segment's save format follows the current mute state.
        self.segment_is_muted = self.cached_is_muted

        old_mode = self.current_mode
        self.current_mode = new_mode

        # Start new capture based on mode
        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            await self.initialize_screencast()
        else:
            self._start_segment()

        logger.info("Mode transition: %s -> %s", old_mode, new_mode)

    async def initialize_screencast(self) -> bool:
        """Open a new segment and start the screencast recording into it.

        Returns:
            ``True`` once recording has started.

        Raises:
            RuntimeError: If the screencaster fails to start or returns no
                streams. The segment opened beforehand remains current
                (``self.segment_dir`` is set).
        """
        segment_dir = self._start_segment()

        try:
            streams = await self.screencaster.start(
                str(segment_dir), framerate=1, draw_cursor=True
            )
        except RuntimeError as e:
            logger.error("Failed to start screencast: %s", e)
            raise

        if not streams:
            logger.error("No streams returned from screencast start")
            raise RuntimeError("No streams available")

        self.current_streams = streams

        logger.info("Started screencast with %d stream(s)", len(streams))
        for stream in streams:
            logger.info("  %s (%s): %s", stream.position, stream.connector, stream.file_path)

        return True

    # ------------------------------------------------------------------
    # Status, tray and pause control
    # ------------------------------------------------------------------

    @staticmethod
    def _status_for(mode: str) -> str:
        """Map a capture mode to the status string sent over D-Bus."""
        return "recording" if mode == MODE_SCREENCAST else "idle"

    def emit_status(self):
        """Emit observe.status event with current state (fire-and-forget)."""
        if not self._client:
            return

        if self.current_mode == MODE_SCREENCAST and self.current_streams:
            screencast_info = {
                "recording": True,
                "streams": [
                    {
                        "position": stream.position,
                        "connector": stream.connector,
                        "file": stream.file_path,
                    }
                    for stream in self.current_streams
                ],
                "window_elapsed_seconds": int(
                    time.monotonic() - self.start_at_mono
                ),
            }
        else:
            screencast_info = {"recording": False}

        audio_info = {
            "threshold_hits": self.threshold_hits,
            "will_save": self.threshold_hits >= MIN_HITS_FOR_SAVE,
        }

        activity_info = {
            "active": self.cached_is_active,
            "screen_locked": self.cached_screen_locked,
            "sink_muted": self.cached_is_muted,
            "power_save": self.cached_power_save,
        }

        self._client.relay_event(
            "observe",
            "status",
            mode=self.current_mode,
            screencast=screencast_info,
            audio=audio_info,
            activity=activity_info,
            host=HOST,
            platform=PLATFORM,
            stream=self.stream,
        )

    def _refresh_tray(self):
        """Refresh the SNI tray UI. Safe when tray is unavailable; disables on failure."""
        if self._tray is None:
            return
        try:
            self._tray.update()
        except Exception:
            logger.warning("Tray update failed, disabling tray", exc_info=True)
            self._tray = None

    def pause(self, duration_seconds: int):
        """Pause capture. duration_seconds <= 0 means indefinite.

        The main loop closes the open segment on its next tick.
        """
        self._paused = True
        if duration_seconds > 0:
            self._pause_until = time.monotonic() + duration_seconds
        else:
            self._pause_until = 0.0
        if self._dbus_service:
            self._dbus_service.StatusChanged("paused")
        if duration_seconds > 0:
            logger.info("Paused for %ss", duration_seconds)
        else:
            logger.info("Paused indefinitely")
        self._refresh_tray()

    def resume(self):
        """Resume capture from pause."""
        self._clear_pause("Resumed")

    def _clear_pause(self, message: str) -> None:
        """Leave the paused state, announce it over D-Bus and log ``message``."""
        self._paused = False
        self._pause_until = 0.0
        if self._dbus_service:
            self._dbus_service.StatusChanged(self._status_for(self.current_mode))
        logger.info(message)
        self._refresh_tray()

    def _maybe_auto_resume(self) -> None:
        """End a timed pause once its monotonic deadline has passed."""
        if self._paused and 0 < self._pause_until <= time.monotonic():
            self._clear_pause("Auto-resumed from timed pause")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    async def main_loop(self):
        """Run the capture loop until ``self.running`` is cleared.

        Starts the sync service (and chat bridge, if enabled) as background
        tasks. Whatever happens — including a failed initial screencast —
        :meth:`shutdown` runs and the background tasks are stopped on exit.
        """
        logger.info("Starting observer loop (interval=%ss)", self.interval)

        bridge_stop_event = asyncio.Event()
        sync_task = asyncio.create_task(self._sync.run()) if self._sync else None
        bridge_task = None
        if self.config.chat_bridge_enabled:
            bridge_task = asyncio.create_task(
                run_chat_bridge(self.config, bridge_stop_event)
            )

        try:
            if not await self._start_initial_capture():
                self.running = False
                return
            logger.info("Initial mode: %s", self.current_mode)

            while self.running:
                await asyncio.sleep(CHUNK_DURATION)
                await self._run_iteration()
        finally:
            logger.info("Observer loop stopped, cleaning up...")
            try:
                await self.shutdown()
            finally:
                await self._stop_background_tasks(
                    sync_task, bridge_task, bridge_stop_event
                )

    async def _start_initial_capture(self) -> bool:
        """Pick the initial mode and open the first segment.

        Returns ``False`` if the initial screencast could not be started.
        """
        # Default to screencast if the activity check fails.
        try:
            new_mode = await self.check_activity_status()
        except Exception as e:
            logger.warning(
                "Initial activity check failed: %s — defaulting to screencast", e
            )
            new_mode = MODE_SCREENCAST
        self.segment_is_muted = self.cached_is_muted
        self.current_mode = new_mode

        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            try:
                await self.initialize_screencast()
            except RuntimeError:
                return False
        else:
            self._start_segment()
        return True

    async def _run_iteration(self) -> None:
        """Process one CHUNK_DURATION tick of the main loop."""
        self._maybe_auto_resume()

        if self._paused:
            await self._tick_paused()
            return

        # segment_dir is only None here right after a pause ended.
        if self.segment_dir is None:
            await self._restart_capture_after_pause()
            return

        await self._tick_capturing()

    async def _tick_paused(self) -> None:
        """Close the open segment (first paused tick only) and drop audio."""
        if self.segment_dir:
            await self._close_segment()
        # Drain and discard the recorder's buffered audio while paused.
        self.audio_recorder.get_buffers()
        self.emit_status()
        self._refresh_tray()

    async def _restart_capture_after_pause(self) -> None:
        """Open a new segment in the currently appropriate mode after a pause."""
        try:
            new_mode = await self.check_activity_status()
        except Exception:
            new_mode = self.current_mode
        self.segment_is_muted = self.cached_is_muted
        self.current_mode = new_mode

        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            try:
                await self.initialize_screencast()
            except RuntimeError:
                # initialize_screencast() already opened a segment before it
                # failed, so keep that one (starting another could orphan an
                # .incomplete directory). Drop to idle so the next tick sees a
                # mode change and retries the screencast at that boundary.
                logger.warning(
                    "Screencast restart after pause failed; continuing in idle mode"
                )
                self.current_mode = MODE_IDLE
                if self.segment_dir is None:
                    self._start_segment()
        else:
            self._start_segment()
        self.emit_status()

    async def _tick_capturing(self) -> None:
        """Capture one audio chunk and roll the segment over when a boundary is hit."""
        try:
            new_mode = await self.check_activity_status()
        except Exception as e:
            logger.warning("Activity check failed: %s — keeping current mode", e)
            new_mode = self.current_mode

        # Check for GStreamer failure mid-recording; dropping to idle makes a
        # still-active screen register as a mode change below.
        if self.current_mode == MODE_SCREENCAST and not self.screencaster.is_healthy():
            logger.warning("Screencast recording failed, stopping gracefully")
            await self.screencaster.stop()
            self.current_streams = []
            self.current_mode = MODE_IDLE

        mode_changed = new_mode != self.current_mode
        if mode_changed:
            logger.info("Mode changing: %s -> %s", self.current_mode, new_mode)

        # Only trigger segment boundary on screencast transitions
        screencast_transition = mode_changed and MODE_SCREENCAST in (
            self.current_mode,
            new_mode,
        )

        # A mute change switches the audio save format, so it starts a segment.
        mute_transition = self.cached_is_muted != self.segment_is_muted
        if mute_transition:
            logger.info(
                "Mute state changed: %s -> %s",
                "muted" if self.segment_is_muted else "unmuted",
                "muted" if self.cached_is_muted else "unmuted",
            )

        self._accumulate_audio_chunk()

        # Check for window boundary (monotonic to avoid DST/clock jumps)
        elapsed = time.monotonic() - self.start_at_mono
        if elapsed >= self.interval or screencast_transition or mute_transition:
            logger.info(
                "Boundary: elapsed=%.1fs screencast_change=%s mute_change=%s hits=%d/%d",
                elapsed,
                screencast_transition,
                mute_transition,
                self.threshold_hits,
                MIN_HITS_FOR_SAVE,
            )
            await self.handle_boundary(new_mode)
            if mode_changed and self._dbus_service:
                self._dbus_service.StatusChanged(self._status_for(new_mode))

        self.emit_status()
        self._refresh_tray()

    async def _stop_background_tasks(
        self,
        sync_task: asyncio.Task | None,
        bridge_task: asyncio.Task | None,
        bridge_stop_event: asyncio.Event,
    ) -> None:
        """Stop and await the sync service and chat bridge tasks."""
        if sync_task:
            if self._sync:
                self._sync.stop()
            sync_task.cancel()
            try:
                await sync_task
            except asyncio.CancelledError:
                pass
            except Exception:
                # Don't let an earlier sync crash prevent bridge shutdown.
                logger.error("Sync task ended with an error", exc_info=True)
        bridge_stop_event.set()
        if bridge_task:
            bridge_task.cancel()
            try:
                await bridge_task
            except (asyncio.CancelledError, Exception):
                pass

    async def shutdown(self):
        """Clean shutdown: flush and finalize the segment, stop audio and client.

        The segment is finalized locally only (no sync trigger here). The
        audio recorder and upload client are stopped even if flushing the
        segment raises.
        """
        try:
            # Stop screencast first (closes file handles)
            if self.current_mode == MODE_SCREENCAST:
                logger.info("Stopping screencast for shutdown")
                await self.screencaster.stop()
                self.current_streams = []
                # NOTE(review): presumably gives the recording pipeline time
                # to finish writing its files — confirm.
                await asyncio.sleep(0.5)

            # Save final audio if threshold met
            if self.threshold_hits >= MIN_HITS_FOR_SAVE and self.segment_dir:
                audio_files = self._save_audio_segment(
                    self.segment_dir, self.segment_is_muted
                )
                if audio_files:
                    logger.info("Saved final audio: %d file(s)", len(audio_files))

            segment_key = self._finalize_segment()
            self.segment_dir = None
            if segment_key:
                logger.info("Finalized segment locally: %s (shutdown)", segment_key)
        finally:
            self.audio_recorder.stop_recording()
            logger.info("Audio recording stopped")

            if self._client:
                self._client.stop()
                self._client = None
                logger.info("Client stopped")
721
722
async def async_run(config: Config) -> int:
    """Run the observer until it stops or receives SIGINT/SIGTERM.

    Args:
        config: Observer configuration.

    Returns:
        Process exit code: ``0`` after a clean stop, ``1`` if setup or the
        main loop failed, ``os.EX_TEMPFAIL`` (75) when the session is not
        ready yet.
    """
    from .session_env import check_session_ready

    # Pre-flight: check session prerequisites
    not_ready = check_session_ready()
    if not_ready:
        logger.warning("Session not ready: %s", not_ready)
        # sysexits.h EX_TEMPFAIL (75): temporary failure, try again later.
        return os.EX_TEMPFAIL

    observer = Observer(config)

    loop = asyncio.get_running_loop()
    handled_signals = (signal.SIGINT, signal.SIGTERM)

    def signal_handler():
        # Only flag the loop; main_loop() performs the actual cleanup.
        logger.info("Received shutdown signal")
        observer.running = False

    for sig in handled_signals:
        loop.add_signal_handler(sig, signal_handler)

    try:
        # Setup exceptions are reported like main-loop errors instead of
        # escaping unlogged.
        try:
            ready = await observer.setup()
        except Exception as e:
            logger.error("Observer setup error: %s", e, exc_info=True)
            return 1
        if not ready:
            logger.error("Observer setup failed")
            return 1

        try:
            await observer.main_loop()
        except RuntimeError as e:
            logger.error("Observer runtime error: %s", e)
            return 1
        except Exception as e:
            logger.error("Observer error: %s", e, exc_info=True)
            return 1

        return 0
    finally:
        # Restore default signal handling; the handler references an
        # observer that is no longer running.
        for sig in handled_signals:
            loop.remove_signal_handler(sig)