linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 497 lines 19 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Background sync service for uploading captured segments. 5 6Modeled on solstone-macos's SyncService.swift. Runs as an asyncio 7background task in the same event loop as capture. Walks cache days 8newest-to-oldest, queries server for existing segments, uploads missing ones. 9 10Refinements over tmux baseline: 11- Respects configured sync_max_retries (no hard min(config,3) cap) 12- Circuit breaker tuned by error type: auth=immediate, transient=5-10 13- Transient circuit breaker recovers via half-open probe with exponential backoff 14- Auth/revoked circuit breaker is permanent (requires restart) 15- Synced-days pruning at 90 days to prevent unbounded cache growth 16""" 17 18from __future__ import annotations 19 20import asyncio 21import json 22import logging 23import os 24import shutil 25import time 26from datetime import datetime, timedelta 27from pathlib import Path 28from typing import Any 29 30from .config import Config 31from .upload import ErrorType, UploadClient 32 33logger = logging.getLogger(__name__) 34 35# Circuit breaker thresholds by error type 36CIRCUIT_THRESHOLD_AUTH = 1 # Auth failures open immediately 37CIRCUIT_THRESHOLD_TRANSIENT = 5 # Transient failures need 5 consecutive 38 39# Circuit breaker recovery cooldown 40CIRCUIT_COOLDOWN_INITIAL = 30 # seconds before first probe 41CIRCUIT_COOLDOWN_FACTOR = 2 # multiply cooldown on each failed probe 42CIRCUIT_COOLDOWN_MAX = 300 # cap at 5 minutes 43 44# Synced days older than this are pruned from the cache 45SYNCED_DAYS_MAX_AGE = 90 46 47 48class SyncService: 49 """Background sync service that uploads completed segments to the server.""" 50 51 def __init__(self, config: Config, client: UploadClient): 52 self._config = config 53 self._client = client 54 self._synced_days: set[str] = set() 55 self._consecutive_failures = 0 56 self._last_error_type: ErrorType | None = None 57 self._circuit_open = False 58 self._circuit_open_permanent = False 59 self._circuit_open_since: float = 0.0 60 self._circuit_cooldown: float = CIRCUIT_COOLDOWN_INITIAL 61 self._last_full_sync: float = 0 62 self._running = True 63 self._trigger = asyncio.Event() 64 self.sync_status = "synced" 65 self.sync_progress = "" 66 self._dbus_service = None 67 68 # Load synced days cache 69 self._load_synced_days() 70 71 def _synced_days_path(self) -> Path: 72 return self._config.state_dir / "synced_days.json" 73 74 def _load_synced_days(self) -> None: 75 path = self._synced_days_path() 76 if not path.exists(): 77 return 78 try: 79 with open(path, encoding="utf-8") as f: 80 data = json.load(f) 81 self._synced_days = set(data) if isinstance(data, list) else set() 82 except (json.JSONDecodeError, OSError): 83 self._synced_days = set() 84 85 def _save_synced_days(self) -> None: 86 self._config.state_dir.mkdir(parents=True, exist_ok=True) 87 path = self._synced_days_path() 88 tmp = path.with_suffix(f".{os.getpid()}.tmp") 89 try: 90 with open(tmp, "w", encoding="utf-8") as f: 91 json.dump(sorted(self._synced_days), f) 92 f.write("\n") 93 os.rename(str(tmp), str(path)) 94 except OSError as e: 95 logger.warning(f"Failed to save synced days: {e}") 96 97 def _prune_synced_days(self) -> None: 98 """Remove synced-days entries older than 90 days.""" 99 if not self._synced_days: 100 return 101 cutoff = (datetime.now() - timedelta(days=SYNCED_DAYS_MAX_AGE)).strftime( 102 "%Y%m%d" 103 ) 104 before = len(self._synced_days) 105 self._synced_days = {d for d in self._synced_days if d >= cutoff} 106 pruned = before - len(self._synced_days) 107 if pruned: 108 logger.info( 109 f"Pruned {pruned} synced-days entries older than {SYNCED_DAYS_MAX_AGE} days" 110 ) 111 self._save_synced_days() 112 113 def _quarantine_segment(self, segment_dir: Path, reason: str) -> bool: 114 """Rename a segment directory to .failed so it's never retried.""" 115 failed_path = segment_dir.with_name(segment_dir.name + ".failed") 116 try: 117 segment_dir.rename(failed_path) 118 logger.warning( 119 "Quarantined %s/%s%s", 120 segment_dir.parent.parent.name, 121 segment_dir.name, 122 reason, 123 ) 124 return True 125 except OSError as e: 126 logger.error("Failed to quarantine %s: %s", segment_dir, e) 127 return False 128 129 async def _cleanup_synced_segments(self) -> None: 130 """Delete synced segments older than cache_retention_days. 131 132 Triple-gated safety: 133 1. Day must be in _synced_days (fully synced locally) 134 2. Segment must be older than retention threshold (unless retention=0) 135 3. Segment must be confirmed present on server (fresh query) 136 """ 137 retention = self._config.cache_retention_days 138 if retention < 0: 139 return 140 141 captures_dir = self._config.captures_dir 142 if not captures_dir.exists(): 143 return 144 145 today = datetime.now().strftime("%Y%m%d") 146 if retention > 0: 147 cutoff = (datetime.now() - timedelta(days=retention)).strftime("%Y%m%d") 148 else: 149 cutoff = today # 0 means delete immediately — all days qualify 150 151 deleted_total = 0 152 153 for day_dir in sorted(captures_dir.iterdir()): 154 if not day_dir.is_dir(): 155 continue 156 157 day = day_dir.name 158 159 if not self._running: 160 break 161 162 # Gate 1: day must be in synced_days 163 if day not in self._synced_days: 164 continue 165 166 # Gate 2: day must be old enough (unless retention=0) 167 if retention > 0 and day >= cutoff: 168 continue 169 170 # Don't clean today's segments 171 if day == today: 172 continue 173 174 # Gate 3: fresh server confirmation 175 server_segments = await asyncio.to_thread( 176 self._client.get_server_segments, day 177 ) 178 if server_segments is None: 179 logger.warning("Cleanup: skipping day %s — server unreachable", day) 180 continue 181 182 server_keys: set[str] = set() 183 for seg in server_segments: 184 server_keys.add(seg.get("key", "")) 185 if "original_key" in seg: 186 server_keys.add(seg["original_key"]) 187 188 deleted_day = 0 189 190 for stream_dir in day_dir.iterdir(): 191 if not stream_dir.is_dir(): 192 continue 193 194 for seg_dir in sorted(stream_dir.iterdir()): 195 if not seg_dir.is_dir(): 196 continue 197 198 name = seg_dir.name 199 # Never touch incomplete segments 200 if name.endswith(".incomplete"): 201 continue 202 203 # Delete quarantined (.failed) segments — no server confirmation needed 204 if name.endswith(".failed"): 205 shutil.rmtree(seg_dir) 206 logger.info("Cleanup: deleted quarantined %s/%s", day, name) 207 deleted_day += 1 208 continue 209 210 if name not in server_keys: 211 logger.warning( 212 "Cleanup: keeping %s/%s — not confirmed on server", 213 day, 214 name, 215 ) 216 continue 217 218 shutil.rmtree(seg_dir) 219 logger.info("Cleanup: deleted %s/%s", day, name) 220 deleted_day += 1 221 222 # Remove empty stream dir 223 if stream_dir.is_dir() and not any(stream_dir.iterdir()): 224 stream_dir.rmdir() 225 226 # Remove empty day dir 227 if day_dir.is_dir() and not any(day_dir.iterdir()): 228 day_dir.rmdir() 229 230 if deleted_day: 231 deleted_total += deleted_day 232 233 if deleted_total: 234 logger.info("Cleanup: deleted %d segment(s) total", deleted_total) 235 236 def _circuit_threshold(self) -> int: 237 """Get circuit breaker threshold based on last error type.""" 238 if self._last_error_type == ErrorType.AUTH: 239 return CIRCUIT_THRESHOLD_AUTH 240 return CIRCUIT_THRESHOLD_TRANSIENT 241 242 def trigger(self) -> None: 243 """Trigger a sync pass (called by observer on segment completion).""" 244 self._trigger.set() 245 246 def stop(self) -> None: 247 """Stop the sync service.""" 248 self._running = False 249 self._trigger.set() 250 251 def _set_sync_status(self, status: str, progress: str = "") -> None: 252 """Update sync status and emit D-Bus signal if changed.""" 253 changed = self.sync_status != status or self.sync_progress != progress 254 self.sync_status = status 255 self.sync_progress = progress 256 if changed and self._dbus_service: 257 self._dbus_service.SyncProgressChanged(f"{status}:{progress}") 258 259 async def run(self) -> None: 260 """Main sync loop — waits for triggers, then syncs.""" 261 # Prune on startup 262 self._prune_synced_days() 263 264 while self._running: 265 try: 266 # Wait for trigger or periodic check (60s timeout) 267 try: 268 await asyncio.wait_for(self._trigger.wait(), timeout=60) 269 except asyncio.TimeoutError: 270 pass 271 272 self._trigger.clear() 273 274 if not self._running: 275 break 276 277 if self._circuit_open: 278 if self._circuit_open_permanent: 279 self._set_sync_status("offline") 280 logger.warning( 281 "Circuit breaker open (permanent) — skipping sync" 282 ) 283 continue 284 285 elapsed = time.monotonic() - self._circuit_open_since 286 if elapsed < self._circuit_cooldown: 287 remaining = self._circuit_cooldown - elapsed 288 self._set_sync_status( 289 "retrying", f"{remaining:.0f}s until probe" 290 ) 291 logger.warning( 292 f"Circuit breaker open — {remaining:.0f}s until probe" 293 ) 294 continue 295 296 self._set_sync_status("retrying", "probing server...") 297 logger.info("Circuit breaker half-open — probing server") 298 today = datetime.now().strftime("%Y%m%d") 299 probe_result = await asyncio.to_thread( 300 self._client.get_server_segments, today 301 ) 302 if probe_result is not None: 303 logger.info("Circuit breaker probe succeeded — closing circuit") 304 self._circuit_open = False 305 self._circuit_open_permanent = False 306 self._circuit_open_since = 0.0 307 self._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 308 self._consecutive_failures = 0 309 self._last_error_type = None 310 self._set_sync_status("syncing") 311 else: 312 self._circuit_cooldown = min( 313 self._circuit_cooldown * CIRCUIT_COOLDOWN_FACTOR, 314 CIRCUIT_COOLDOWN_MAX, 315 ) 316 self._circuit_open_since = time.monotonic() 317 self._set_sync_status( 318 "retrying", 319 f"probe failed, next in {self._circuit_cooldown:.0f}s", 320 ) 321 logger.warning( 322 f"Circuit breaker probe failed — next probe in {self._circuit_cooldown:.0f}s" 323 ) 324 continue 325 326 # Force full sync daily 327 now = time.time() 328 force_full = (now - self._last_full_sync) > 86400 329 330 self._set_sync_status("syncing") 331 await self._sync(force_full=force_full) 332 self._set_sync_status("synced") 333 334 if force_full: 335 self._last_full_sync = now 336 337 except Exception as e: 338 logger.error(f"Sync error: {e}", exc_info=True) 339 await asyncio.sleep(5) 340 341 async def _sync(self, force_full: bool = False) -> None: 342 """Walk days newest-to-oldest and upload missing segments.""" 343 captures_dir = self._config.captures_dir 344 if not captures_dir.exists(): 345 return 346 347 today = datetime.now().strftime("%Y%m%d") 348 349 # Collect segments by day 350 segments_by_day = self._collect_segments(captures_dir) 351 if not segments_by_day: 352 return 353 354 for day in sorted(segments_by_day.keys(), reverse=True): 355 if not self._running: 356 break 357 358 if self._circuit_open: 359 break 360 361 # Skip past days already fully synced (unless forcing) 362 if day != today and day in self._synced_days and not force_full: 363 continue 364 365 local_segments = segments_by_day[day] 366 367 # Query server for existing segments 368 self._set_sync_status("syncing", f"checking {day}...") 369 server_segments = await asyncio.to_thread( 370 self._client.get_server_segments, day 371 ) 372 if server_segments is None: 373 logger.warning(f"Failed to query server for day {day}") 374 continue 375 376 # Build lookup 377 server_keys: set[str] = set() 378 for seg in server_segments: 379 server_keys.add(seg.get("key", "")) 380 if "original_key" in seg: 381 server_keys.add(seg["original_key"]) 382 383 any_needed_upload = False 384 385 for segment_dir in local_segments: 386 if not self._running or self._circuit_open: 387 break 388 389 segment_key = segment_dir.name 390 if segment_key in server_keys: 391 continue 392 393 # Quarantine segments where all files are zero-byte (corrupt) 394 files = [f for f in segment_dir.iterdir() if f.is_file()] 395 if files and all(f.stat().st_size == 0 for f in files): 396 self._quarantine_segment(segment_dir, "all files zero-byte") 397 continue 398 399 any_needed_upload = True 400 self._set_sync_status("uploading", f"uploading {segment_key}") 401 success = await self._upload_segment(day, segment_dir) 402 403 if not success: 404 if self._last_error_type == ErrorType.CLIENT: 405 # Non-retryable client error (e.g. 400) — quarantine, don't trip circuit 406 self._quarantine_segment( 407 segment_dir, "server rejected (client error)" 408 ) 409 continue 410 411 self._consecutive_failures += 1 412 threshold = self._circuit_threshold() 413 if self._consecutive_failures >= threshold: 414 self._circuit_open = True 415 self._circuit_open_since = time.monotonic() 416 self._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 417 logger.error( 418 f"Circuit breaker OPEN: {self._consecutive_failures} consecutive " 419 f"{self._last_error_type.value if self._last_error_type else 'unknown'} " 420 f"failures (threshold: {threshold})" 421 ) 422 self._set_sync_status("retrying") 423 break 424 else: 425 self._consecutive_failures = 0 426 self._last_error_type = None 427 428 # Mark past days as synced if nothing needed upload 429 if day != today and not any_needed_upload: 430 self._synced_days.add(day) 431 self._save_synced_days() 432 433 # Cleanup old synced segments 434 if not self._circuit_open and self._running: 435 try: 436 await self._cleanup_synced_segments() 437 except Exception as e: 438 logger.error(f"Cleanup error: {e}", exc_info=True) 439 440 def _collect_segments(self, captures_dir: Path) -> dict[str, list[Path]]: 441 """Collect completed segments grouped by day.""" 442 result: dict[str, list[Path]] = {} 443 444 for day_dir in sorted(captures_dir.iterdir(), reverse=True): 445 if not day_dir.is_dir(): 446 continue 447 448 day = day_dir.name 449 450 for stream_dir in day_dir.iterdir(): 451 if not stream_dir.is_dir(): 452 continue 453 454 segments = [] 455 for seg_dir in sorted(stream_dir.iterdir(), reverse=True): 456 if not seg_dir.is_dir(): 457 continue 458 name = seg_dir.name 459 # Skip incomplete and failed 460 if name.endswith(".incomplete") or name.endswith(".failed"): 461 continue 462 segments.append(seg_dir) 463 464 if segments: 465 result.setdefault(day, []).extend(segments) 466 467 return result 468 469 async def _upload_segment(self, day: str, segment_dir: Path) -> bool: 470 """Upload a single segment with retry logic.""" 471 segment_key = segment_dir.name 472 files = [f for f in segment_dir.iterdir() if f.is_file()] 473 if not files: 474 return True # Nothing to upload 475 476 meta: dict[str, Any] = {"stream": self._config.stream} 477 478 result = await asyncio.to_thread( 479 self._client.upload_segment, day, segment_key, files, meta 480 ) 481 482 if result.success: 483 logger.info(f"Uploaded: {day}/{segment_key} ({len(files)} files)") 484 return True 485 486 # Track error type for circuit breaker 487 self._last_error_type = result.error_type 488 489 # Non-retryable errors 490 if self._client.is_revoked: 491 logger.error("Client revoked — disabling sync") 492 self._circuit_open = True 493 self._circuit_open_permanent = True 494 return False 495 496 logger.error(f"Upload failed: {day}/{segment_key}") 497 return False