# linux observer
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Background sync service for uploading captured segments.
5
6Modeled on solstone-macos's SyncService.swift. Runs as an asyncio
7background task in the same event loop as capture. Walks cache days
8newest-to-oldest, queries server for existing segments, uploads missing ones.
9
10Refinements over tmux baseline:
11- Respects configured sync_max_retries (no hard min(config,3) cap)
12- Circuit breaker tuned by error type: auth=immediate, transient=5-10
13- Transient circuit breaker recovers via half-open probe with exponential backoff
14- Auth/revoked circuit breaker is permanent (requires restart)
15- Synced-days pruning at 90 days to prevent unbounded cache growth
16"""
17
18from __future__ import annotations
19
20import asyncio
21import json
22import logging
23import os
24import shutil
25import time
26from datetime import datetime, timedelta
27from pathlib import Path
28from typing import Any
29
30from .config import Config
31from .upload import ErrorType, UploadClient
32
# Module-level logger for the sync service.
logger = logging.getLogger(__name__)

# Circuit breaker thresholds by error type (consecutive-failure counts)
CIRCUIT_THRESHOLD_AUTH = 1  # Auth failures open immediately
CIRCUIT_THRESHOLD_TRANSIENT = 5  # Transient failures need 5 consecutive

# Circuit breaker recovery cooldown (half-open probe backoff, in seconds)
CIRCUIT_COOLDOWN_INITIAL = 30  # seconds before first probe
CIRCUIT_COOLDOWN_FACTOR = 2  # multiply cooldown on each failed probe
CIRCUIT_COOLDOWN_MAX = 300  # cap at 5 minutes

# Synced days older than this are pruned from the cache (age in days)
SYNCED_DAYS_MAX_AGE = 90
46
47
48class SyncService:
49 """Background sync service that uploads completed segments to the server."""
50
51 def __init__(self, config: Config, client: UploadClient):
52 self._config = config
53 self._client = client
54 self._synced_days: set[str] = set()
55 self._consecutive_failures = 0
56 self._last_error_type: ErrorType | None = None
57 self._circuit_open = False
58 self._circuit_open_permanent = False
59 self._circuit_open_since: float = 0.0
60 self._circuit_cooldown: float = CIRCUIT_COOLDOWN_INITIAL
61 self._last_full_sync: float = 0
62 self._running = True
63 self._trigger = asyncio.Event()
64 self.sync_status = "synced"
65 self.sync_progress = ""
66 self._dbus_service = None
67
68 # Load synced days cache
69 self._load_synced_days()
70
71 def _synced_days_path(self) -> Path:
72 return self._config.state_dir / "synced_days.json"
73
74 def _load_synced_days(self) -> None:
75 path = self._synced_days_path()
76 if not path.exists():
77 return
78 try:
79 with open(path, encoding="utf-8") as f:
80 data = json.load(f)
81 self._synced_days = set(data) if isinstance(data, list) else set()
82 except (json.JSONDecodeError, OSError):
83 self._synced_days = set()
84
85 def _save_synced_days(self) -> None:
86 self._config.state_dir.mkdir(parents=True, exist_ok=True)
87 path = self._synced_days_path()
88 tmp = path.with_suffix(f".{os.getpid()}.tmp")
89 try:
90 with open(tmp, "w", encoding="utf-8") as f:
91 json.dump(sorted(self._synced_days), f)
92 f.write("\n")
93 os.rename(str(tmp), str(path))
94 except OSError as e:
95 logger.warning(f"Failed to save synced days: {e}")
96
97 def _prune_synced_days(self) -> None:
98 """Remove synced-days entries older than 90 days."""
99 if not self._synced_days:
100 return
101 cutoff = (datetime.now() - timedelta(days=SYNCED_DAYS_MAX_AGE)).strftime(
102 "%Y%m%d"
103 )
104 before = len(self._synced_days)
105 self._synced_days = {d for d in self._synced_days if d >= cutoff}
106 pruned = before - len(self._synced_days)
107 if pruned:
108 logger.info(
109 f"Pruned {pruned} synced-days entries older than {SYNCED_DAYS_MAX_AGE} days"
110 )
111 self._save_synced_days()
112
113 def _quarantine_segment(self, segment_dir: Path, reason: str) -> bool:
114 """Rename a segment directory to .failed so it's never retried."""
115 failed_path = segment_dir.with_name(segment_dir.name + ".failed")
116 try:
117 segment_dir.rename(failed_path)
118 logger.warning(
119 "Quarantined %s/%s — %s",
120 segment_dir.parent.parent.name,
121 segment_dir.name,
122 reason,
123 )
124 return True
125 except OSError as e:
126 logger.error("Failed to quarantine %s: %s", segment_dir, e)
127 return False
128
    async def _cleanup_synced_segments(self) -> None:
        """Delete synced segments older than cache_retention_days.

        Triple-gated safety:
        1. Day must be in _synced_days (fully synced locally)
        2. Segment must be older than retention threshold (unless retention=0)
        3. Segment must be confirmed present on server (fresh query)

        retention < 0 disables cleanup entirely; retention == 0 deletes any
        synced past day immediately. Today's segments and ``.incomplete``
        segments are never deleted; quarantined ``.failed`` segments are
        deleted without server confirmation. Emptied stream/day directories
        are removed afterwards.
        """
        retention = self._config.cache_retention_days
        if retention < 0:
            # Negative retention disables cleanup altogether.
            return

        captures_dir = self._config.captures_dir
        if not captures_dir.exists():
            return

        today = datetime.now().strftime("%Y%m%d")
        if retention > 0:
            cutoff = (datetime.now() - timedelta(days=retention)).strftime("%Y%m%d")
        else:
            cutoff = today  # 0 means delete immediately — all days qualify

        deleted_total = 0

        for day_dir in sorted(captures_dir.iterdir()):
            if not day_dir.is_dir():
                continue

            day = day_dir.name

            # Bail out promptly if the service is shutting down.
            if not self._running:
                break

            # Gate 1: day must be in synced_days
            if day not in self._synced_days:
                continue

            # Gate 2: day must be old enough (unless retention=0).
            # YYYYMMDD strings compare correctly lexicographically.
            if retention > 0 and day >= cutoff:
                continue

            # Don't clean today's segments
            if day == today:
                continue

            # Gate 3: fresh server confirmation
            server_segments = await asyncio.to_thread(
                self._client.get_server_segments, day
            )
            if server_segments is None:
                logger.warning("Cleanup: skipping day %s — server unreachable", day)
                continue

            # Either "key" or, when present, "original_key" counts as proof
            # the segment exists server-side.
            server_keys: set[str] = set()
            for seg in server_segments:
                server_keys.add(seg.get("key", ""))
                if "original_key" in seg:
                    server_keys.add(seg["original_key"])

            deleted_day = 0

            for stream_dir in day_dir.iterdir():
                if not stream_dir.is_dir():
                    continue

                for seg_dir in sorted(stream_dir.iterdir()):
                    if not seg_dir.is_dir():
                        continue

                    name = seg_dir.name
                    # Never touch incomplete segments
                    if name.endswith(".incomplete"):
                        continue

                    # Delete quarantined (.failed) segments — no server confirmation needed
                    if name.endswith(".failed"):
                        shutil.rmtree(seg_dir)
                        logger.info("Cleanup: deleted quarantined %s/%s", day, name)
                        deleted_day += 1
                        continue

                    if name not in server_keys:
                        logger.warning(
                            "Cleanup: keeping %s/%s — not confirmed on server",
                            day,
                            name,
                        )
                        continue

                    shutil.rmtree(seg_dir)
                    logger.info("Cleanup: deleted %s/%s", day, name)
                    deleted_day += 1

                # Remove empty stream dir
                if stream_dir.is_dir() and not any(stream_dir.iterdir()):
                    stream_dir.rmdir()

            # Remove empty day dir
            if day_dir.is_dir() and not any(day_dir.iterdir()):
                day_dir.rmdir()

            if deleted_day:
                deleted_total += deleted_day

        if deleted_total:
            logger.info("Cleanup: deleted %d segment(s) total", deleted_total)
235
236 def _circuit_threshold(self) -> int:
237 """Get circuit breaker threshold based on last error type."""
238 if self._last_error_type == ErrorType.AUTH:
239 return CIRCUIT_THRESHOLD_AUTH
240 return CIRCUIT_THRESHOLD_TRANSIENT
241
242 def trigger(self) -> None:
243 """Trigger a sync pass (called by observer on segment completion)."""
244 self._trigger.set()
245
246 def stop(self) -> None:
247 """Stop the sync service."""
248 self._running = False
249 self._trigger.set()
250
251 def _set_sync_status(self, status: str, progress: str = "") -> None:
252 """Update sync status and emit D-Bus signal if changed."""
253 changed = self.sync_status != status or self.sync_progress != progress
254 self.sync_status = status
255 self.sync_progress = progress
256 if changed and self._dbus_service:
257 self._dbus_service.SyncProgressChanged(f"{status}:{progress}")
258
    async def run(self) -> None:
        """Main sync loop — waits for triggers, then syncs.

        Runs until stop() clears ``_running``. Each iteration waits up to 60s
        for trigger(), then either services the circuit breaker (permanent
        open: skip; cooling down: wait; cooldown elapsed: half-open probe) or
        performs a sync pass. A full sync that ignores the synced-days cache
        is forced at most once per 24h. Unexpected exceptions are logged and
        the loop continues after a short pause.
        """
        # Prune on startup
        self._prune_synced_days()

        while self._running:
            try:
                # Wait for trigger or periodic check (60s timeout)
                try:
                    await asyncio.wait_for(self._trigger.wait(), timeout=60)
                except asyncio.TimeoutError:
                    pass

                self._trigger.clear()

                if not self._running:
                    break

                if self._circuit_open:
                    if self._circuit_open_permanent:
                        # Permanent open (auth/revoked): never retried until restart.
                        self._set_sync_status("offline")
                        logger.warning(
                            "Circuit breaker open (permanent) — skipping sync"
                        )
                        continue

                    elapsed = time.monotonic() - self._circuit_open_since
                    if elapsed < self._circuit_cooldown:
                        # Still cooling down — report remaining time and wait.
                        remaining = self._circuit_cooldown - elapsed
                        self._set_sync_status(
                            "retrying", f"{remaining:.0f}s until probe"
                        )
                        logger.warning(
                            f"Circuit breaker open — {remaining:.0f}s until probe"
                        )
                        continue

                    # Half-open: one cheap query decides whether to close the
                    # circuit or back off further.
                    self._set_sync_status("retrying", "probing server...")
                    logger.info("Circuit breaker half-open — probing server")
                    today = datetime.now().strftime("%Y%m%d")
                    probe_result = await asyncio.to_thread(
                        self._client.get_server_segments, today
                    )
                    if probe_result is not None:
                        # Probe succeeded: reset all breaker state and fall
                        # through to the sync pass below (no continue).
                        logger.info("Circuit breaker probe succeeded — closing circuit")
                        self._circuit_open = False
                        self._circuit_open_permanent = False
                        self._circuit_open_since = 0.0
                        self._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL
                        self._consecutive_failures = 0
                        self._last_error_type = None
                        self._set_sync_status("syncing")
                    else:
                        # Probe failed: exponential backoff, capped at the max.
                        self._circuit_cooldown = min(
                            self._circuit_cooldown * CIRCUIT_COOLDOWN_FACTOR,
                            CIRCUIT_COOLDOWN_MAX,
                        )
                        self._circuit_open_since = time.monotonic()
                        self._set_sync_status(
                            "retrying",
                            f"probe failed, next in {self._circuit_cooldown:.0f}s",
                        )
                        logger.warning(
                            f"Circuit breaker probe failed — next probe in {self._circuit_cooldown:.0f}s"
                        )
                        continue

                # Force full sync daily
                now = time.time()
                force_full = (now - self._last_full_sync) > 86400

                self._set_sync_status("syncing")
                await self._sync(force_full=force_full)
                self._set_sync_status("synced")

                if force_full:
                    self._last_full_sync = now

            except Exception as e:
                # Keep the loop alive on unexpected errors; short pause
                # avoids a tight failure spin.
                logger.error(f"Sync error: {e}", exc_info=True)
                await asyncio.sleep(5)
339 await asyncio.sleep(5)
340
    async def _sync(self, force_full: bool = False) -> None:
        """Walk days newest-to-oldest and upload missing segments.

        Args:
            force_full: When True, re-check past days even if they are
                already recorded in the synced-days cache.

        For each day the server is queried once, and every local segment
        whose key is absent server-side is uploaded. Failed uploads feed the
        circuit breaker (non-retryable client errors quarantine the segment
        instead). A past day with nothing left to upload is recorded as
        fully synced. Ends with a cleanup pass when still running and the
        circuit is closed.
        """
        captures_dir = self._config.captures_dir
        if not captures_dir.exists():
            return

        today = datetime.now().strftime("%Y%m%d")

        # Collect segments by day
        segments_by_day = self._collect_segments(captures_dir)
        if not segments_by_day:
            return

        for day in sorted(segments_by_day.keys(), reverse=True):
            if not self._running:
                break

            if self._circuit_open:
                break

            # Skip past days already fully synced (unless forcing)
            if day != today and day in self._synced_days and not force_full:
                continue

            local_segments = segments_by_day[day]

            # Query server for existing segments
            self._set_sync_status("syncing", f"checking {day}...")
            server_segments = await asyncio.to_thread(
                self._client.get_server_segments, day
            )
            if server_segments is None:
                # Query failure is not fatal for the pass — try the next day.
                logger.warning(f"Failed to query server for day {day}")
                continue

            # Build lookup — either "key" or, when present, "original_key"
            # counts as the segment already existing server-side.
            server_keys: set[str] = set()
            for seg in server_segments:
                server_keys.add(seg.get("key", ""))
                if "original_key" in seg:
                    server_keys.add(seg["original_key"])

            any_needed_upload = False

            for segment_dir in local_segments:
                if not self._running or self._circuit_open:
                    break

                segment_key = segment_dir.name
                if segment_key in server_keys:
                    continue

                # Quarantine segments where all files are zero-byte (corrupt)
                files = [f for f in segment_dir.iterdir() if f.is_file()]
                if files and all(f.stat().st_size == 0 for f in files):
                    self._quarantine_segment(segment_dir, "all files zero-byte")
                    continue

                any_needed_upload = True
                self._set_sync_status("uploading", f"uploading {segment_key}")
                success = await self._upload_segment(day, segment_dir)

                if not success:
                    if self._last_error_type == ErrorType.CLIENT:
                        # Non-retryable client error (e.g. 400) — quarantine, don't trip circuit
                        self._quarantine_segment(
                            segment_dir, "server rejected (client error)"
                        )
                        continue

                    self._consecutive_failures += 1
                    threshold = self._circuit_threshold()
                    if self._consecutive_failures >= threshold:
                        # Threshold reached — open the breaker and abandon
                        # the rest of this pass.
                        self._circuit_open = True
                        self._circuit_open_since = time.monotonic()
                        self._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL
                        logger.error(
                            f"Circuit breaker OPEN: {self._consecutive_failures} consecutive "
                            f"{self._last_error_type.value if self._last_error_type else 'unknown'} "
                            f"failures (threshold: {threshold})"
                        )
                        self._set_sync_status("retrying")
                        break
                else:
                    # Any success resets the failure streak.
                    self._consecutive_failures = 0
                    self._last_error_type = None

            # Mark past days as synced if nothing needed upload
            if day != today and not any_needed_upload:
                self._synced_days.add(day)
                self._save_synced_days()

        # Cleanup old synced segments
        if not self._circuit_open and self._running:
            try:
                await self._cleanup_synced_segments()
            except Exception as e:
                logger.error(f"Cleanup error: {e}", exc_info=True)
439
440 def _collect_segments(self, captures_dir: Path) -> dict[str, list[Path]]:
441 """Collect completed segments grouped by day."""
442 result: dict[str, list[Path]] = {}
443
444 for day_dir in sorted(captures_dir.iterdir(), reverse=True):
445 if not day_dir.is_dir():
446 continue
447
448 day = day_dir.name
449
450 for stream_dir in day_dir.iterdir():
451 if not stream_dir.is_dir():
452 continue
453
454 segments = []
455 for seg_dir in sorted(stream_dir.iterdir(), reverse=True):
456 if not seg_dir.is_dir():
457 continue
458 name = seg_dir.name
459 # Skip incomplete and failed
460 if name.endswith(".incomplete") or name.endswith(".failed"):
461 continue
462 segments.append(seg_dir)
463
464 if segments:
465 result.setdefault(day, []).extend(segments)
466
467 return result
468
469 async def _upload_segment(self, day: str, segment_dir: Path) -> bool:
470 """Upload a single segment with retry logic."""
471 segment_key = segment_dir.name
472 files = [f for f in segment_dir.iterdir() if f.is_file()]
473 if not files:
474 return True # Nothing to upload
475
476 meta: dict[str, Any] = {"stream": self._config.stream}
477
478 result = await asyncio.to_thread(
479 self._client.upload_segment, day, segment_key, files, meta
480 )
481
482 if result.success:
483 logger.info(f"Uploaded: {day}/{segment_key} ({len(files)} files)")
484 return True
485
486 # Track error type for circuit breaker
487 self._last_error_type = result.error_type
488
489 # Non-retryable errors
490 if self._client.is_revoked:
491 logger.error("Client revoked — disabling sync")
492 self._circuit_open = True
493 self._circuit_open_permanent = True
494 return False
495
496 logger.error(f"Upload failed: {day}/{segment_key}")
497 return False