linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Graceful degradation for non-GNOME desktops

Make activity detection crash-proof so the observer runs on any desktop:
- Wrap get_idle_time_ms() in try/except (was the sole crash cause on KDE)
- Add probe_activity_services() startup diagnostic — logs which DBus
services are available vs missing
- Protect GTK4 imports — module loads without GTK4, monitor geometry
detection degrades gracefully
- Wrap check_activity_status() calls in main loop — transient DBus
errors don't crash the observer

On non-GNOME desktops, observer logs "Activity signals unavailable"
and runs in always-capture mode. Includes ruff formatting fixes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+177 -51
+80 -12
src/solstone_linux/activity.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 - """GNOME-specific activity detection using Mutter and GTK DBus APIs. 4 + """Activity detection using DBus APIs. 5 5 6 - Extracted from solstone's observe/gnome/activity.py. 6 + Extracted from solstone's observe/gnome/activity.py. The DBus services 7 + probed here are GNOME-specific; on other desktops (KDE, etc.) they may 8 + not be available. Every function degrades gracefully — returning a safe 9 + default — so the observer keeps running regardless of desktop environment. 7 10 8 11 Changes from monorepo version: 9 12 - Replaces `from observe.utils import assign_monitor_positions` with local module 10 13 """ 11 14 15 + import logging 12 16 import os 13 17 14 - import gi 15 18 from dbus_next.aio import MessageBus 16 19 17 - gi.require_version("Gdk", "4.0") # noqa: E402 18 - gi.require_version("Gtk", "4.0") # noqa: E402 19 - from gi.repository import Gdk, Gtk # noqa: E402 20 + logger = logging.getLogger(__name__) 21 + 22 + # GTK4/GDK4 — optional, only needed for monitor geometry detection. 23 + # On systems without GTK4, get_monitor_geometries() will raise RuntimeError 24 + # but screencast recording still works (monitors labeled as "monitor-N"). 25 + try: 26 + import gi 27 + 28 + gi.require_version("Gdk", "4.0") 29 + gi.require_version("Gtk", "4.0") 30 + from gi.repository import Gdk, Gtk 31 + 32 + _HAS_GTK = True 33 + except (ImportError, ValueError): 34 + _HAS_GTK = False 20 35 21 36 # DBus service constants 22 37 IDLE_MONITOR_BUS = "org.gnome.Mutter.IdleMonitor" ··· 32 47 DISPLAY_CONFIG_IFACE = "org.gnome.Mutter.DisplayConfig" 33 48 34 49 50 + async def probe_activity_services(bus: MessageBus) -> dict[str, bool]: 51 + """Check which activity DBus services are reachable. 52 + 53 + Returns a dict of service name -> available. Used for startup logging 54 + only — the observer runs regardless of what's available. 55 + """ 56 + services = { 57 + "idle_monitor": IDLE_MONITOR_BUS, 58 + "screensaver": SCREENSAVER_BUS, 59 + "display_config": DISPLAY_CONFIG_BUS, 60 + } 61 + results = {} 62 + for name, bus_name in services.items(): 63 + try: 64 + await bus.introspect(bus_name, "/") 65 + results[name] = True 66 + except Exception: 67 + results[name] = False 68 + 69 + available = [k for k, v in results.items() if v] 70 + missing = [k for k, v in results.items() if not v] 71 + if missing: 72 + logger.warning( 73 + "Activity signals unavailable: %s — observer will assume active", 74 + ", ".join(missing), 75 + ) 76 + if available: 77 + logger.info("Activity signals available: %s", ", ".join(available)) 78 + if not available: 79 + logger.warning( 80 + "No activity signals available (non-GNOME desktop?) " 81 + "— running in always-capture mode" 82 + ) 83 + 84 + results["gtk4"] = _HAS_GTK 85 + if not _HAS_GTK: 86 + logger.warning("GTK4 not available — monitor geometry labels will be missing") 87 + 88 + return results 89 + 90 + 35 91 async def get_idle_time_ms(bus: MessageBus) -> int: 36 92 """ 37 93 Get the current idle time in milliseconds. ··· 40 96 bus: Connected DBus session bus 41 97 42 98 Returns: 43 - Idle time in milliseconds 99 + Idle time in milliseconds, or 0 if the service is unavailable 100 + (0 = assume active, so the observer keeps capturing). 44 101 """ 45 - introspection = await bus.introspect(IDLE_MONITOR_BUS, IDLE_MONITOR_PATH) 46 - proxy_obj = bus.get_proxy_object(IDLE_MONITOR_BUS, IDLE_MONITOR_PATH, introspection) 47 - idle_monitor = proxy_obj.get_interface(IDLE_MONITOR_IFACE) 48 - idle_time = await idle_monitor.call_get_idletime() 49 - return idle_time 102 + try: 103 + introspection = await bus.introspect(IDLE_MONITOR_BUS, IDLE_MONITOR_PATH) 104 + proxy_obj = bus.get_proxy_object( 105 + IDLE_MONITOR_BUS, IDLE_MONITOR_PATH, introspection 106 + ) 107 + idle_monitor = proxy_obj.get_interface(IDLE_MONITOR_IFACE) 108 + idle_time = await idle_monitor.call_get_idletime() 109 + return idle_time 110 + except Exception: 111 + return 0 50 112 51 113 52 114 async def is_screen_locked(bus: MessageBus) -> bool: ··· 97 159 List of dicts with format: 98 160 [{"id": "connector-id", "box": [x1, y1, x2, y2], "position": "center|left|right|..."}, ...] 99 161 where box contains [left, top, right, bottom] coordinates 162 + 163 + Raises: 164 + RuntimeError: If GTK4/GDK4 is not available. 100 165 """ 166 + if not _HAS_GTK: 167 + raise RuntimeError("GTK4 not available for monitor geometry detection") 168 + 101 169 from .monitor_positions import assign_monitor_positions 102 170 103 171 # Initialize GTK before using GDK functions
+18 -7
src/solstone_linux/cli.py
··· 22 22 import sys 23 23 from pathlib import Path 24 24 25 - from .config import Config, load_config, save_config 25 + from .config import load_config, save_config 26 26 from .streams import stream_name 27 27 28 28 ··· 100 100 try: 101 101 result = subprocess.run( 102 102 [sol, "observer", "--json", "create", config.stream], 103 - capture_output=True, text=True, timeout=10, 103 + capture_output=True, 104 + text=True, 105 + timeout=10, 104 106 ) 105 107 if result.returncode == 0: 106 108 data = json.loads(result.stdout) ··· 108 110 save_config(config) 109 111 print(f"Registered (key: {config.key[:8]}...)") 110 112 else: 111 - print(f"CLI registration failed, trying HTTP...") 113 + print("CLI registration failed, trying HTTP...") 112 114 except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError, OSError): 113 115 print("CLI registration failed, trying HTTP...") 114 116 ··· 119 121 config = load_config() 120 122 print(f"Registered (key: {config.key[:8]}...)") 121 123 else: 122 - print("Warning: registration failed. Run setup again when server is available.") 124 + print( 125 + "Warning: registration failed. Run setup again when server is available." 126 + ) 123 127 else: 124 128 print(f"Already registered (key: {config.key[:8]}...)") 125 129 126 130 print(f"\nConfig saved to {config.config_path}") 127 131 print(f"Captures will go to {config.captures_dir}") 128 - print(f"\nRun 'solstone-linux run' to start, or 'solstone-linux install-service' for systemd.") 132 + print( 133 + "\nRun 'solstone-linux run' to start, or 'solstone-linux install-service' for systemd." 134 + ) 129 135 return 0 130 136 131 137 ··· 134 140 binary = shutil.which("solstone-linux") 135 141 if not binary: 136 142 print("Error: solstone-linux not found on PATH", file=sys.stderr) 137 - print("Install with: pipx install --system-site-packages solstone-linux", file=sys.stderr) 143 + print( 144 + "Install with: pipx install --system-site-packages solstone-linux", 145 + file=sys.stderr, 146 + ) 138 147 return 1 139 148 140 149 unit_dir = Path.home() / ".config" / "systemd" / "user" ··· 223 232 224 233 size_mb = total_size / (1024 * 1024) 225 234 print(f"Cache: {captures_dir}") 226 - print(f" {segment_count} segments across {day_count} day(s), {size_mb:.1f} MB") 235 + print( 236 + f" {segment_count} segments across {day_count} day(s), {size_mb:.1f} MB" 237 + ) 227 238 if incomplete_count: 228 239 print(f" {incomplete_count} incomplete segment(s)") 229 240 else:
+3 -1
src/solstone_linux/config.py
··· 33 33 key: str = "" 34 34 stream: str = "" 35 35 segment_interval: int = DEFAULT_SEGMENT_INTERVAL 36 - sync_retry_delays: list[int] = field(default_factory=lambda: list(DEFAULT_SYNC_RETRY_DELAYS)) 36 + sync_retry_delays: list[int] = field( 37 + default_factory=lambda: list(DEFAULT_SYNC_RETRY_DELAYS) 38 + ) 37 39 sync_max_retries: int = DEFAULT_SYNC_MAX_RETRIES 38 40 base_dir: Path = DEFAULT_BASE_DIR 39 41
+27 -6
src/solstone_linux/observer.py
··· 32 32 from dbus_next.aio import MessageBus 33 33 from dbus_next.constants import BusType 34 34 35 - from .activity import get_idle_time_ms, is_power_save_active, is_screen_locked 35 + from .activity import ( 36 + get_idle_time_ms, 37 + is_power_save_active, 38 + is_screen_locked, 39 + probe_activity_services, 40 + ) 36 41 from .audio_mute import is_sink_muted 37 42 from .audio_recorder import AudioRecorder 38 43 from .config import Config 39 44 from .recovery import write_segment_metadata 40 45 from .screencast import Screencaster, StreamInfo 41 - from .streams import stream_name 42 46 from .sync import SyncService 43 47 from .upload import UploadClient 44 48 ··· 137 141 # Connect to DBus for idle/lock detection 138 142 self.bus = await MessageBus(bus_type=BusType.SESSION).connect() 139 143 logger.info("DBus connection established") 144 + 145 + # Probe which activity signals are available (logging only) 146 + await probe_activity_services(self.bus) 140 147 141 148 # Verify portal is available (exit if not) 142 149 if not await self.screencaster.connect(): ··· 415 422 if self._sync: 416 423 sync_task = asyncio.create_task(self._sync.run()) 417 424 418 - # Determine initial mode 419 - new_mode = await self.check_activity_status() 425 + # Determine initial mode (default to screencast if check fails) 426 + try: 427 + new_mode = await self.check_activity_status() 428 + except Exception as e: 429 + logger.warning( 430 + "Initial activity check failed: %s — defaulting to screencast", e 431 + ) 432 + new_mode = MODE_SCREENCAST 420 433 self.segment_is_muted = self.cached_is_muted 421 434 self.current_mode = new_mode 422 435 ··· 445 458 await asyncio.sleep(CHUNK_DURATION) 446 459 447 460 # Check activity status and determine new mode 448 - new_mode = await self.check_activity_status() 461 + try: 462 + new_mode = await self.check_activity_status() 463 + except Exception as e: 464 + logger.warning( 465 + "Activity check failed: %s — keeping current mode", e 466 + ) 467 + new_mode = self.current_mode 449 468 450 469 # Check for GStreamer failure mid-recording 451 470 if ( ··· 497 516 # Check for window boundary (monotonic to avoid DST/clock jumps) 498 517 elapsed = time.monotonic() - self.start_at_mono 499 518 is_boundary = ( 500 - (elapsed >= self.interval) or screencast_transition or mute_transition 519 + (elapsed >= self.interval) 520 + or screencast_transition 521 + or mute_transition 501 522 ) 502 523 503 524 if is_boundary:
+1 -4
src/solstone_linux/recovery.py
··· 129 129 130 130 # Check there are actual files inside (ignore .metadata) 131 131 try: 132 - contents = [ 133 - f for f in segment_dir.iterdir() 134 - if f.name != METADATA_FILENAME 135 - ] 132 + contents = [f for f in segment_dir.iterdir() if f.name != METADATA_FILENAME] 136 133 if not contents: 137 134 logger.warning(f"Empty incomplete segment: {dir_name}") 138 135 return _mark_failed(segment_dir)
+1
src/solstone_linux/screencast.py
··· 251 251 252 252 # Get monitor info from GDK for connector IDs 253 253 from .activity import get_monitor_geometries 254 + 254 255 try: 255 256 monitors = get_monitor_geometries() 256 257 except Exception as e:
+7 -3
src/solstone_linux/sync.py
··· 30 30 logger = logging.getLogger(__name__) 31 31 32 32 # Circuit breaker thresholds by error type 33 - CIRCUIT_THRESHOLD_AUTH = 1 # Auth failures open immediately 33 + CIRCUIT_THRESHOLD_AUTH = 1 # Auth failures open immediately 34 34 CIRCUIT_THRESHOLD_TRANSIENT = 5 # Transient failures need 5 consecutive 35 35 36 36 # Synced days older than this are pruned from the cache ··· 84 84 """Remove synced-days entries older than 90 days.""" 85 85 if not self._synced_days: 86 86 return 87 - cutoff = (datetime.now() - timedelta(days=SYNCED_DAYS_MAX_AGE)).strftime("%Y%m%d") 87 + cutoff = (datetime.now() - timedelta(days=SYNCED_DAYS_MAX_AGE)).strftime( 88 + "%Y%m%d" 89 + ) 88 90 before = len(self._synced_days) 89 91 self._synced_days = {d for d in self._synced_days if d >= cutoff} 90 92 pruned = before - len(self._synced_days) 91 93 if pruned: 92 - logger.info(f"Pruned {pruned} synced-days entries older than {SYNCED_DAYS_MAX_AGE} days") 94 + logger.info( 95 + f"Pruned {pruned} synced-days entries older than {SYNCED_DAYS_MAX_AGE} days" 96 + ) 93 97 self._save_synced_days() 94 98 95 99 def _circuit_threshold(self) -> int:
+25 -9
src/solstone_linux/upload.py
··· 34 34 35 35 class ErrorType(Enum): 36 36 """Classification of upload errors for circuit breaker tuning.""" 37 - AUTH = "auth" # 401, 403 — open circuit immediately 38 - CLIENT = "client" # 400 — non-retryable, don't count for circuit 39 - TRANSIENT = "transient" # 5xx, network, timeout — allow more failures 37 + 38 + AUTH = "auth" # 401, 403 — open circuit immediately 39 + CLIENT = "client" # 400 — non-retryable, don't count for circuit 40 + TRANSIENT = "transient" # 5xx, network, timeout — allow more failures 40 41 41 42 42 43 class UploadResult(NamedTuple): ··· 85 86 try: 86 87 result = subprocess.run( 87 88 [sol, "observer", "--json", "create", name], 88 - capture_output=True, text=True, timeout=10, 89 + capture_output=True, 90 + text=True, 91 + timeout=10, 89 92 ) 90 93 if result.returncode == 0: 91 94 data = json.loads(result.stdout) ··· 93 96 self._persist_key(config, self._key) 94 97 logger.info(f"CLI-registered as '{name}' (key: {self._key[:8]}...)") 95 98 return True 96 - except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError, OSError) as e: 99 + except ( 100 + subprocess.TimeoutExpired, 101 + json.JSONDecodeError, 102 + KeyError, 103 + OSError, 104 + ) as e: 97 105 logger.debug(f"CLI registration failed: {e}") 98 106 99 107 if not self._url: ··· 112 120 data = resp.json() 113 121 self._key = data["key"] 114 122 self._persist_key(config, self._key) 115 - logger.info(f"Auto-registered as '{name}' (key: {self._key[:8]}...)") 123 + logger.info( 124 + f"Auto-registered as '{name}' (key: {self._key[:8]}...)" 125 + ) 116 126 return True 117 127 elif resp.status_code == 403: 118 128 self._revoked = True ··· 131 141 return False 132 142 133 143 @staticmethod 134 - def classify_error(status_code: int | None, is_network_error: bool = False) -> ErrorType: 144 + def classify_error( 145 + status_code: int | None, is_network_error: bool = False 146 + ) -> ErrorType: 135 147 """Classify an error for circuit breaker and retry decisions.""" 136 148 if is_network_error: 137 149 return ErrorType.TRANSIENT ··· 153 165 ) -> UploadResult: 154 166 """Upload a segment's files to the ingest server.""" 155 167 if self._revoked or not self._key or not self._url: 156 - return UploadResult(False, error_type=ErrorType.AUTH if self._revoked else None) 168 + return UploadResult( 169 + False, error_type=ErrorType.AUTH if self._revoked else None 170 + ) 157 171 158 172 url = f"{self._url}/app/observer/ingest/{self._key}" 159 173 ··· 222 236 delay = self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)] 223 237 time.sleep(delay) 224 238 225 - logger.error(f"Upload failed after {self._max_retries} attempts: {day}/{segment}") 239 + logger.error( 240 + f"Upload failed after {self._max_retries} attempts: {day}/{segment}" 241 + ) 226 242 return UploadResult(False, error_type=error_type) 227 243 228 244 def get_server_segments(self, day: str) -> list[dict] | None:
+3 -5
tests/test_session_env.py
··· 14 14 15 15 def test_no_display_server(self): 16 16 env = { 17 - k: v for k, v in os.environ.items() 17 + k: v 18 + for k, v in os.environ.items() 18 19 if k not in ("DISPLAY", "WAYLAND_DISPLAY") 19 20 } 20 21 with patch.dict(os.environ, env, clear=True): ··· 24 25 assert "display server" in result 25 26 26 27 def test_no_dbus(self): 27 - env = { 28 - k: v for k, v in os.environ.items() 29 - if k != "DBUS_SESSION_BUS_ADDRESS" 30 - } 28 + env = {k: v for k, v in os.environ.items() if k != "DBUS_SESSION_BUS_ADDRESS"} 31 29 env["DISPLAY"] = ":0" 32 30 with patch.dict(os.environ, env, clear=True): 33 31 with patch("solstone_linux.session_env._recover_session_env"):
+12 -4
tests/test_sync.py
··· 5 5 import os 6 6 import time 7 7 from pathlib import Path 8 - from unittest.mock import MagicMock, patch 9 8 10 9 from solstone_linux.config import Config 11 10 from solstone_linux.recovery import recover_incomplete_segments 12 - from solstone_linux.upload import ErrorType, UploadClient, UploadResult 11 + from solstone_linux.upload import ErrorType, UploadClient 13 12 14 13 15 14 class TestRecovery: 16 15 """Test crash recovery for incomplete segments.""" 17 16 18 17 def _make_incomplete( 19 - self, captures_dir: Path, day: str, stream: str, time_prefix: str, age: int = 300 18 + self, 19 + captures_dir: Path, 20 + day: str, 21 + stream: str, 22 + time_prefix: str, 23 + age: int = 300, 20 24 ) -> Path: 21 25 """Create an incomplete segment directory with a dummy file.""" 22 26 seg_dir = captures_dir / day / stream / f"{time_prefix}.incomplete" ··· 161 165 162 166 # Add entries spanning 100 days 163 167 from datetime import datetime, timedelta 168 + 164 169 today = datetime.now() 165 170 for i in range(100): 166 171 day = (today - timedelta(days=i)).strftime("%Y%m%d") ··· 188 193 assert UploadClient.classify_error(503) == ErrorType.TRANSIENT 189 194 190 195 def test_network_errors(self): 191 - assert UploadClient.classify_error(None, is_network_error=True) == ErrorType.TRANSIENT 196 + assert ( 197 + UploadClient.classify_error(None, is_network_error=True) 198 + == ErrorType.TRANSIENT 199 + ) 192 200 193 201 def test_unknown_status(self): 194 202 assert UploadClient.classify_error(418) == ErrorType.TRANSIENT