personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add observe.observed event tracking for remote segments

Track when remote-uploaded segments complete processing by listening for
observe.observed events. Records observed status in sync history and
updates segments_observed stat counter.

- Add apps/remote/events.py with @on_event handler for observe.observed
- Extract shared utilities to apps/remote/utils.py (DRY refactor)
- Segments endpoint now includes observed: true/false status
- Rename test fixture to storage_env to avoid conftest collision

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+890 -209
+60
apps/remote/events.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Remote app event handlers - track processing status of remote segments.

Listens for observe.observed events for remote-originated segments and
records their completion in the sync history. This enables remote observers
to verify end-to-end processing success via the segments endpoint.
"""

import logging
import time

from apps.events import EventContext, on_event

from .utils import append_history_record, find_remote_by_name, increment_stat

logger = logging.getLogger(__name__)


@on_event("observe", "observed")
def handle_observed(ctx: EventContext) -> None:
    """Record that a remote-originated segment finished processing.

    Reads ``remote``, ``segment`` and ``day`` from ``ctx.msg``. When all
    three are present and the remote can be resolved by name, an
    ``"observed"`` record is appended to that remote's history for ``day``
    and its ``segments_observed`` stat is incremented.

    The handler returns without writing anything when:
    - the message has no ``remote`` (not a remote segment),
    - ``segment`` or ``day`` is missing (logged as a warning),
    - no remote matches the name (logged at debug level),
    - the resolved remote has no usable key.

    Args:
        ctx: Event context whose ``msg`` mapping carries the event payload.
    """
    payload = ctx.msg

    remote_name = payload.get("remote")
    if not remote_name:
        # Locally produced segments carry no remote name; nothing to track.
        return

    segment = payload.get("segment")
    day = payload.get("day")
    if not (segment and day):
        logger.warning(f"observe.observed missing segment/day for remote {remote_name}")
        return

    # History and stats are addressed by key prefix, so resolve the remote
    # from its name first.
    remote = find_remote_by_name(remote_name)
    if not remote:
        logger.debug(f"Remote not found for observed event: {remote_name}")
        return

    # The first 8 characters of the key identify the remote's storage.
    key_prefix = remote.get("key", "")[:8]
    if not key_prefix:
        return

    # Timestamp is in milliseconds.
    append_history_record(
        key_prefix,
        day,
        {
            "ts": int(time.time() * 1000),
            "type": "observed",
            "segment": segment,
        },
    )
    increment_stat(key_prefix, "segments_observed")

    logger.debug(f"Recorded observed status for remote {remote_name}: {day}/{segment}")
+85 -163
apps/remote/routes.py
··· 30 30 from convey import emit 31 31 from think.utils import day_path 32 32 33 + from .utils import ( 34 + append_history_record, 35 + get_remotes_dir, 36 + list_remotes, 37 + load_history, 38 + load_remote, 39 + save_remote, 40 + ) 41 + 33 42 logger = logging.getLogger(__name__) 34 43 35 44 remote_bp = Blueprint( ··· 47 56 return base64.urlsafe_b64encode(secrets.token_bytes(KEY_BYTES)).decode().rstrip("=") 48 57 49 58 50 - def _get_remotes_dir() -> Path: 51 - """Get the remotes storage directory.""" 52 - return get_app_storage_path("remote", "remotes", ensure_exists=True) 53 - 54 - 55 - def _load_remote(key: str) -> dict | None: 56 - """Load remote metadata by key.""" 57 - remotes_dir = _get_remotes_dir() 58 - # Use first 8 chars of key as filename for readability 59 - remote_path = remotes_dir / f"{key[:8]}.json" 60 - if not remote_path.exists(): 61 - return None 62 - try: 63 - with open(remote_path) as f: 64 - data = json.load(f) 65 - # Verify full key matches 66 - if data.get("key") != key: 67 - return None 68 - return data 69 - except (json.JSONDecodeError, OSError): 70 - return None 71 - 72 - 73 - def _save_remote(data: dict) -> bool: 74 - """Save remote metadata.""" 75 - key = data.get("key") 76 - if not key: 77 - return False 78 - remotes_dir = _get_remotes_dir() 79 - remote_path = remotes_dir / f"{key[:8]}.json" 80 - try: 81 - with open(remote_path, "w") as f: 82 - json.dump(data, f, indent=2) 83 - return True 84 - except OSError: 85 - return False 86 - 87 - 88 59 def _revoke_remote(key: str) -> bool: 89 60 """Revoke remote by key (soft-delete).""" 90 - remote = _load_remote(key) 61 + remote = load_remote(key) 91 62 if not remote: 92 63 return False 93 64 remote["revoked"] = True 94 65 remote["revoked_at"] = int(time.time() * 1000) 95 - return _save_remote(remote) 96 - 97 - 98 - def _list_remotes() -> list[dict]: 99 - """List all registered remotes.""" 100 - remotes_dir = _get_remotes_dir() 101 - remotes = [] 102 - for remote_path in 
remotes_dir.glob("*.json"): 103 - try: 104 - with open(remote_path) as f: 105 - data = json.load(f) 106 - remotes.append(data) 107 - except (json.JSONDecodeError, OSError): 108 - continue 109 - # Sort by created_at descending 110 - remotes.sort(key=lambda x: x.get("created_at", 0), reverse=True) 111 - return remotes 66 + return save_remote(remote) 112 67 113 68 114 69 # === Management API (session-protected) === ··· 117 72 @remote_bp.route("/api/list") 118 73 def api_list() -> Any: 119 74 """List all registered remotes.""" 120 - remotes = _list_remotes() 75 + remotes = list_remotes() 121 76 # Sanitize output - don't expose full keys 122 77 result = [] 123 78 for r in remotes: ··· 162 117 }, 163 118 } 164 119 165 - if not _save_remote(remote_data): 120 + if not save_remote(remote_data): 166 121 return jsonify({"error": "Failed to save remote"}), 500 167 122 168 123 # Log observer creation (journal-level, no facet) ··· 190 145 def api_delete(key_prefix: str) -> Any: 191 146 """Revoke a remote by key prefix (soft-delete).""" 192 147 # Find remote by prefix 193 - remotes_dir = _get_remotes_dir() 148 + remotes_dir = get_remotes_dir() 194 149 remote_path = remotes_dir / f"{key_prefix}.json" 195 150 if not remote_path.exists(): 196 151 return jsonify({"error": "Remote not found"}), 404 ··· 221 176 def api_get_key(key_prefix: str) -> Any: 222 177 """Get full key and ingest URL for a remote.""" 223 178 # Find remote by prefix 224 - remotes_dir = _get_remotes_dir() 179 + remotes_dir = get_remotes_dir() 225 180 remote_path = remotes_dir / f"{key_prefix}.json" 226 181 if not remote_path.exists(): 227 182 return jsonify({"error": "Remote not found"}), 404 ··· 261 216 return sha256.hexdigest() 262 217 263 218 264 - def _get_hist_dir(key_prefix: str, ensure_exists: bool = True) -> Path: 265 - """Get the history directory for a remote. 
266 - 267 - Args: 268 - key_prefix: First 8 chars of remote key 269 - ensure_exists: Create directory if it doesn't exist (default: True) 270 - 271 - Returns: 272 - Path to apps/remote/remotes/<key_prefix>/hist/ 273 - """ 274 - return get_app_storage_path( 275 - "remote", "remotes", key_prefix, "hist", ensure_exists=ensure_exists 276 - ) 277 - 278 - 279 - def _append_sync_record(key_prefix: str, day: str, record: dict) -> None: 280 - """Append a sync record to the history file. 281 - 282 - Args: 283 - key_prefix: First 8 chars of remote key 284 - day: Day string (YYYYMMDD) 285 - record: Sync record to append 286 - """ 287 - hist_dir = _get_hist_dir(key_prefix) 288 - hist_path = hist_dir / f"{day}.jsonl" 289 - with open(hist_path, "a", encoding="utf-8") as f: 290 - f.write(json.dumps(record, ensure_ascii=False) + "\n") 291 - 292 - 293 - def _load_sync_history(key_prefix: str, day: str) -> list[dict]: 294 - """Load sync history for a remote on a given day. 295 - 296 - Args: 297 - key_prefix: First 8 chars of remote key 298 - day: Day string (YYYYMMDD) 299 - 300 - Returns: 301 - List of sync records, empty if file doesn't exist 302 - """ 303 - hist_dir = _get_hist_dir(key_prefix, ensure_exists=False) 304 - hist_path = hist_dir / f"{day}.jsonl" 305 - if not hist_path.exists(): 306 - return [] 307 - 308 - records = [] 309 - try: 310 - with open(hist_path, encoding="utf-8") as f: 311 - for line in f: 312 - line = line.strip() 313 - if line: 314 - records.append(json.loads(line)) 315 - except (json.JSONDecodeError, OSError) as e: 316 - logger.warning(f"Failed to load sync history {hist_path}: {e}") 317 - return records 318 - 319 - 320 219 def _find_by_inode(day_dir: Path, inode: int) -> Path | None: 321 220 """Find a file by inode in the day directory. 
322 221 ··· 392 291 segment: Segment key in HHMMSS_LEN format 393 292 394 293 Returns: 395 - True if segment directory or files with segment prefix exist 294 + True if segment directory exists 396 295 """ 397 296 # Check for segment directory 398 - if (day_dir / segment).exists(): 399 - return True 400 - # Check for files starting with segment key 401 - if list(day_dir.glob(f"{segment}_*")): 402 - return True 403 - return False 297 + return (day_dir / segment).exists() 404 298 405 299 406 300 def _find_available_segment( ··· 447 341 return None # Exhausted attempts 448 342 449 343 344 + def _strip_segment_prefix(filename: str, segment: str) -> str: 345 + """Strip segment prefix from filename if present. 346 + 347 + Handles old-style prefixed filenames (e.g., "143022_300_audio.flac") 348 + and returns simple names (e.g., "audio.flac"). 349 + 350 + Args: 351 + filename: Original filename (may have segment prefix) 352 + segment: Segment key (HHMMSS_LEN) 353 + 354 + Returns: 355 + Simple filename without segment prefix 356 + """ 357 + prefix = f"{segment}_" 358 + if filename.startswith(prefix): 359 + return filename[len(prefix) :] 360 + return filename 361 + 362 + 450 363 def _save_to_failed(day_dir: Path, files: list, segment: str) -> Path: 451 364 """Save files to failed directory for manual review. 452 365 ··· 492 405 Writes files to journal and emits observe.observing event. 
493 406 """ 494 407 # Validate key 495 - remote = _load_remote(key) 408 + remote = load_remote(key) 496 409 if not remote: 497 410 return jsonify({"error": "Invalid key"}), 401 498 411 ··· 527 440 return jsonify({"error": "No files uploaded"}), 400 528 441 529 442 # Ensure day directory exists 530 - target_dir = day_path(day) 531 - target_dir.mkdir(parents=True, exist_ok=True) 443 + day_dir = day_path(day) 444 + day_dir.mkdir(parents=True, exist_ok=True) 532 445 533 446 # Find available segment key (may differ from original if collision) 534 447 original_segment = segment 535 - available_segment = _find_available_segment(target_dir, segment) 448 + available_segment = _find_available_segment(day_dir, segment) 536 449 537 450 if available_segment is None: 538 451 # Exhausted attempts, save to failed directory ··· 540 453 f"No available segment slot for {day}/{segment} from " 541 454 f"{remote.get('name')} after {MAX_SEGMENT_ATTEMPTS} attempts" 542 455 ) 543 - failed_dir = _save_to_failed(target_dir, files, segment) 456 + failed_dir = _save_to_failed(day_dir, files, segment) 544 457 return ( 545 458 jsonify( 546 459 { 547 460 "status": "failed", 548 461 "error": f"No available segment slot after {MAX_SEGMENT_ATTEMPTS} attempts", 549 - "failed_path": str(failed_dir.relative_to(target_dir.parent)), 462 + "failed_path": str(failed_dir.relative_to(day_dir.parent)), 550 463 } 551 464 ), 552 465 507, ··· 559 472 f"for remote {remote.get('name')}" 560 473 ) 561 474 562 - # Save files with adjusted segment key in filenames 475 + # Create segment directory for files 476 + segment_dir = day_dir / segment 477 + segment_dir.mkdir(parents=True, exist_ok=True) 478 + 479 + # Save files with simple names (strip any segment prefix) 563 480 saved_files = [] 564 481 file_records = [] # For sync history 565 482 total_bytes = 0 ··· 573 490 if not submitted_filename: 574 491 continue 575 492 576 - # Replace original segment with adjusted segment in filename 577 - written_filename = 
submitted_filename 578 - if original_segment != segment and original_segment in submitted_filename: 579 - written_filename = submitted_filename.replace(original_segment, segment, 1) 493 + # Strip segment prefix from filename if present (for backward compat with old clients) 494 + # e.g., "143022_300_audio.flac" -> "audio.flac" 495 + simple_filename = _strip_segment_prefix(submitted_filename, original_segment) 580 496 581 - target_path = target_dir / written_filename 497 + target_path = segment_dir / simple_filename 582 498 583 499 # Save file 584 500 try: ··· 587 503 file_size = stat.st_size 588 504 file_inode = stat.st_ino 589 505 590 - saved_files.append(written_filename) 506 + saved_files.append(simple_filename) 591 507 total_bytes += file_size 592 508 593 509 # Compute SHA256 and record file info for sync history ··· 595 511 file_records.append( 596 512 { 597 513 "submitted": submitted_filename, 598 - "written": written_filename, 514 + "written": simple_filename, 599 515 "inode": file_inode, 600 516 "size": file_size, 601 517 "sha256": file_sha256, 602 518 } 603 519 ) 604 520 605 - logger.info(f"Saved {written_filename} to {target_dir}") 521 + logger.info(f"Saved {simple_filename} to {segment_dir}") 606 522 except OSError as e: 607 - logger.error(f"Failed to save {written_filename}: {e}") 608 - return jsonify({"error": f"Failed to save {written_filename}"}), 500 523 + logger.error(f"Failed to save {simple_filename}: {e}") 524 + return jsonify({"error": f"Failed to save {simple_filename}"}), 500 609 525 610 526 if not saved_files: 611 527 return jsonify({"error": "No valid files saved"}), 400 ··· 619 535 } 620 536 if segment != original_segment: 621 537 sync_record["segment_original"] = original_segment 622 - _append_sync_record(key_prefix, day, sync_record) 538 + append_history_record(key_prefix, day, sync_record) 623 539 624 540 # Update remote stats 625 541 remote["last_seen"] = int(time.time() * 1000) ··· 630 546 remote["stats"]["bytes_received"] = ( 631 547 
remote["stats"].get("bytes_received", 0) + total_bytes 632 548 ) 633 - _save_remote(remote) 549 + save_remote(remote) 634 550 635 551 # Emit observe.observing event to local Callosum 636 552 # Include host/platform from remote observer if provided ··· 669 585 - ...additional fields 670 586 """ 671 587 # Validate key 672 - remote = _load_remote(key) 588 + remote = load_remote(key) 673 589 if not remote: 674 590 return jsonify({"error": "Invalid key"}), 401 675 591 ··· 697 613 # Update last_seen on status events 698 614 if tract == "observe" and event == "status": 699 615 remote["last_seen"] = int(time.time() * 1000) 700 - _save_remote(remote) 616 + save_remote(remote) 701 617 702 618 return jsonify({"status": "ok"}) 703 619 ··· 716 632 day: Day string (YYYYMMDD) 717 633 """ 718 634 # Validate key 719 - remote = _load_remote(key) 635 + remote = load_remote(key) 720 636 if not remote: 721 637 return jsonify({"error": "Invalid key"}), 401 722 638 ··· 732 648 733 649 # Load sync history for this remote/day 734 650 key_prefix = key[:8] 735 - records = _load_sync_history(key_prefix, day) 651 + records = load_history(key_prefix, day) 736 652 737 653 if not records: 738 654 return jsonify([]) 739 655 740 656 # Get day directory for file verification 741 - target_dir = day_path(day) 657 + day_dir = day_path(day) 742 658 743 659 # Build response grouped by segment, deduplicating by sha256 744 660 # Later records overwrite earlier ones (most recent upload wins) 745 661 segments: dict[str, dict] = {} 662 + observed_segments: set[str] = set() # Track which segments have been observed 746 663 747 664 for record in records: 665 + # Handle "observed" record type (from event handler) 666 + record_type = record.get("type", "upload") 667 + if record_type == "observed": 668 + observed_segments.add(record.get("segment", "")) 669 + continue 670 + 748 671 segment = record.get("segment", "") 749 672 segment_original = record.get("segment_original") 750 673 ··· 774 697 if submitted != 
written: 775 698 file_info["submitted_name"] = submitted 776 699 777 - # Check file status 778 - recorded_path = target_dir / written 700 + # Check file status - files are now in segment directories 701 + segment_dir = day_dir / segment 702 + recorded_path = segment_dir / written 779 703 if recorded_path.exists(): 780 704 file_info["status"] = "present" 781 - elif inode and target_dir.exists(): 705 + elif inode and day_dir.exists(): 782 706 # Try to find by inode 783 - relocated = _find_by_inode(target_dir, inode) 707 + relocated = _find_by_inode(day_dir, inode) 784 708 if relocated: 785 709 file_info["status"] = "relocated" 786 - file_info["current_path"] = str(relocated.relative_to(target_dir)) 710 + file_info["current_path"] = str(relocated.relative_to(day_dir)) 787 711 else: 788 712 file_info["status"] = "missing" 789 713 else: ··· 795 719 # Convert files_by_sha dicts to lists and sort by segment key 796 720 result = [] 797 721 for segment_data in sorted(segments.values(), key=lambda s: s["key"]): 798 - result.append( 799 - { 800 - "key": segment_data["key"], 801 - **( 802 - {"original_key": segment_data["original_key"]} 803 - if "original_key" in segment_data 804 - else {} 805 - ), 806 - "files": list(segment_data["files_by_sha"].values()), 807 - } 808 - ) 722 + segment_key = segment_data["key"] 723 + entry = { 724 + "key": segment_key, 725 + "observed": segment_key in observed_segments, 726 + "files": list(segment_data["files_by_sha"].values()), 727 + } 728 + if "original_key" in segment_data: 729 + entry["original_key"] = segment_data["original_key"] 730 + result.append(entry) 809 731 return jsonify(result)
+213
apps/remote/tests/test_events.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for remote app event handlers.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + 10 + import pytest 11 + 12 + from apps.events import EventContext 13 + from apps.remote.events import handle_observed 14 + 15 + 16 + @pytest.fixture 17 + def remote_journal(tmp_path, monkeypatch): 18 + """Create a temporary journal with a remote registered.""" 19 + from convey import state 20 + 21 + journal = tmp_path / "journal" 22 + journal.mkdir() 23 + 24 + # Set JOURNAL_PATH env var and convey state 25 + monkeypatch.setenv("JOURNAL_PATH", str(journal)) 26 + monkeypatch.setattr(state, "journal_root", str(journal)) 27 + 28 + # Create remotes directory 29 + remotes_dir = journal / "apps" / "remote" / "remotes" 30 + remotes_dir.mkdir(parents=True) 31 + 32 + # Create a test remote 33 + remote_data = { 34 + "key": "testkey123456789abcdef", 35 + "name": "test-remote", 36 + "created_at": 1704312000000, 37 + "last_seen": None, 38 + "last_segment": None, 39 + "enabled": True, 40 + "stats": { 41 + "segments_received": 5, 42 + "bytes_received": 1024, 43 + }, 44 + } 45 + remote_path = remotes_dir / "testkey1.json" 46 + with open(remote_path, "w") as f: 47 + json.dump(remote_data, f) 48 + 49 + class Env: 50 + def __init__(self): 51 + self.journal = journal 52 + self.remotes_dir = remotes_dir 53 + self.remote_path = remote_path 54 + 55 + return Env() 56 + 57 + 58 + class TestHandleObserved: 59 + """Tests for handle_observed event handler.""" 60 + 61 + def test_records_observed_for_remote(self, remote_journal): 62 + """Handler records observed status for remote segment.""" 63 + ctx = EventContext( 64 + msg={ 65 + "tract": "observe", 66 + "event": "observed", 67 + "remote": "test-remote", 68 + "segment": "120000_300", 69 + "day": "20250103", 70 + }, 71 + app="remote", 72 + tract="observe", 73 + event="observed", 74 + journal_root=str(remote_journal.journal), 75 + ) 76 + 77 + 
handle_observed(ctx) 78 + 79 + # Check history was written 80 + hist_path = remote_journal.remotes_dir / "testkey1" / "hist" / "20250103.jsonl" 81 + assert hist_path.exists() 82 + 83 + with open(hist_path) as f: 84 + record = json.loads(f.readline()) 85 + 86 + assert record["type"] == "observed" 87 + assert record["segment"] == "120000_300" 88 + assert "ts" in record 89 + 90 + # Check stat was incremented 91 + with open(remote_journal.remote_path) as f: 92 + data = json.load(f) 93 + assert data["stats"]["segments_observed"] == 1 94 + 95 + def test_multiple_observed_events(self, remote_journal): 96 + """Handler appends multiple observed records.""" 97 + for segment in ["120000_300", "130000_300", "140000_300"]: 98 + ctx = EventContext( 99 + msg={ 100 + "tract": "observe", 101 + "event": "observed", 102 + "remote": "test-remote", 103 + "segment": segment, 104 + "day": "20250103", 105 + }, 106 + app="remote", 107 + tract="observe", 108 + event="observed", 109 + journal_root=str(remote_journal.journal), 110 + ) 111 + handle_observed(ctx) 112 + 113 + # Check all records written 114 + hist_path = remote_journal.remotes_dir / "testkey1" / "hist" / "20250103.jsonl" 115 + with open(hist_path) as f: 116 + lines = f.readlines() 117 + 118 + assert len(lines) == 3 119 + assert json.loads(lines[0])["segment"] == "120000_300" 120 + assert json.loads(lines[1])["segment"] == "130000_300" 121 + assert json.loads(lines[2])["segment"] == "140000_300" 122 + 123 + # Check stat incremented 3 times 124 + with open(remote_journal.remote_path) as f: 125 + data = json.load(f) 126 + assert data["stats"]["segments_observed"] == 3 127 + 128 + def test_ignores_non_remote_events(self, remote_journal): 129 + """Handler ignores events without remote field.""" 130 + ctx = EventContext( 131 + msg={ 132 + "tract": "observe", 133 + "event": "observed", 134 + "segment": "120000_300", 135 + "day": "20250103", 136 + }, 137 + app="remote", 138 + tract="observe", 139 + event="observed", 140 + 
journal_root=str(remote_journal.journal), 141 + ) 142 + 143 + handle_observed(ctx) 144 + 145 + # No history should be created 146 + hist_dir = remote_journal.remotes_dir / "testkey1" / "hist" 147 + assert not hist_dir.exists() 148 + 149 + def test_ignores_unknown_remote(self, remote_journal): 150 + """Handler ignores events for unknown remotes.""" 151 + ctx = EventContext( 152 + msg={ 153 + "tract": "observe", 154 + "event": "observed", 155 + "remote": "unknown-remote", 156 + "segment": "120000_300", 157 + "day": "20250103", 158 + }, 159 + app="remote", 160 + tract="observe", 161 + event="observed", 162 + journal_root=str(remote_journal.journal), 163 + ) 164 + 165 + handle_observed(ctx) 166 + 167 + # No history should be created for unknown remote 168 + hist_dir = remote_journal.remotes_dir / "testkey1" / "hist" 169 + assert not hist_dir.exists() 170 + 171 + def test_handles_missing_segment(self, remote_journal): 172 + """Handler handles events missing segment field.""" 173 + ctx = EventContext( 174 + msg={ 175 + "tract": "observe", 176 + "event": "observed", 177 + "remote": "test-remote", 178 + "day": "20250103", 179 + }, 180 + app="remote", 181 + tract="observe", 182 + event="observed", 183 + journal_root=str(remote_journal.journal), 184 + ) 185 + 186 + # Should not raise 187 + handle_observed(ctx) 188 + 189 + # No history should be created 190 + hist_dir = remote_journal.remotes_dir / "testkey1" / "hist" 191 + assert not hist_dir.exists() 192 + 193 + def test_handles_missing_day(self, remote_journal): 194 + """Handler handles events missing day field.""" 195 + ctx = EventContext( 196 + msg={ 197 + "tract": "observe", 198 + "event": "observed", 199 + "remote": "test-remote", 200 + "segment": "120000_300", 201 + }, 202 + app="remote", 203 + tract="observe", 204 + event="observed", 205 + journal_root=str(remote_journal.journal), 206 + ) 207 + 208 + # Should not raise 209 + handle_observed(ctx) 210 + 211 + # No history should be created 212 + hist_dir = 
remote_journal.remotes_dir / "testkey1" / "hist" 213 + assert not hist_dir.exists()
+132 -46
apps/remote/tests/test_routes.py
··· 264 264 assert data["files"] == ["test_audio.flac"] 265 265 assert data["bytes"] == len(test_data) 266 266 267 - # Verify file was written 268 - expected_file = env.journal / "20250103" / "test_audio.flac" 267 + # Verify file was written (in segment directory) 268 + expected_file = env.journal / "20250103" / "120000_300" / "test_audio.flac" 269 269 assert expected_file.exists() 270 270 assert expected_file.read_bytes() == test_data 271 271 ··· 499 499 assert _segment_exists(day_dir, "120001_300") is False 500 500 501 501 502 - def test_segment_exists_with_files(remote_env): 503 - """Test _segment_exists detects files with segment prefix.""" 502 + def test_segment_exists_checks_directory(remote_env): 503 + """Test _segment_exists only checks for segment directory.""" 504 504 from apps.remote.routes import _segment_exists 505 505 506 506 env = remote_env() 507 507 day_dir = env.journal / "20250103" 508 508 day_dir.mkdir(parents=True) 509 509 510 - # Create a file with segment prefix 510 + # File with segment prefix doesn't count - only directories 511 511 (day_dir / "120000_300_audio.flac").write_bytes(b"test") 512 + assert _segment_exists(day_dir, "120000_300") is False 512 513 514 + # Create a segment directory 515 + (day_dir / "120000_300").mkdir() 513 516 assert _segment_exists(day_dir, "120000_300") is True 514 517 assert _segment_exists(day_dir, "120001_300") is False 515 518 ··· 545 548 day_dir = env.journal / "20250103" 546 549 day_dir.mkdir(parents=True) 547 550 548 - # Create conflicting file 549 - (day_dir / "120000_300_audio.flac").write_bytes(b"test") 551 + # Create conflicting segment directory 552 + (day_dir / "120000_300").mkdir() 550 553 551 554 result = _find_available_segment(day_dir, "120000_300") 552 555 ··· 568 571 day_dir = env.journal / "20250103" 569 572 day_dir.mkdir(parents=True) 570 573 571 - # Create conflicting file 572 - (day_dir / "120000_300_audio.flac").write_bytes(b"test") 574 + # Create conflicting segment directory 575 + 
(day_dir / "120000_300").mkdir() 573 576 574 577 # With max_attempts=0, should return None immediately (no attempts allowed) 575 578 result = _find_available_segment(day_dir, "120000_300", max_attempts=0) ··· 618 621 ) 619 622 key = resp.get_json()["key"] 620 623 621 - # Create a conflicting file 624 + # Create a conflicting segment directory 622 625 day_dir = env.journal / "20250103" 623 626 day_dir.mkdir(parents=True) 624 - (day_dir / "120000_300_audio.flac").write_bytes(b"existing") 627 + (day_dir / "120000_300").mkdir() 628 + (day_dir / "120000_300" / "audio.flac").write_bytes(b"existing") 625 629 626 630 # Upload with same segment key 627 631 test_data = b"new audio content" ··· 638 642 data = resp.get_json() 639 643 assert data["status"] == "ok" 640 644 641 - # The filename should have been adjusted 645 + # The segment key should have been adjusted, file is stripped of prefix 642 646 saved_file = data["files"][0] 643 - assert saved_file != "120000_300_audio.flac" 644 - assert "_audio.flac" in saved_file 647 + assert saved_file == "audio.flac" 645 648 646 - # Verify both files exist 647 - assert (day_dir / "120000_300_audio.flac").exists() # Original 648 - assert (day_dir / saved_file).exists() # New adjusted 649 + # Verify both segments exist 650 + assert (day_dir / "120000_300" / "audio.flac").exists() # Original 651 + # New one is in adjusted segment directory (not 120000_300) 652 + adjusted_segments = [ 653 + d for d in day_dir.iterdir() if d.is_dir() and d.name != "120000_300" 654 + ] 655 + assert len(adjusted_segments) == 1 656 + assert (adjusted_segments[0] / "audio.flac").exists() 649 657 650 658 651 659 def test_ingest_no_collision_preserves_segment(remote_env): ··· 660 668 ) 661 669 key = resp.get_json()["key"] 662 670 663 - # Upload without any conflicting files 671 + # Upload without any conflicting segment directory 664 672 test_data = b"audio content" 665 673 resp = env.client.post( 666 674 f"/app/remote/ingest/{key}", ··· 674 682 assert 
resp.status_code == 200 675 683 data = resp.get_json() 676 684 assert data["status"] == "ok" 677 - assert data["files"] == ["120000_300_audio.flac"] 685 + assert data["files"] == ["audio.flac"] # Segment prefix stripped 678 686 679 - # Verify file saved with original name 680 - expected_file = env.journal / "20250103" / "120000_300_audio.flac" 687 + # Verify file saved in segment directory 688 + expected_file = env.journal / "20250103" / "120000_300" / "audio.flac" 681 689 assert expected_file.exists() 682 690 683 691 ··· 693 701 ) 694 702 key = resp.get_json()["key"] 695 703 696 - # Create a conflicting file 704 + # Create a conflicting segment directory 697 705 day_dir = env.journal / "20250103" 698 706 day_dir.mkdir(parents=True) 699 - (day_dir / "120000_300_audio.flac").write_bytes(b"existing") 707 + (day_dir / "120000_300").mkdir() 700 708 701 709 # Upload with same segment key 702 710 test_data = b"new audio" ··· 715 723 resp = env.client.get("/app/remote/api/list") 716 724 remotes = resp.get_json() 717 725 assert len(remotes) == 1 718 - # The stored segment should be different from original 719 726 last_segment = remotes[0]["last_segment"] 720 727 assert last_segment is not None 721 728 # It should be adjusted (not the original conflicting one) 722 - assert ( 723 - last_segment != "120000_300" 724 - or (day_dir / f"{last_segment}_audio.flac").exists() 725 - ) 729 + assert last_segment != "120000_300" 730 + # The adjusted segment directory should exist 731 + assert (day_dir / last_segment).exists() 726 732 727 733 728 734 # === Sync history tests === ··· 789 795 790 796 file_rec = record["files"][0] 791 797 assert file_rec["submitted"] == "120000_300_audio.flac" 792 - assert file_rec["written"] == "120000_300_audio.flac" 798 + assert file_rec["written"] == "audio.flac" # Segment prefix stripped 793 799 assert file_rec["size"] == len(test_data) 794 800 assert len(file_rec["sha256"]) == 64 # SHA256 hex length 795 801 assert file_rec["inode"] > 0 ··· 809 815 key 
= data["key"] 810 816 key_prefix = data["key_prefix"] 811 817 812 - # Create conflicting file 818 + # Create conflicting segment directory 813 819 day_dir = env.journal / "20250103" 814 820 day_dir.mkdir(parents=True) 815 - (day_dir / "120000_300_audio.flac").write_bytes(b"existing") 821 + (day_dir / "120000_300").mkdir() 816 822 817 823 # Upload with same segment key 818 824 test_data = b"new audio content" ··· 843 849 assert record["segment_original"] == "120000_300" 844 850 assert record["segment"] != "120000_300" 845 851 846 - # File names should reflect adjustment 852 + # File names should reflect stripping of segment prefix 847 853 file_rec = record["files"][0] 848 854 assert file_rec["submitted"] == "120000_300_audio.flac" 849 - assert file_rec["written"] != "120000_300_audio.flac" 850 - assert record["segment"] in file_rec["written"] 855 + assert file_rec["written"] == "audio.flac" # Segment prefix stripped 851 856 852 857 853 858 def test_segments_endpoint_empty(remote_env): ··· 925 930 assert len(data) == 1 926 931 segment = data[0] 927 932 assert segment["key"] == "120000_300" 933 + assert segment["observed"] is False # Not yet processed 928 934 assert "original_key" not in segment # No collision 929 935 assert len(segment["files"]) == 1 930 936 931 937 file_info = segment["files"][0] 932 - assert file_info["name"] == "120000_300_audio.flac" 938 + assert file_info["name"] == "audio.flac" # Segment prefix stripped 933 939 assert file_info["size"] == len(test_data) 934 940 assert len(file_info["sha256"]) == 64 935 941 assert file_info["status"] == "present" 936 - assert "submitted_name" not in file_info # Same as written 942 + assert ( 943 + file_info["submitted_name"] == "120000_300_audio.flac" 944 + ) # Original name preserved 937 945 938 946 939 947 def test_segments_endpoint_shows_collision(remote_env): ··· 948 956 ) 949 957 key = resp.get_json()["key"] 950 958 951 - # Create conflicting file 959 + # Create conflicting segment directory 952 960 day_dir 
= env.journal / "20250103" 953 961 day_dir.mkdir(parents=True) 954 - (day_dir / "120000_300_audio.flac").write_bytes(b"existing") 962 + (day_dir / "120000_300").mkdir() 955 963 956 964 # Upload with collision 957 965 test_data = b"new audio" ··· 976 984 977 985 file_info = segment["files"][0] 978 986 assert file_info["submitted_name"] == "120000_300_audio.flac" 979 - assert file_info["name"] != "120000_300_audio.flac" 987 + assert file_info["name"] == "audio.flac" # Segment prefix stripped 980 988 assert file_info["status"] == "present" 981 989 982 990 ··· 1004 1012 ) 1005 1013 assert resp.status_code == 200 1006 1014 1007 - # Delete the file 1008 - (env.journal / "20250103" / "120000_300_audio.flac").unlink() 1015 + # Delete the file (now in segment directory with stripped name) 1016 + (env.journal / "20250103" / "120000_300" / "audio.flac").unlink() 1009 1017 1010 1018 # Query segments 1011 1019 resp = env.client.get(f"/app/remote/ingest/{key}/segments/20250103") ··· 1040 1048 ) 1041 1049 assert resp.status_code == 200 1042 1050 1043 - # Move the file to a subdirectory (simulating indexer moving it) 1051 + # Move the file to a different name (simulating some file reorganization) 1044 1052 day_dir = env.journal / "20250103" 1045 1053 segment_dir = day_dir / "120000_300" 1046 - segment_dir.mkdir() 1047 - original_path = day_dir / "120000_300_audio.flac" 1048 - new_path = segment_dir / "audio.flac" 1054 + original_path = segment_dir / "audio.flac" 1055 + new_path = segment_dir / "renamed_audio.flac" 1049 1056 original_path.rename(new_path) 1050 1057 1051 1058 # Query segments - should detect relocation by inode ··· 1055 1062 assert len(data) == 1 1056 1063 file_info = data[0]["files"][0] 1057 1064 assert file_info["status"] == "relocated" 1058 - assert file_info["current_path"] == "120000_300/audio.flac" 1065 + assert file_info["current_path"] == "120000_300/renamed_audio.flac" 1059 1066 1060 1067 1061 1068 def test_find_by_inode(remote_env): ··· 1158 1165 for 
segment in data: 1159 1166 assert len(segment["files"]) == 1 1160 1167 assert segment["files"][0]["status"] == "present" 1168 + 1169 + 1170 + def test_segments_endpoint_shows_observed_status(remote_env): 1171 + """Test that segments endpoint includes observed status.""" 1172 + env = remote_env() 1173 + 1174 + # Create a remote 1175 + resp = env.client.post( 1176 + "/app/remote/api/create", 1177 + json={"name": "observed-test"}, 1178 + content_type="application/json", 1179 + ) 1180 + data = resp.get_json() 1181 + key = data["key"] 1182 + key_prefix = data["key_prefix"] 1183 + 1184 + # Upload a file 1185 + test_data = b"test audio content" 1186 + resp = env.client.post( 1187 + f"/app/remote/ingest/{key}", 1188 + data={ 1189 + "day": "20250103", 1190 + "segment": "120000_300", 1191 + "files": (io.BytesIO(test_data), "120000_300_audio.flac"), 1192 + }, 1193 + ) 1194 + assert resp.status_code == 200 1195 + 1196 + # Query segments - should show observed: false 1197 + resp = env.client.get(f"/app/remote/ingest/{key}/segments/20250103") 1198 + data = resp.get_json() 1199 + assert len(data) == 1 1200 + assert data[0]["observed"] is False 1201 + 1202 + # Manually add an observed record to simulate event handler 1203 + hist_dir = env.journal / "apps" / "remote" / "remotes" / key_prefix / "hist" 1204 + hist_dir.mkdir(parents=True, exist_ok=True) 1205 + hist_path = hist_dir / "20250103.jsonl" 1206 + with open(hist_path, "a") as f: 1207 + f.write('{"ts": 1704312345000, "type": "observed", "segment": "120000_300"}\n') 1208 + 1209 + # Query again - should now show observed: true 1210 + resp = env.client.get(f"/app/remote/ingest/{key}/segments/20250103") 1211 + data = resp.get_json() 1212 + assert len(data) == 1 1213 + assert data[0]["observed"] is True 1214 + 1215 + 1216 + def test_api_list_includes_segments_observed_stat(remote_env): 1217 + """Test that api_list includes segments_observed stat.""" 1218 + env = remote_env() 1219 + 1220 + # Create a remote 1221 + resp = 
env.client.post( 1222 + "/app/remote/api/create", 1223 + json={"name": "stats-test"}, 1224 + content_type="application/json", 1225 + ) 1226 + data = resp.get_json() 1227 + key_prefix = data["key_prefix"] 1228 + 1229 + # Initially no segments_observed 1230 + resp = env.client.get("/app/remote/api/list") 1231 + data = resp.get_json() 1232 + assert len(data) == 1 1233 + assert "segments_observed" not in data[0]["stats"] 1234 + 1235 + # Manually add segments_observed stat 1236 + remote_path = env.journal / "apps" / "remote" / "remotes" / f"{key_prefix}.json" 1237 + with open(remote_path) as f: 1238 + remote_data = json.load(f) 1239 + remote_data["stats"]["segments_observed"] = 5 1240 + with open(remote_path, "w") as f: 1241 + json.dump(remote_data, f) 1242 + 1243 + # Should now show in list 1244 + resp = env.client.get("/app/remote/api/list") 1245 + data = resp.get_json() 1246 + assert data[0]["stats"]["segments_observed"] == 5
+217
apps/remote/tests/test_utils.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for remote app utilities.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + 10 + import pytest 11 + 12 + from apps.remote.utils import ( 13 + append_history_record, 14 + find_remote_by_name, 15 + get_hist_dir, 16 + get_remotes_dir, 17 + increment_stat, 18 + list_remotes, 19 + load_history, 20 + load_remote, 21 + save_remote, 22 + ) 23 + 24 + 25 + @pytest.fixture 26 + def storage_env(tmp_path, monkeypatch): 27 + """Create a temporary journal environment for storage tests.""" 28 + from convey import state 29 + 30 + journal = tmp_path / "journal" 31 + journal.mkdir() 32 + monkeypatch.setenv("JOURNAL_PATH", str(journal)) 33 + monkeypatch.setattr(state, "journal_root", str(journal)) 34 + 35 + # Create remotes directory 36 + remotes_dir = journal / "apps" / "remote" / "remotes" 37 + remotes_dir.mkdir(parents=True) 38 + 39 + class Env: 40 + def __init__(self): 41 + self.journal = journal 42 + self.remotes_dir = remotes_dir 43 + 44 + return Env() 45 + 46 + 47 + class TestRemoteStorage: 48 + """Tests for remote metadata storage.""" 49 + 50 + def test_get_remotes_dir_creates_directory(self, storage_env): 51 + """get_remotes_dir creates and returns remotes directory.""" 52 + result = get_remotes_dir() 53 + assert result.exists() 54 + assert result == storage_env.remotes_dir 55 + 56 + def test_save_and_load_remote(self, storage_env): 57 + """save_remote and load_remote work together.""" 58 + remote = { 59 + "key": "testkey123456789", 60 + "name": "test-remote", 61 + "stats": {"segments_received": 0}, 62 + } 63 + 64 + assert save_remote(remote) is True 65 + 66 + loaded = load_remote("testkey123456789") 67 + assert loaded is not None 68 + assert loaded["name"] == "test-remote" 69 + 70 + def test_load_remote_wrong_key(self, storage_env): 71 + """load_remote returns None for wrong key.""" 72 + remote = { 73 + "key": "testkey123456789", 74 + "name": "test-remote", 
75 + "stats": {}, 76 + } 77 + save_remote(remote) 78 + 79 + # Same prefix but different key 80 + result = load_remote("testkey1xxxxxxxx") 81 + assert result is None 82 + 83 + def test_load_remote_not_found(self, storage_env): 84 + """load_remote returns None when remote doesn't exist.""" 85 + result = load_remote("nonexistent12345") 86 + assert result is None 87 + 88 + def test_list_remotes_empty(self, storage_env): 89 + """list_remotes returns empty list when no remotes.""" 90 + result = list_remotes() 91 + assert result == [] 92 + 93 + def test_list_remotes_returns_all(self, storage_env): 94 + """list_remotes returns all registered remotes.""" 95 + for i in range(3): 96 + save_remote( 97 + { 98 + "key": f"remote{i}0123456789", 99 + "name": f"remote-{i}", 100 + "created_at": 1000 + i, 101 + "stats": {}, 102 + } 103 + ) 104 + 105 + result = list_remotes() 106 + assert len(result) == 3 107 + # Sorted by created_at descending 108 + assert result[0]["name"] == "remote-2" 109 + assert result[1]["name"] == "remote-1" 110 + assert result[2]["name"] == "remote-0" 111 + 112 + def test_find_remote_by_name(self, storage_env): 113 + """find_remote_by_name finds existing remote.""" 114 + save_remote( 115 + { 116 + "key": "findme123456789", 117 + "name": "find-me", 118 + "stats": {}, 119 + } 120 + ) 121 + 122 + result = find_remote_by_name("find-me") 123 + assert result is not None 124 + assert result["key"] == "findme123456789" 125 + 126 + def test_find_remote_by_name_not_found(self, storage_env): 127 + """find_remote_by_name returns None for unknown name.""" 128 + result = find_remote_by_name("unknown") 129 + assert result is None 130 + 131 + 132 + class TestHistoryStorage: 133 + """Tests for sync history storage.""" 134 + 135 + def test_get_hist_dir_creates_directory(self, storage_env): 136 + """get_hist_dir creates history directory.""" 137 + result = get_hist_dir("testkey1") 138 + assert result.exists() 139 + assert result == storage_env.remotes_dir / "testkey1" / "hist" 
140 + 141 + def test_get_hist_dir_no_create(self, storage_env): 142 + """get_hist_dir with ensure_exists=False doesn't create.""" 143 + result = get_hist_dir("nonexistent", ensure_exists=False) 144 + assert not result.exists() 145 + 146 + def test_append_history_record(self, storage_env): 147 + """append_history_record creates and appends to JSONL file.""" 148 + append_history_record( 149 + "testkey1", "20250103", {"type": "upload", "segment": "120000_300"} 150 + ) 151 + append_history_record( 152 + "testkey1", "20250103", {"type": "observed", "segment": "120000_300"} 153 + ) 154 + 155 + hist_path = storage_env.remotes_dir / "testkey1" / "hist" / "20250103.jsonl" 156 + assert hist_path.exists() 157 + 158 + with open(hist_path) as f: 159 + lines = f.readlines() 160 + 161 + assert len(lines) == 2 162 + assert json.loads(lines[0])["type"] == "upload" 163 + assert json.loads(lines[1])["type"] == "observed" 164 + 165 + def test_load_history_empty(self, storage_env): 166 + """load_history returns empty list when no history.""" 167 + result = load_history("testkey1", "20250103") 168 + assert result == [] 169 + 170 + def test_load_history(self, storage_env): 171 + """load_history returns all records.""" 172 + append_history_record("testkey1", "20250103", {"segment": "a"}) 173 + append_history_record("testkey1", "20250103", {"segment": "b"}) 174 + 175 + result = load_history("testkey1", "20250103") 176 + assert len(result) == 2 177 + assert result[0]["segment"] == "a" 178 + assert result[1]["segment"] == "b" 179 + 180 + 181 + class TestIncrementStat: 182 + """Tests for stat increment.""" 183 + 184 + def test_increment_stat_new_counter(self, storage_env): 185 + """increment_stat creates new counter.""" 186 + save_remote( 187 + { 188 + "key": "testkey123456789", 189 + "name": "test", 190 + "stats": {}, 191 + } 192 + ) 193 + 194 + increment_stat("testkey1", "segments_observed") 195 + 196 + loaded = load_remote("testkey123456789") 197 + assert 
loaded["stats"]["segments_observed"] == 1 198 + 199 + def test_increment_stat_existing_counter(self, storage_env): 200 + """increment_stat increments existing counter.""" 201 + save_remote( 202 + { 203 + "key": "testkey123456789", 204 + "name": "test", 205 + "stats": {"segments_observed": 5}, 206 + } 207 + ) 208 + 209 + increment_stat("testkey1", "segments_observed") 210 + 211 + loaded = load_remote("testkey123456789") 212 + assert loaded["stats"]["segments_observed"] == 6 213 + 214 + def test_increment_stat_missing_remote(self, storage_env): 215 + """increment_stat handles missing remote gracefully.""" 216 + # Should not raise 217 + increment_stat("nonexistent", "segments_observed")
+183
apps/remote/utils.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Shared utilities for the remote app. 5 + 6 + Provides common helpers for remote metadata management and sync history 7 + that are used by both routes.py and events.py. 8 + """ 9 + 10 + from __future__ import annotations 11 + 12 + import json 13 + import logging 14 + from pathlib import Path 15 + 16 + from apps.utils import get_app_storage_path 17 + 18 + logger = logging.getLogger(__name__) 19 + 20 + 21 + def get_remotes_dir() -> Path: 22 + """Get the remotes storage directory.""" 23 + return get_app_storage_path("remote", "remotes", ensure_exists=True) 24 + 25 + 26 + def get_hist_dir(key_prefix: str, ensure_exists: bool = True) -> Path: 27 + """Get the history directory for a remote. 28 + 29 + Args: 30 + key_prefix: First 8 chars of remote key 31 + ensure_exists: Create directory if it doesn't exist (default: True) 32 + 33 + Returns: 34 + Path to apps/remote/remotes/<key_prefix>/hist/ 35 + """ 36 + return get_app_storage_path( 37 + "remote", "remotes", key_prefix, "hist", ensure_exists=ensure_exists 38 + ) 39 + 40 + 41 + def load_remote(key: str) -> dict | None: 42 + """Load remote metadata by key. 43 + 44 + Args: 45 + key: Full remote authentication key 46 + 47 + Returns: 48 + Remote metadata dict if found and key matches, None otherwise 49 + """ 50 + remotes_dir = get_remotes_dir() 51 + remote_path = remotes_dir / f"{key[:8]}.json" 52 + if not remote_path.exists(): 53 + return None 54 + try: 55 + with open(remote_path) as f: 56 + data = json.load(f) 57 + # Verify full key matches 58 + if data.get("key") != key: 59 + return None 60 + return data 61 + except (json.JSONDecodeError, OSError): 62 + return None 63 + 64 + 65 + def save_remote(data: dict) -> bool: 66 + """Save remote metadata. 
67 + 68 + Args: 69 + data: Remote metadata dict (must contain 'key' field) 70 + 71 + Returns: 72 + True if saved successfully, False otherwise 73 + """ 74 + key = data.get("key") 75 + if not key: 76 + return False 77 + remotes_dir = get_remotes_dir() 78 + remote_path = remotes_dir / f"{key[:8]}.json" 79 + try: 80 + with open(remote_path, "w") as f: 81 + json.dump(data, f, indent=2) 82 + return True 83 + except OSError: 84 + return False 85 + 86 + 87 + def list_remotes() -> list[dict]: 88 + """List all registered remotes. 89 + 90 + Returns: 91 + List of remote metadata dicts, sorted by created_at descending 92 + """ 93 + remotes_dir = get_remotes_dir() 94 + remotes = [] 95 + for remote_path in remotes_dir.glob("*.json"): 96 + try: 97 + with open(remote_path) as f: 98 + data = json.load(f) 99 + remotes.append(data) 100 + except (json.JSONDecodeError, OSError): 101 + continue 102 + remotes.sort(key=lambda x: x.get("created_at", 0), reverse=True) 103 + return remotes 104 + 105 + 106 + def find_remote_by_name(name: str) -> dict | None: 107 + """Find remote metadata by name. 108 + 109 + Args: 110 + name: Remote name to search for 111 + 112 + Returns: 113 + Remote metadata dict if found, None otherwise 114 + """ 115 + for remote in list_remotes(): 116 + if remote.get("name") == name: 117 + return remote 118 + return None 119 + 120 + 121 + def append_history_record(key_prefix: str, day: str, record: dict) -> None: 122 + """Append a record to the sync history file. 123 + 124 + Args: 125 + key_prefix: First 8 chars of remote key 126 + day: Day string (YYYYMMDD) 127 + record: Record to append (will be JSON-serialized) 128 + """ 129 + hist_dir = get_hist_dir(key_prefix) 130 + hist_path = hist_dir / f"{day}.jsonl" 131 + with open(hist_path, "a", encoding="utf-8") as f: 132 + f.write(json.dumps(record, ensure_ascii=False) + "\n") 133 + 134 + 135 + def load_history(key_prefix: str, day: str) -> list[dict]: 136 + """Load sync history for a remote on a given day. 
137 + 138 + Args: 139 + key_prefix: First 8 chars of remote key 140 + day: Day string (YYYYMMDD) 141 + 142 + Returns: 143 + List of history records, empty if file doesn't exist 144 + """ 145 + hist_dir = get_hist_dir(key_prefix, ensure_exists=False) 146 + hist_path = hist_dir / f"{day}.jsonl" 147 + if not hist_path.exists(): 148 + return [] 149 + 150 + records = [] 151 + try: 152 + with open(hist_path, encoding="utf-8") as f: 153 + for line in f: 154 + line = line.strip() 155 + if line: 156 + records.append(json.loads(line)) 157 + except (json.JSONDecodeError, OSError) as e: 158 + logger.warning(f"Failed to load sync history {hist_path}: {e}") 159 + return records 160 + 161 + 162 + def increment_stat(key_prefix: str, stat_name: str) -> None: 163 + """Increment a stat counter for a remote. 164 + 165 + Args: 166 + key_prefix: First 8 chars of remote key 167 + stat_name: Name of the stat to increment (e.g., 'segments_observed') 168 + """ 169 + remotes_dir = get_remotes_dir() 170 + remote_path = remotes_dir / f"{key_prefix}.json" 171 + if not remote_path.exists(): 172 + return 173 + 174 + try: 175 + with open(remote_path) as f: 176 + data = json.load(f) 177 + 178 + data["stats"][stat_name] = data["stats"].get(stat_name, 0) + 1 179 + 180 + with open(remote_path, "w") as f: 181 + json.dump(data, f, indent=2) 182 + except (json.JSONDecodeError, OSError, KeyError) as e: 183 + logger.warning(f"Failed to update {stat_name} for {key_prefix}: {e}")