linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add half-open recovery to sync circuit breaker for transient failures

A 19-hour server outage revealed that once the circuit breaker tripped on
transient errors (5xx/network), sync stayed disabled until the service was
restarted. Add a cooldown-based half-open state with exponential backoff
(30s → 60s → 120s → 240s → 300s cap) that probes the server via get_server_segments
before resuming full sync. Auth/revoked circuit stays permanently open.

+189 -2
+50 -2
src/solstone_linux/sync.py
··· 10 10 Refinements over tmux baseline: 11 11 - Respects configured sync_max_retries (no hard min(config,3) cap) 12 12 - Circuit breaker tuned by error type: auth=immediate, transient=5-10 13 + - Transient circuit breaker recovers via half-open probe with exponential backoff 14 + - Auth/revoked circuit breaker is permanent (requires restart) 13 15 - Synced-days pruning at 90 days to prevent unbounded cache growth 14 16 """ 15 17 ··· 33 35 CIRCUIT_THRESHOLD_AUTH = 1 # Auth failures open immediately 34 36 CIRCUIT_THRESHOLD_TRANSIENT = 5 # Transient failures need 5 consecutive 35 37 38 + # Circuit breaker recovery cooldown 39 + CIRCUIT_COOLDOWN_INITIAL = 30 # seconds before first probe 40 + CIRCUIT_COOLDOWN_FACTOR = 2 # multiply cooldown on each failed probe 41 + CIRCUIT_COOLDOWN_MAX = 300 # cap at 5 minutes 42 + 36 43 # Synced days older than this are pruned from the cache 37 44 SYNCED_DAYS_MAX_AGE = 90 38 45 ··· 47 54 self._consecutive_failures = 0 48 55 self._last_error_type: ErrorType | None = None 49 56 self._circuit_open = False 57 + self._circuit_open_permanent = False 58 + self._circuit_open_since: float = 0.0 59 + self._circuit_cooldown: float = CIRCUIT_COOLDOWN_INITIAL 50 60 self._last_full_sync: float = 0 51 61 self._running = True 52 62 self._trigger = asyncio.Event() ··· 130 140 break 131 141 132 142 if self._circuit_open: 133 - logger.warning("Circuit breaker open — skipping sync") 134 - continue 143 + if self._circuit_open_permanent: 144 + logger.warning( 145 + "Circuit breaker open (permanent) — skipping sync" 146 + ) 147 + continue 148 + 149 + elapsed = time.monotonic() - self._circuit_open_since 150 + if elapsed < self._circuit_cooldown: 151 + remaining = self._circuit_cooldown - elapsed 152 + logger.warning( 153 + f"Circuit breaker open — {remaining:.0f}s until probe" 154 + ) 155 + continue 156 + 157 + logger.info("Circuit breaker half-open — probing server") 158 + today = datetime.now().strftime("%Y%m%d") 159 + probe_result = await 
asyncio.to_thread( 160 + self._client.get_server_segments, today 161 + ) 162 + if probe_result is not None: 163 + logger.info("Circuit breaker probe succeeded — closing circuit") 164 + self._circuit_open = False 165 + self._circuit_open_permanent = False 166 + self._circuit_open_since = 0.0 167 + self._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 168 + self._consecutive_failures = 0 169 + self._last_error_type = None 170 + else: 171 + self._circuit_cooldown = min( 172 + self._circuit_cooldown * CIRCUIT_COOLDOWN_FACTOR, 173 + CIRCUIT_COOLDOWN_MAX, 174 + ) 175 + self._circuit_open_since = time.monotonic() 176 + logger.warning( 177 + f"Circuit breaker probe failed — next probe in {self._circuit_cooldown:.0f}s" 178 + ) 179 + continue 135 180 136 181 # Force full sync daily 137 182 now = time.time() ··· 205 250 threshold = self._circuit_threshold() 206 251 if self._consecutive_failures >= threshold: 207 252 self._circuit_open = True 253 + self._circuit_open_since = time.monotonic() 254 + self._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 208 255 logger.error( 209 256 f"Circuit breaker OPEN: {self._consecutive_failures} consecutive " 210 257 f"{self._last_error_type.value if self._last_error_type else 'unknown'} " ··· 273 320 if self._client.is_revoked: 274 321 logger.error("Client revoked — disabling sync") 275 322 self._circuit_open = True 323 + self._circuit_open_permanent = True 276 324 return False 277 325 278 326 logger.error(f"Upload failed: {day}/{segment_key}")
+139
tests/test_sync.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 + import asyncio 4 5 import json 5 6 import os 6 7 import time 7 8 from pathlib import Path 9 + from unittest.mock import AsyncMock, patch 10 + 11 + import pytest 8 12 9 13 from solstone_linux.config import Config 10 14 from solstone_linux.recovery import recover_incomplete_segments 15 + from solstone_linux.sync import ( 16 + CIRCUIT_COOLDOWN_INITIAL, 17 + CIRCUIT_COOLDOWN_MAX, 18 + SyncService, 19 + ) 11 20 from solstone_linux.upload import ErrorType, UploadClient 12 21 13 22 ··· 228 237 sync._last_error_type = ErrorType.TRANSIENT 229 238 assert sync._circuit_threshold() == CIRCUIT_THRESHOLD_TRANSIENT 230 239 assert CIRCUIT_THRESHOLD_TRANSIENT >= 5 240 + 241 + 242 + class TestCircuitBreakerRecovery: 243 + """Test circuit breaker recovery for transient failures.""" 244 + 245 + def _make_sync(self, tmp_path: Path) -> SyncService: 246 + """Create a SyncService with minimal config.""" 247 + config = Config(base_dir=tmp_path) 248 + config.ensure_dirs() 249 + client = UploadClient(config) 250 + return SyncService(config, client) 251 + 252 + async def _run_briefly(self, sync: SyncService) -> None: 253 + sync._trigger.set() 254 + task = asyncio.create_task(sync.run()) 255 + await asyncio.sleep(0.01) 256 + sync.stop() 257 + await asyncio.wait_for(task, timeout=1) 258 + 259 + @pytest.mark.asyncio 260 + async def test_transient_circuit_recovers_after_cooldown(self, tmp_path: Path): 261 + sync = self._make_sync(tmp_path) 262 + sync._circuit_open = True 263 + sync._circuit_open_permanent = False 264 + sync._circuit_open_since = time.monotonic() - 31 265 + sync._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 266 + sync._consecutive_failures = 5 267 + sync._last_error_type = ErrorType.TRANSIENT 268 + sync._sync = AsyncMock(side_effect=lambda force_full=False: sync.stop()) 269 + sync._trigger.set() 270 + 271 + with patch("asyncio.to_thread", new_callable=AsyncMock, return_value=[]): 272 + await 
sync.run() 273 + 274 + assert not sync._circuit_open 275 + assert sync._consecutive_failures == 0 276 + assert sync._last_error_type is None 277 + assert sync._circuit_cooldown == CIRCUIT_COOLDOWN_INITIAL 278 + sync._sync.assert_awaited_once() 279 + 280 + @pytest.mark.asyncio 281 + async def test_revoked_circuit_never_recovers(self, tmp_path: Path): 282 + sync = self._make_sync(tmp_path) 283 + sync._circuit_open = True 284 + sync._circuit_open_permanent = True 285 + sync._circuit_open_since = time.monotonic() - 600 286 + sync._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 287 + sync._sync = AsyncMock() 288 + 289 + with patch("asyncio.to_thread", new_callable=AsyncMock) as to_thread: 290 + await self._run_briefly(sync) 291 + 292 + assert sync._circuit_open 293 + assert sync._circuit_open_permanent 294 + to_thread.assert_not_called() 295 + sync._sync.assert_not_awaited() 296 + 297 + @pytest.mark.asyncio 298 + async def test_backoff_increases_on_failed_probe(self, tmp_path: Path): 299 + sync = self._make_sync(tmp_path) 300 + sync._circuit_open = True 301 + sync._circuit_open_permanent = False 302 + sync._circuit_open_since = 70.0 303 + sync._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 304 + sync._sync = AsyncMock() 305 + before_probe = time.monotonic() 306 + 307 + with patch("asyncio.to_thread", new_callable=AsyncMock, return_value=None): 308 + await self._run_briefly(sync) 309 + 310 + assert sync._circuit_open 311 + assert sync._circuit_cooldown == CIRCUIT_COOLDOWN_INITIAL * 2 312 + assert sync._circuit_open_since >= before_probe 313 + sync._sync.assert_not_awaited() 314 + 315 + @pytest.mark.asyncio 316 + async def test_full_reset_after_successful_probe(self, tmp_path: Path): 317 + sync = self._make_sync(tmp_path) 318 + sync._circuit_open = True 319 + sync._circuit_open_permanent = False 320 + sync._circuit_open_since = time.monotonic() - 121 321 + sync._circuit_cooldown = 120 322 + sync._consecutive_failures = 5 323 + sync._last_error_type = ErrorType.TRANSIENT 324 + 
sync._sync = AsyncMock(side_effect=lambda force_full=False: sync.stop()) 325 + sync._trigger.set() 326 + 327 + with patch("asyncio.to_thread", new_callable=AsyncMock, return_value=[]): 328 + await sync.run() 329 + 330 + assert not sync._circuit_open 331 + assert not sync._circuit_open_permanent 332 + assert sync._circuit_open_since == 0.0 333 + assert sync._circuit_cooldown == CIRCUIT_COOLDOWN_INITIAL 334 + assert sync._consecutive_failures == 0 335 + assert sync._last_error_type is None 336 + 337 + @pytest.mark.asyncio 338 + async def test_cooldown_caps_at_max(self, tmp_path: Path): 339 + sync = self._make_sync(tmp_path) 340 + sync._circuit_open = True 341 + sync._circuit_open_permanent = False 342 + sync._circuit_open_since = 0.0 343 + sync._circuit_cooldown = CIRCUIT_COOLDOWN_MAX 344 + sync._sync = AsyncMock() 345 + before_probe = time.monotonic() 346 + 347 + with patch("asyncio.to_thread", new_callable=AsyncMock, return_value=None): 348 + await self._run_briefly(sync) 349 + 350 + assert sync._circuit_open 351 + assert sync._circuit_cooldown == CIRCUIT_COOLDOWN_MAX 352 + assert sync._circuit_open_since >= before_probe 353 + sync._sync.assert_not_awaited() 354 + 355 + @pytest.mark.asyncio 356 + async def test_skips_probe_before_cooldown_elapses(self, tmp_path: Path): 357 + sync = self._make_sync(tmp_path) 358 + sync._circuit_open = True 359 + sync._circuit_open_permanent = False 360 + sync._circuit_open_since = time.monotonic() - 10 361 + sync._circuit_cooldown = CIRCUIT_COOLDOWN_INITIAL 362 + sync._sync = AsyncMock() 363 + 364 + with patch("asyncio.to_thread", new_callable=AsyncMock) as to_thread: 365 + await self._run_briefly(sync) 366 + 367 + assert sync._circuit_open 368 + to_thread.assert_not_called() 369 + sync._sync.assert_not_awaited() 231 370 232 371 233 372 class TestRetryCapRespected: