personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add zero-byte rejection and duplicate handling to remote sync

- Client: skip 0-byte files in _handle_message() before SHA256 computation
- Server: skip 0-byte files in ingest_upload() before processing
- Client: parse "duplicate" upload response and immediately confirm
segment instead of entering the confirmation polling loop
- Change upload_segment() return type from bool to UploadResult
NamedTuple carrying success + duplicate flags

+283 -30
+3
apps/remote/routes.py
··· 367 367 368 368 # Read content and compute SHA256 369 369 content = upload.read() 370 + if len(content) == 0: 371 + logger.warning(f"Skipping 0-byte file: {submitted_filename}") 372 + continue 370 373 sha256 = compute_bytes_sha256(content) 371 374 372 375 file_data.append((submitted_filename, simple_filename, content, sha256))
+30 -6
apps/remote/tests/test_client.py
··· 50 50 client = RemoteClient("https://server/ingest/key") 51 51 result = client.upload_segment("20250103", "120000_300", [file1, file2]) 52 52 53 - assert result is True 53 + assert result.success is True 54 + assert result.duplicate is False 54 55 mock_session.post.assert_called_once() 55 56 56 57 # Check the call arguments ··· 88 89 client = RemoteClient("https://server/ingest/key") 89 90 result = client.upload_segment("20250103", "120000_300", [file1]) 90 91 91 - assert result is True 92 + assert result.success is True 92 93 assert mock_session.post.call_count == 2 93 94 94 95 ··· 111 112 client = RemoteClient("https://server/ingest/key") 112 113 result = client.upload_segment("20250103", "120000_300", [file1]) 113 114 114 - assert result is False 115 + assert result.success is False 115 116 assert mock_session.post.call_count == len(RETRY_BACKOFF) 116 117 117 118 ··· 135 136 client = RemoteClient("https://server/ingest/key") 136 137 result = client.upload_segment("20250103", "120000_300", [file1, file2]) 137 138 138 - assert result is True 139 + assert result.success is True 139 140 140 141 141 142 def test_upload_segment_fails_if_all_missing(mock_session, tmp_path): ··· 149 150 client = RemoteClient("https://server/ingest/key") 150 151 result = client.upload_segment("20250103", "120000_300", [file1, file2]) 151 152 152 - assert result is False 153 + assert result.success is False 153 154 mock_session.post.assert_not_called() 154 155 155 156 ··· 160 161 client = RemoteClient("https://server/ingest/key") 161 162 result = client.upload_segment("20250103", "120000_300", []) 162 163 163 - assert result is False 164 + assert result.success is False 164 165 mock_session.post.assert_not_called() 166 + 167 + 168 + def test_upload_segment_duplicate_response(mock_session, tmp_path): 169 + """Test that duplicate server response is detected.""" 170 + from observe.sync import RemoteClient 171 + 172 + file1 = tmp_path / "audio.flac" 173 + file1.write_bytes(b"audio data") 174 + 175 + mock_response = MagicMock() 176 + mock_response.status_code = 200 177 + mock_response.json.return_value = { 178 + "status": "duplicate", 179 + "existing_segment": "120000_300", 180 + "message": "All files already received", 181 + } 182 + mock_session.post.return_value = mock_response 183 + 184 + client = RemoteClient("https://server/ingest/key") 185 + result = client.upload_segment("20250103", "120000_300", [file1]) 186 + 187 + assert result.success is True 188 + assert result.duplicate is True
+64
apps/remote/tests/test_routes.py
··· 1464 1464 data = resp.get_json() 1465 1465 assert data["status"] == "collision" 1466 1466 assert data["segment"] != "120000_300" # Adjusted 1467 + 1468 + 1469 + def test_ingest_zero_byte_file_rejected(remote_env): 1470 + """Test that uploading only 0-byte files returns 400.""" 1471 + env = remote_env() 1472 + 1473 + # Create a remote 1474 + resp = env.client.post( 1475 + "/app/remote/api/create", 1476 + json={"name": "test-remote"}, 1477 + content_type="application/json", 1478 + ) 1479 + key = resp.get_json()["key"] 1480 + 1481 + # Upload a 0-byte file 1482 + resp = env.client.post( 1483 + f"/app/remote/ingest/{key}", 1484 + data={ 1485 + "day": "20250103", 1486 + "segment": "120000_300", 1487 + "files": (io.BytesIO(b""), "empty_audio.flac"), 1488 + }, 1489 + ) 1490 + assert resp.status_code == 400 1491 + assert "No valid files" in resp.get_json()["error"] 1492 + 1493 + 1494 + def test_ingest_mixed_zero_byte_files(remote_env): 1495 + """Test that 0-byte files are skipped but valid files are accepted.""" 1496 + env = remote_env() 1497 + 1498 + # Create a remote 1499 + resp = env.client.post( 1500 + "/app/remote/api/create", 1501 + json={"name": "test-remote"}, 1502 + content_type="application/json", 1503 + ) 1504 + key = resp.get_json()["key"] 1505 + 1506 + # Upload one valid file and one 0-byte file 1507 + valid_data = b"real audio content" 1508 + resp = env.client.post( 1509 + f"/app/remote/ingest/{key}", 1510 + data={ 1511 + "day": "20250103", 1512 + "segment": "120000_300", 1513 + "files": [ 1514 + (io.BytesIO(b""), "empty.flac"), 1515 + (io.BytesIO(valid_data), "audio.flac"), 1516 + ], 1517 + }, 1518 + ) 1519 + assert resp.status_code == 200 1520 + data = resp.get_json() 1521 + assert data["status"] == "ok" 1522 + assert data["files"] == ["audio.flac"] 1523 + assert data["bytes"] == len(valid_data) 1524 + 1525 + # Verify only valid file was written 1526 + expected_file = ( 1527 + env.journal / "20250103" / "test-remote" / "120000_300" / "audio.flac" 1528 + ) 1529 + assert expected_file.exists() 1530 + assert expected_file.read_bytes() == valid_data
+48 -18
observe/sync.py
··· 22 22 from dataclasses import dataclass 23 23 from datetime import datetime 24 24 from pathlib import Path 25 - from typing import Any 25 + from typing import Any, NamedTuple 26 26 from urllib.parse import urlparse 27 27 28 28 import requests ··· 43 43 RETRY_BACKOFF = [1, 5, 15] # seconds 44 44 UPLOAD_TIMEOUT = 300 # 5 minutes for large files 45 45 HEALTH_CHECK_TIMEOUT = 10 # seconds for startup health check 46 + 47 + 48 + class UploadResult(NamedTuple): 49 + """Result of an upload_segment() call.""" 50 + 51 + success: bool 52 + duplicate: bool = False 46 53 47 54 48 55 def check_remote_health( ··· 129 136 segment: str, 130 137 files: list[Path], 131 138 meta: dict | None = None, 132 - ) -> bool: 139 + ) -> UploadResult: 133 140 """Upload segment files to remote server. 134 141 135 142 Args: ··· 142 149 fields and the server merges them into meta. 143 150 144 151 Returns: 145 - True if upload succeeded, False otherwise 152 + UploadResult with success=True/False and duplicate=True if server 153 + reported duplicate 146 154 """ 147 155 if not files: 148 156 logger.warning("No files to upload") 149 - return False 157 + return UploadResult(False) 150 158 151 159 for attempt, delay in enumerate(RETRY_BACKOFF): 152 160 # Open file handles and ensure they're closed ··· 166 174 167 175 if not files_data: 168 176 logger.error("No valid files to upload") 169 - return False 177 + return UploadResult(False) 170 178 171 179 # Build request data 172 180 data: dict[str, Any] = { ··· 190 198 ) 191 199 192 200 if response.status_code == 200: 193 - result = response.json() 194 - logger.info( 195 - f"Uploaded {len(result.get('files', []))} files " 196 - f"({result.get('bytes', 0)} bytes) for {day}/{segment}" 197 - ) 198 - return True 201 + resp_data = response.json() 202 + is_duplicate = resp_data.get("status") == "duplicate" 203 + if is_duplicate: 204 + logger.info(f"Server reported duplicate for {day}/{segment}") 205 + else: 206 + logger.info( 207 + f"Uploaded {len(resp_data.get('files', []))} files " 208 + f"({resp_data.get('bytes', 0)} bytes) for {day}/{segment}" 209 + ) 210 + return UploadResult(True, duplicate=is_duplicate) 199 211 200 212 logger.warning(f"Upload failed: {response.status_code} {response.text}") 201 213 ··· 215 227 time.sleep(delay) 216 228 217 229 logger.error(f"Upload failed after {MAX_RETRIES} attempts: {day}/{segment}") 218 - return False 230 + return UploadResult(False) 219 231 220 232 221 233 # Confirmation polling configuration ··· 426 438 file_info = [] 427 439 for filename in files: 428 440 file_path = segment_dir / filename 429 - if file_path.exists(): 430 - sha = compute_file_sha256(file_path) 431 - file_info.append({"name": filename, "sha256": sha}) 432 - else: 441 + if not file_path.exists(): 433 442 logger.warning(f"File not found: {file_path}") 443 + continue 444 + if file_path.stat().st_size == 0: 445 + logger.warning(f"Skipping 0-byte file: {file_path}") 446 + continue 447 + sha = compute_file_sha256(file_path) 448 + file_info.append({"name": filename, "sha256": sha}) 434 449 435 450 if not file_info: 436 451 logger.error(f"No valid files for segment {day}/{segment}") ··· 530 545 logger.warning(f"No files found for segment {day}/{segment}, skipping") 531 546 break 532 547 533 - success = self._client.upload_segment( 548 + result = self._client.upload_segment( 534 549 day, segment, existing_files, meta=seg_info.meta 535 550 ) 536 - if not success: 551 + if not result.success: 537 552 logger.error(f"Upload failed for {day}/{segment}, will retry") 538 553 time.sleep(CONFIRM_POLL_INTERVAL) 539 554 continue 555 + 556 + if result.duplicate: 557 + # Server already has these files - mark confirmed immediately 558 + # without entering the confirmation polling loop 559 + record = { 560 + "ts": now_ms(), 561 + "segment": segment, 562 + "status": "confirmed", 563 + } 564 + append_sync_record(day, record) 565 + self._cleanup_segment(seg_dir, existing_files) 566 + with self._lock: 567 + self._last_confirmed = f"{day}/{segment}" 568 + logger.info(f"Duplicate upload confirmed, cleaned up: {day}/{segment}") 569 + break 540 570 541 571 logger.info(f"Upload complete for {day}/{segment}, confirming...") 542 572
+138 -6
tests/test_sync.py
··· 424 424 425 425 def test_process_segment_skips_upload_if_already_confirmed(sync_journal, monkeypatch): 426 426 """Test that segment already on server is skipped without upload.""" 427 - from observe.sync import SegmentInfo, SyncService 427 + from observe.sync import SegmentInfo, SyncService, UploadResult 428 428 429 429 journal = sync_journal["path"] 430 430 day = sync_journal["day"] ··· 463 463 ] 464 464 mock_session.get.return_value = server_response 465 465 mock_client.session = mock_session 466 - mock_client.upload_segment = MagicMock(return_value=True) 466 + mock_client.upload_segment = MagicMock(return_value=UploadResult(True)) 467 467 mock_client_class.return_value = mock_client 468 468 469 469 service = SyncService("https://server/ingest/key") ··· 477 477 478 478 def test_process_segment_uploads_if_not_on_server(sync_journal, monkeypatch): 479 479 """Test that segment not on server is uploaded.""" 480 - from observe.sync import SegmentInfo, SyncService 480 + from observe.sync import SegmentInfo, SyncService, UploadResult 481 481 482 482 journal = sync_journal["path"] 483 483 day = sync_journal["day"] ··· 518 518 ] 519 519 mock_session.get.side_effect = responses 520 520 mock_client.session = mock_session 521 - mock_client.upload_segment = MagicMock(return_value=True) 521 + mock_client.upload_segment = MagicMock(return_value=UploadResult(True)) 522 522 mock_client_class.return_value = mock_client 523 523 524 524 service = SyncService("https://server/ingest/key") ··· 530 530 531 531 def test_process_segment_passes_metadata_to_upload(sync_journal, monkeypatch): 532 532 """Test that metadata is passed through to upload_segment call.""" 533 - from observe.sync import SegmentInfo, SyncService 533 + from observe.sync import SegmentInfo, SyncService, UploadResult 534 534 535 535 journal = sync_journal["path"] 536 536 day = sync_journal["day"] ··· 577 577 ] 578 578 mock_session.get.side_effect = responses 579 579 mock_client.session = mock_session 580 - mock_client.upload_segment = MagicMock(return_value=True) 580 + mock_client.upload_segment = MagicMock(return_value=UploadResult(True)) 581 581 mock_client_class.return_value = mock_client 582 582 583 583 service = SyncService("https://server/ingest/key") ··· 592 592 "facet": "meetings", 593 593 "stream": "default", 594 594 } 595 + 596 + 597 + def test_handle_message_skips_zero_byte_files(sync_journal, monkeypatch): 598 + """Test that 0-byte files are skipped during message handling.""" 599 + from observe.sync import SyncService, load_sync_state 600 + 601 + journal = sync_journal["path"] 602 + day = sync_journal["day"] 603 + monkeypatch.setenv("JOURNAL_PATH", str(journal)) 604 + 605 + # Create segment directory with mixed files 606 + seg_dir = journal / day / "default" / "120000_300" 607 + seg_dir.mkdir(parents=True, exist_ok=True) 608 + (seg_dir / "audio.flac").write_bytes(b"real audio data") 609 + (seg_dir / "screen.webm").write_bytes(b"") # 0-byte 610 + 611 + with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"): 612 + service = SyncService("https://server/ingest/key") 613 + 614 + message = { 615 + "tract": "observe", 616 + "event": "observing", 617 + "day": day, 618 + "segment": "120000_300", 619 + "files": ["audio.flac", "screen.webm"], 620 + "stream": "default", 621 + } 622 + service._handle_message(message) 623 + 624 + # Only valid file should be queued 625 + assert service._queue.qsize() == 1 626 + seg_info = service._queue.get_nowait() 627 + assert len(seg_info.files) == 1 628 + assert seg_info.files[0]["name"] == "audio.flac" 629 + 630 + # Pending record should only have 1 file 631 + records = load_sync_state(day) 632 + assert len(records) == 1 633 + assert len(records[0]["files"]) == 1 634 + 635 + 636 + def test_handle_message_skips_all_zero_byte_files(sync_journal, monkeypatch): 637 + """Test that segment is not queued when all files are 0-byte.""" 638 + from observe.sync import SyncService, load_sync_state 639 + 640 + journal = sync_journal["path"] 641 + day = sync_journal["day"] 642 + monkeypatch.setenv("JOURNAL_PATH", str(journal)) 643 + 644 + # Create segment directory with only 0-byte files 645 + seg_dir = journal / day / "default" / "120000_300" 646 + seg_dir.mkdir(parents=True, exist_ok=True) 647 + (seg_dir / "audio.flac").write_bytes(b"") 648 + (seg_dir / "screen.webm").write_bytes(b"") 649 + 650 + with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"): 651 + service = SyncService("https://server/ingest/key") 652 + 653 + message = { 654 + "tract": "observe", 655 + "event": "observing", 656 + "day": day, 657 + "segment": "120000_300", 658 + "files": ["audio.flac", "screen.webm"], 659 + "stream": "default", 660 + } 661 + service._handle_message(message) 662 + 663 + # No segment should be queued 664 + assert service._queue.qsize() == 0 665 + 666 + # No pending record should be written 667 + records = load_sync_state(day) 668 + assert len(records) == 0 669 + 670 + 671 + def test_process_segment_duplicate_skips_confirmation(sync_journal, monkeypatch): 672 + """Test that duplicate upload skips confirmation polling.""" 673 + from observe.sync import SegmentInfo, SyncService, UploadResult, load_sync_state 674 + 675 + journal = sync_journal["path"] 676 + day = sync_journal["day"] 677 + monkeypatch.setenv("JOURNAL_PATH", str(journal)) 678 + 679 + seg_info = SegmentInfo( 680 + day=day, 681 + segment="120000_300", 682 + files=[ 683 + {"name": "audio.flac", "sha256": "abc123"}, 684 + ], 685 + meta={"stream": "default"}, 686 + ) 687 + 688 + # Create the segment file so cleanup has something to work with 689 + seg_dir = journal / day / "default" / "120000_300" 690 + seg_dir.mkdir(parents=True, exist_ok=True) 691 + (seg_dir / "audio.flac").write_bytes(b"audio data") 692 + 693 + with patch("observe.sync.CallosumConnection") as mock_callosum_class: 694 + mock_callosum = MagicMock() 695 + mock_callosum_class.return_value = mock_callosum 696 + 697 + with patch("observe.sync.RemoteClient") as mock_client_class: 698 + mock_client = MagicMock() 699 + mock_session = MagicMock() 700 + 701 + # Pre-check: server doesn't have segment yet 702 + mock_session.get.return_value = MagicMock( 703 + status_code=200, json=MagicMock(return_value=[]) 704 + ) 705 + mock_client.session = mock_session 706 + # Upload returns duplicate 707 + mock_client.upload_segment = MagicMock( 708 + return_value=UploadResult(True, duplicate=True) 709 + ) 710 + mock_client_class.return_value = mock_client 711 + 712 + service = SyncService("https://server/ingest/key") 713 + service._process_segment(seg_info) 714 + 715 + # Upload should have been called 716 + mock_client.upload_segment.assert_called_once() 717 + 718 + # Confirmation polling should NOT have happened 719 + # (only 1 GET call for pre-check, no additional confirmation GETs) 720 + assert mock_session.get.call_count == 1 721 + 722 + # Confirmed record should have been written 723 + records = load_sync_state(day) 724 + confirmed = [r for r in records if r.get("status") == "confirmed"] 725 + assert len(confirmed) == 1 726 + assert confirmed[0]["segment"] == "120000_300" 595 727 596 728 597 729 class TestCheckRemoteHealth: