linux observer
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Quarantine corrupt and non-retryable segments in sync service

Zero-byte segments (from GStreamer crashes) are detected before upload
and renamed to .failed. HTTP 400 (CLIENT) errors trigger quarantine
without incrementing the circuit breaker failure counter. Quarantined
.failed dirs are cleaned up on the same retention schedule as synced
segments.

Fixes a 3+ day infinite error loop where a single corrupt segment
would trip the circuit breaker, recover via probe, and immediately
re-fail.

+289 -6
+38 -2
src/solstone_linux/sync.py
··· 110 110 ) 111 111 self._save_synced_days() 112 112 113 + def _quarantine_segment(self, segment_dir: Path, reason: str) -> bool: 114 + """Rename a segment directory to .failed so it's never retried.""" 115 + failed_path = segment_dir.with_name(segment_dir.name + ".failed") 116 + try: 117 + segment_dir.rename(failed_path) 118 + logger.warning( 119 + "Quarantined %s/%s — %s", 120 + segment_dir.parent.parent.name, 121 + segment_dir.name, 122 + reason, 123 + ) 124 + return True 125 + except OSError as e: 126 + logger.error("Failed to quarantine %s: %s", segment_dir, e) 127 + return False 128 + 113 129 async def _cleanup_synced_segments(self) -> None: 114 130 """Delete synced segments older than cache_retention_days. 115 131 ··· 180 196 continue 181 197 182 198 name = seg_dir.name 183 - # Never touch incomplete or failed 184 - if name.endswith(".incomplete") or name.endswith(".failed"): 199 + # Never touch incomplete segments 200 + if name.endswith(".incomplete"): 201 + continue 202 + 203 + # Delete quarantined (.failed) segments — no server confirmation needed 204 + if name.endswith(".failed"): 205 + shutil.rmtree(seg_dir) 206 + logger.info("Cleanup: deleted quarantined %s/%s", day, name) 207 + deleted_day += 1 185 208 continue 186 209 187 210 if name not in server_keys: ··· 367 390 if segment_key in server_keys: 368 391 continue 369 392 393 + # Quarantine segments where all files are zero-byte (corrupt) 394 + files = [f for f in segment_dir.iterdir() if f.is_file()] 395 + if files and all(f.stat().st_size == 0 for f in files): 396 + self._quarantine_segment(segment_dir, "all files zero-byte") 397 + continue 398 + 370 399 any_needed_upload = True 371 400 self._set_sync_status("uploading", f"uploading {segment_key}") 372 401 success = await self._upload_segment(day, segment_dir) 373 402 374 403 if not success: 404 + if self._last_error_type == ErrorType.CLIENT: 405 + # Non-retryable client error (e.g. 400) — quarantine, don't trip circuit 406 + self._quarantine_segment( 407 + segment_dir, "server rejected (client error)" 408 + ) 409 + continue 410 + 375 411 self._consecutive_failures += 1 376 412 threshold = self._circuit_threshold() 377 413 if self._consecutive_failures >= threshold:
+251 -4
tests/test_sync.py
··· 386 386 assert client._max_retries == 1 387 387 388 388 389 + class TestQuarantineZeroByte: 390 + """Test that segments with all zero-byte files are quarantined before upload.""" 391 + 392 + def _make_sync(self, tmp_path: Path) -> SyncService: 393 + config = Config(base_dir=tmp_path) 394 + config.ensure_dirs() 395 + client = UploadClient(config) 396 + return SyncService(config, client) 397 + 398 + def _create_zero_byte_segment( 399 + self, captures_dir: Path, day: str, stream: str, name: str 400 + ) -> Path: 401 + seg_dir = captures_dir / day / stream / name 402 + seg_dir.mkdir(parents=True, exist_ok=True) 403 + (seg_dir / "screen.webm").write_bytes(b"") 404 + (seg_dir / "audio.flac").write_bytes(b"") 405 + return seg_dir 406 + 407 + @pytest.mark.asyncio 408 + async def test_zero_byte_segment_quarantined(self, tmp_path: Path): 409 + """A segment with all zero-byte files is renamed to .failed before upload.""" 410 + sync = self._make_sync(tmp_path) 411 + captures = sync._config.captures_dir 412 + 413 + seg = self._create_zero_byte_segment( 414 + captures, "20260410", "archon", "120000_300" 415 + ) 416 + server_response = [] 417 + 418 + with patch( 419 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 420 + ): 421 + await sync._sync() 422 + 423 + assert not seg.exists() 424 + assert seg.with_name("120000_300.failed").exists() 425 + 426 + @pytest.mark.asyncio 427 + async def test_zero_byte_does_not_trigger_upload(self, tmp_path: Path): 428 + """Zero-byte segments should never call upload_segment.""" 429 + sync = self._make_sync(tmp_path) 430 + captures = sync._config.captures_dir 431 + 432 + self._create_zero_byte_segment(captures, "20260410", "archon", "120000_300") 433 + server_response = [] 434 + 435 + with patch( 436 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 437 + ): 438 + with patch.object( 439 + sync, "_upload_segment", new_callable=AsyncMock 440 + ) as mock_upload: 441 + await sync._sync() 442 + mock_upload.assert_not_called() 443 + 444 + @pytest.mark.asyncio 445 + async def test_mixed_files_not_quarantined(self, tmp_path: Path): 446 + """A segment with some zero-byte and some non-zero files is NOT quarantined.""" 447 + sync = self._make_sync(tmp_path) 448 + captures = sync._config.captures_dir 449 + 450 + seg_dir = captures / "20260410" / "archon" / "120000_300" 451 + seg_dir.mkdir(parents=True) 452 + (seg_dir / "screen.webm").write_bytes(b"") 453 + (seg_dir / "audio.flac").write_bytes(b"\x00" * 100) 454 + 455 + server_response = [] 456 + 457 + with patch( 458 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 459 + ): 460 + with patch.object( 461 + sync, "_upload_segment", new_callable=AsyncMock, return_value=True 462 + ) as mock_upload: 463 + await sync._sync() 464 + mock_upload.assert_called_once() 465 + 466 + @pytest.mark.asyncio 467 + async def test_zero_byte_day_marked_synced(self, tmp_path: Path): 468 + """A past day with only zero-byte segments gets marked synced after quarantine.""" 469 + sync = self._make_sync(tmp_path) 470 + captures = sync._config.captures_dir 471 + 472 + self._create_zero_byte_segment(captures, "20260101", "archon", "120000_300") 473 + server_response = [] 474 + 475 + with patch( 476 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 477 + ): 478 + await sync._sync() 479 + 480 + assert "20260101" in sync._synced_days 481 + 482 + 483 + class TestQuarantineClientError: 484 + """Test that CLIENT errors (HTTP 400) quarantine the segment.""" 485 + 486 + def _make_sync(self, tmp_path: Path) -> SyncService: 487 + config = Config(base_dir=tmp_path) 488 + config.ensure_dirs() 489 + client = UploadClient(config) 490 + return SyncService(config, client) 491 + 492 + def _create_segment( 493 + self, captures_dir: Path, day: str, stream: str, name: str 494 + ) -> Path: 495 + seg_dir = captures_dir / day / stream / name 496 + seg_dir.mkdir(parents=True, exist_ok=True) 497 + (seg_dir / "screen.webm").write_bytes(b"\x00" * 100) 498 + return seg_dir 499 + 500 + @pytest.mark.asyncio 501 + async def test_client_error_quarantines_segment(self, tmp_path: Path): 502 + """HTTP 400 response quarantines the segment to .failed.""" 503 + sync = self._make_sync(tmp_path) 504 + captures = sync._config.captures_dir 505 + 506 + seg = self._create_segment(captures, "20260410", "archon", "120000_300") 507 + server_response = [] 508 + 509 + async def fake_upload(day, segment_dir): 510 + sync._last_error_type = ErrorType.CLIENT 511 + return False 512 + 513 + with patch( 514 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 515 + ): 516 + with patch.object(sync, "_upload_segment", side_effect=fake_upload): 517 + await sync._sync() 518 + 519 + assert not seg.exists() 520 + assert seg.with_name("120000_300.failed").exists() 521 + 522 + @pytest.mark.asyncio 523 + async def test_client_error_does_not_trip_circuit(self, tmp_path: Path): 524 + """CLIENT errors should not increment consecutive_failures or open circuit.""" 525 + sync = self._make_sync(tmp_path) 526 + captures = sync._config.captures_dir 527 + 528 + for i in range(10): 529 + self._create_segment(captures, "20260410", "archon", f"12000{i}_300") 530 + 531 + server_response = [] 532 + 533 + async def fake_upload(day, segment_dir): 534 + sync._last_error_type = ErrorType.CLIENT 535 + return False 536 + 537 + with patch( 538 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 539 + ): 540 + with patch.object(sync, "_upload_segment", side_effect=fake_upload): 541 + await sync._sync() 542 + 543 + assert sync._consecutive_failures == 0 544 + assert not sync._circuit_open 545 + 546 + @pytest.mark.asyncio 547 + async def test_transient_error_still_trips_circuit(self, tmp_path: Path): 548 + """TRANSIENT errors should still increment failures and trip circuit.""" 549 + sync = self._make_sync(tmp_path) 550 + captures = sync._config.captures_dir 551 + 552 + for i in range(6): 553 + self._create_segment(captures, "20260410", "archon", f"12000{i}_300") 554 + 555 + server_response = [] 556 + 557 + async def fake_upload(day, segment_dir): 558 + sync._last_error_type = ErrorType.TRANSIENT 559 + return False 560 + 561 + with patch( 562 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 563 + ): 564 + with patch.object(sync, "_upload_segment", side_effect=fake_upload): 565 + await sync._sync() 566 + 567 + assert sync._circuit_open 568 + assert sync._consecutive_failures >= 5 569 + 570 + 389 571 class TestCleanupSyncedSegments: 390 572 """Test cache retention cleanup of synced segments.""" 391 573 ··· 467 649 assert (captures / "20260101" / "archon" / "120000_300").exists() 468 650 469 651 @pytest.mark.asyncio 470 - async def test_never_touches_incomplete_or_failed(self, tmp_path: Path): 471 - """.incomplete and .failed segments are never deleted.""" 652 + async def test_never_touches_incomplete(self, tmp_path: Path): 653 + """.incomplete segments are never deleted.""" 472 654 sync = self._make_sync(tmp_path, retention=7) 473 655 captures = sync._config.captures_dir 474 656 475 657 self._create_segment(captures, "20260101", "archon", "120000.incomplete") 476 - self._create_segment(captures, "20260101", "archon", "130000.failed") 477 658 self._create_segment(captures, "20260101", "archon", "140000_300") 478 659 sync._synced_days.add("20260101") 479 660 ··· 484 665 await sync._cleanup_synced_segments() 485 666 486 667 assert (captures / "20260101" / "archon" / "120000.incomplete").exists() 487 - assert (captures / "20260101" / "archon" / "130000.failed").exists() 488 668 assert not (captures / "20260101" / "archon" / "140000_300").exists() 489 669 490 670 @pytest.mark.asyncio ··· 576 756 await sync._cleanup_synced_segments() 577 757 578 758 assert not (captures / "20260101" / "archon" / "120000_300").exists() 759 + 760 + 761 + class TestCleanupFailedSegments: 762 + """Test that .failed segments are cleaned up on retention schedule.""" 763 + 764 + def _make_sync(self, tmp_path: Path, retention: int = 7) -> SyncService: 765 + config = Config(base_dir=tmp_path) 766 + config.cache_retention_days = retention 767 + config.ensure_dirs() 768 + client = UploadClient(config) 769 + return SyncService(config, client) 770 + 771 + def _create_segment( 772 + self, captures_dir: Path, day: str, stream: str, name: str 773 + ) -> Path: 774 + seg_dir = captures_dir / day / stream / name 775 + seg_dir.mkdir(parents=True, exist_ok=True) 776 + (seg_dir / "screen.webm").write_bytes(b"\x00" * 100) 777 + return seg_dir 778 + 779 + @pytest.mark.asyncio 780 + async def test_failed_segments_deleted_on_retention(self, tmp_path: Path): 781 + """.failed segments are deleted when day meets retention age.""" 782 + sync = self._make_sync(tmp_path, retention=7) 783 + captures = sync._config.captures_dir 784 + 785 + self._create_segment(captures, "20260101", "archon", "120000_300.failed") 786 + sync._synced_days.add("20260101") 787 + 788 + server_response = [] 789 + with patch( 790 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 791 + ): 792 + await sync._cleanup_synced_segments() 793 + 794 + assert not (captures / "20260101" / "archon" / "120000_300.failed").exists() 795 + 796 + @pytest.mark.asyncio 797 + async def test_failed_segments_kept_if_day_not_synced(self, tmp_path: Path): 798 + """.failed segments are kept if the day is not in synced_days.""" 799 + sync = self._make_sync(tmp_path, retention=7) 800 + captures = sync._config.captures_dir 801 + 802 + self._create_segment(captures, "20260101", "archon", "120000_300.failed") 803 + 804 + with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: 805 + await sync._cleanup_synced_segments() 806 + 807 + assert (captures / "20260101" / "archon" / "120000_300.failed").exists() 808 + mock_thread.assert_not_called() 809 + 810 + @pytest.mark.asyncio 811 + async def test_incomplete_still_skipped(self, tmp_path: Path): 812 + """.incomplete segments are still never deleted.""" 813 + sync = self._make_sync(tmp_path, retention=7) 814 + captures = sync._config.captures_dir 815 + 816 + self._create_segment(captures, "20260101", "archon", "120000.incomplete") 817 + sync._synced_days.add("20260101") 818 + 819 + server_response = [] 820 + with patch( 821 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 822 + ): 823 + await sync._cleanup_synced_segments() 824 + 825 + assert (captures / "20260101" / "archon" / "120000.incomplete").exists()