personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement two-phase Plaud sync with catalog matching and import pipeline

Replace the PlaudBackend stub with a full sync implementation that fetches
the Plaud API catalog, matches files against existing imports by filename,
and optionally downloads + imports new recordings through the pipeline.

- Add --sync BACKEND and --save flags to sol import CLI
- Add match_existing_imports() for filename-based matching (exact, stem, sanitized)
- Add timestamp_from_start_time() for Plaud epoch conversion
- Re-check available files on each sync to catch manual imports between syncs
- Save incremental sync state to imports/plaud.json keyed by Plaud file ID
- Add scratch/migrate_plaud_imports.py for bootstrapping state from prior imports

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+633 -102
+276 -11
tests/test_importer_sync.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 + import json 5 + from pathlib import Path 6 + from unittest.mock import patch 7 + 4 8 import pytest 5 9 6 10 ··· 18 22 state = { 19 23 "backend": "plaud", 20 24 "last_sync": "2026-02-14T12:00:00", 21 - "synced_files": { 25 + "files": { 22 26 "abc123": { 23 27 "filename": "test.opus", 24 - "synced_at": "2026-02-14T12:00:00", 28 + "status": "imported", 25 29 } 26 30 }, 27 31 } ··· 75 79 assert isinstance(PlaudBackend(), SyncableBackend) 76 80 77 81 78 - def test_plaud_sync_not_implemented(tmp_path, monkeypatch): 79 - """With token configured, PlaudBackend.sync() raises NotImplementedError.""" 80 - from think.importers.plaud import PlaudBackend 81 - 82 - monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 83 - with pytest.raises(NotImplementedError): 84 - PlaudBackend().sync(tmp_path) 85 - 86 - 87 82 def test_plaud_sync_requires_token(tmp_path, monkeypatch): 88 83 """Without token configured, PlaudBackend.sync() raises ValueError.""" 89 84 from think.importers.plaud import PlaudBackend ··· 104 99 main() 105 100 captured = capsys.readouterr() 106 101 assert "plaud" in captured.out 102 + 103 + 104 + # --------------------------------------------------------------------------- 105 + # timestamp_from_start_time 106 + # --------------------------------------------------------------------------- 107 + 108 + 109 + def test_timestamp_from_start_time_seconds(): 110 + """Handles epoch seconds.""" 111 + # 2026-01-15 10:30:00 local 112 + import datetime as dt 113 + 114 + from think.importers.plaud import timestamp_from_start_time 115 + 116 + epoch = dt.datetime(2026, 1, 15, 10, 30, 0).timestamp() 117 + ts = timestamp_from_start_time(epoch) 118 + assert ts == "20260115_103000" 119 + 120 + 121 + def test_timestamp_from_start_time_millis(): 122 + """Handles epoch milliseconds (Plaud format).""" 123 + import datetime as dt 124 + 125 + from think.importers.plaud import timestamp_from_start_time 126 + 127 + epoch_ms = dt.datetime(2026, 1, 15, 10, 30, 0).timestamp() * 1000 128 + ts = timestamp_from_start_time(epoch_ms) 129 + assert ts == "20260115_103000" 130 + 131 + 132 + # --------------------------------------------------------------------------- 133 + # match_existing_imports 134 + # --------------------------------------------------------------------------- 135 + 136 + 137 + def _create_import(journal_root: Path, timestamp: str, original_filename: str) -> None: 138 + """Helper: create a minimal imports/{timestamp}/import.json.""" 139 + import_dir = journal_root / "imports" / timestamp 140 + import_dir.mkdir(parents=True, exist_ok=True) 141 + meta = {"original_filename": original_filename} 142 + (import_dir / "import.json").write_text(json.dumps(meta), encoding="utf-8") 143 + 144 + 145 + def test_match_exact_filename(tmp_path): 146 + """Matches by exact filename.""" 147 + from think.importers.plaud import match_existing_imports 148 + 149 + _create_import(tmp_path, "20260115_103000", "Team Meeting.opus") 150 + plaud_files = [ 151 + {"id": "abc", "filename": "Team Meeting", "fullname": "hash1.opus"}, 152 + ] 153 + matches = match_existing_imports(tmp_path, plaud_files) 154 + assert matches == {"abc": "20260115_103000"} 155 + 156 + 157 + def test_match_sanitized_filename(tmp_path): 158 + """Matches sanitized filename with extension.""" 159 + from think.importers.plaud import match_existing_imports 160 + 161 + _create_import(tmp_path, "20260115_103000", "Team_Meeting.opus") 162 + plaud_files = [ 163 + {"id": "abc", "filename": "Team Meeting", "fullname": "hash1.opus"}, 164 + ] 165 + matches = match_existing_imports(tmp_path, plaud_files) 166 + assert matches == {"abc": "20260115_103000"} 167 + 168 + 169 + def test_match_no_match(tmp_path): 170 + """Returns empty dict when no match.""" 171 + from think.importers.plaud import match_existing_imports 172 + 173 + _create_import(tmp_path, "20260115_103000", "Something Else.m4a") 174 + plaud_files = [ 175 + {"id": "abc", "filename": "Team Meeting", "fullname": "hash1.opus"}, 176 + ] 177 + matches = match_existing_imports(tmp_path, plaud_files) 178 + assert matches == {} 179 + 180 + 181 + def test_match_no_imports_dir(tmp_path): 182 + """Returns empty dict when imports/ doesn't exist.""" 183 + from think.importers.plaud import match_existing_imports 184 + 185 + plaud_files = [ 186 + {"id": "abc", "filename": "Team Meeting", "fullname": "hash1.opus"}, 187 + ] 188 + matches = match_existing_imports(tmp_path, plaud_files) 189 + assert matches == {} 190 + 191 + 192 + def test_match_by_stem(tmp_path): 193 + """Matches by filename stem (without extension).""" 194 + from think.importers.plaud import match_existing_imports 195 + 196 + _create_import(tmp_path, "20260115_103000", "Team Meeting.m4a") 197 + plaud_files = [ 198 + {"id": "abc", "filename": "Team Meeting", "fullname": "hash1.opus"}, 199 + ] 200 + matches = match_existing_imports(tmp_path, plaud_files) 201 + assert matches == {"abc": "20260115_103000"} 202 + 203 + 204 + # --------------------------------------------------------------------------- 205 + # PlaudBackend.sync() — catalog mode 206 + # --------------------------------------------------------------------------- 207 + 208 + 209 + def _mock_list_files(_session, _token): 210 + """Return a small fake Plaud file list.""" 211 + return [ 212 + { 213 + "id": "file1", 214 + "filename": "Standup", 215 + "fullname": "aaa.opus", 216 + "filesize": 5000, 217 + "start_time": 1737000000000, 218 + }, 219 + { 220 + "id": "file2", 221 + "filename": "Retro", 222 + "fullname": "bbb.opus", 223 + "filesize": 8000, 224 + "start_time": 1737100000000, 225 + }, 226 + ] 227 + 228 + 229 + def test_plaud_sync_dry_run(tmp_path, monkeypatch): 230 + """Dry-run sync fetches catalog and saves state.""" 231 + from think.importers.plaud import PlaudBackend 232 + from think.importers.sync import load_sync_state 233 + 234 + monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 235 + 236 + with patch("think.importers.plaud.list_files", side_effect=_mock_list_files): 237 + result = PlaudBackend().sync(tmp_path, dry_run=True) 238 + 239 + assert result["total"] == 2 240 + assert result["available"] == 2 241 + assert result["imported"] == 0 242 + assert result["downloaded"] == 0 243 + 244 + # State was saved 245 + state = load_sync_state(tmp_path, "plaud") 246 + assert state is not None 247 + assert len(state["files"]) == 2 248 + assert state["files"]["file1"]["status"] == "available" 249 + 250 + 251 + def test_plaud_sync_matches_existing(tmp_path, monkeypatch): 252 + """Sync matches existing imports and marks them imported.""" 253 + from think.importers.plaud import PlaudBackend 254 + from think.importers.sync import load_sync_state 255 + 256 + monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 257 + _create_import(tmp_path, "20260116_051320", "Standup.opus") 258 + 259 + with patch("think.importers.plaud.list_files", side_effect=_mock_list_files): 260 + result = PlaudBackend().sync(tmp_path, dry_run=True) 261 + 262 + assert result["imported"] == 1 263 + assert result["available"] == 1 264 + 265 + state = load_sync_state(tmp_path, "plaud") 266 + assert state["files"]["file1"]["status"] == "imported" 267 + assert state["files"]["file1"]["import_timestamp"] == "20260116_051320" 268 + assert state["files"]["file2"]["status"] == "available" 269 + 270 + 271 + def test_plaud_sync_incremental(tmp_path, monkeypatch): 272 + """Second sync preserves existing state and detects new files.""" 273 + from think.importers.plaud import PlaudBackend 274 + from think.importers.sync import load_sync_state, save_sync_state 275 + 276 + monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 277 + 278 + # Pre-seed state with file1 already imported 279 + save_sync_state( 280 + tmp_path, 281 + "plaud", 282 + { 283 + "backend": "plaud", 284 + "files": { 285 + "file1": { 286 + "filename": "Standup", 287 + "fullname": "aaa.opus", 288 + "filesize": 5000, 289 + "start_time": 1737000000000, 290 + "status": "imported", 291 + "import_timestamp": "20260116_051320", 292 + } 293 + }, 294 + }, 295 + ) 296 + 297 + with patch("think.importers.plaud.list_files", side_effect=_mock_list_files): 298 + result = PlaudBackend().sync(tmp_path, dry_run=True) 299 + 300 + assert result["total"] == 2 301 + assert result["imported"] == 1 302 + assert result["available"] == 1 303 + 304 + state = load_sync_state(tmp_path, "plaud") 305 + # file1 preserved as imported 306 + assert state["files"]["file1"]["status"] == "imported" 307 + # file2 detected as new available 308 + assert state["files"]["file2"]["status"] == "available" 309 + 310 + 311 + def test_plaud_sync_promotes_manually_imported(tmp_path, monkeypatch): 312 + """Available file gets promoted to imported if manually imported between syncs.""" 313 + from think.importers.plaud import PlaudBackend 314 + from think.importers.sync import load_sync_state, save_sync_state 315 + 316 + monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 317 + 318 + # First sync: file2 is available 319 + save_sync_state( 320 + tmp_path, 321 + "plaud", 322 + { 323 + "backend": "plaud", 324 + "files": { 325 + "file1": { 326 + "filename": "Standup", 327 + "status": "imported", 328 + "import_timestamp": "20260116_051320", 329 + }, 330 + "file2": { 331 + "filename": "Retro", 332 + "fullname": "bbb.opus", 333 + "filesize": 8000, 334 + "start_time": 1737100000000, 335 + "status": "available", 336 + }, 337 + }, 338 + }, 339 + ) 340 + 341 + # Simulate manual import of file2 (creates imports/*/import.json) 342 + _create_import(tmp_path, "20260117_134640", "Retro.opus") 343 + 344 + # Second sync: file2 should be promoted to imported 345 + with patch("think.importers.plaud.list_files", side_effect=_mock_list_files): 346 + result = PlaudBackend().sync(tmp_path, dry_run=True) 347 + 348 + assert result["imported"] == 2 349 + assert result["available"] == 0 350 + 351 + state = load_sync_state(tmp_path, "plaud") 352 + assert state["files"]["file2"]["status"] == "imported" 353 + assert state["files"]["file2"]["import_timestamp"] == "20260117_134640" 354 + 355 + 356 + def test_plaud_sync_cli_flag(capsys, monkeypatch, tmp_path): 357 + """sol import --sync plaud runs sync in dry-run mode.""" 358 + import sys 359 + 360 + from think.importers.cli import main 361 + 362 + monkeypatch.setattr(sys, "argv", ["sol import", "--sync", "plaud"]) 363 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 364 + monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 365 + 366 + with patch("think.importers.plaud.list_files", side_effect=_mock_list_files): 367 + main() 368 + 369 + captured = capsys.readouterr() 370 + assert "Total recordings:" in captured.out 371 + assert "Available to import:" in captured.out
+92
think/importers/cli.py
··· 89 89 return timestamp 90 90 91 91 92 + def _run_sync(backend_name: str, *, dry_run: bool = True) -> None: 93 + """Run sync for a named backend and print results.""" 94 + from think.importers.plaud import format_size 95 + from think.importers.sync import get_syncable_backends, load_sync_state 96 + 97 + journal_root = Path(get_journal()) 98 + 99 + # Find the requested backend 100 + backends = get_syncable_backends() 101 + backend = None 102 + for b in backends: 103 + if b.name == backend_name: 104 + backend = b 105 + break 106 + 107 + if backend is None: 108 + available = ", ".join(b.name for b in backends) or "(none)" 109 + raise SystemExit( 110 + f"Unknown sync backend: {backend_name}\n" f"Available backends: {available}" 111 + ) 112 + 113 + mode = "save" if not dry_run else "catalog" 114 + print(f"Syncing {backend_name} ({mode} mode)...") 115 + print() 116 + 117 + try: 118 + result = backend.sync(journal_root, dry_run=dry_run) 119 + except ValueError as e: 120 + raise SystemExit(str(e)) 121 + except RuntimeError as e: 122 + raise SystemExit(f"Sync failed: {e}") 123 + 124 + total = result.get("total", 0) 125 + imported = result.get("imported", 0) 126 + available = result.get("available", 0) 127 + downloaded = result.get("downloaded", 0) 128 + errors = result.get("errors", []) 129 + 130 + # Print summary 131 + print() 132 + print(f" Total recordings: {total}") 133 + print(f" Already imported: {imported}") 134 + print(f" Available to import: {available}") 135 + 136 + if downloaded > 0: 137 + print(f" Downloaded + imported: {downloaded}") 138 + if errors: 139 + print(f" Errors: {len(errors)}") 140 + for err in errors: 141 + print(f" - {err}") 142 + 143 + # In dry-run mode, show available files 144 + if dry_run and available > 0: 145 + state = load_sync_state(journal_root, backend_name) 146 + if state: 147 + files = state.get("files", {}) 148 + avail_files = [ 149 + (fid, info) 150 + for fid, info in files.items() 151 + if info.get("status") == "available" 152 + ] 153 + if avail_files: 154 + print() 155 + print("Available recordings:") 156 + for _fid, info in avail_files: 157 + name = info.get("filename", "unnamed") 158 + size = info.get("filesize", 0) 159 + print(f" - {name} ({format_size(size)})") 160 + print() 161 + print("Run with --save to download and import these files:") 162 + print(f" sol import --sync {backend_name} --save") 163 + 164 + if not dry_run and available == 0 and downloaded == 0: 165 + print() 166 + print("Everything is up to date.") 167 + 168 + 92 169 def main() -> None: 93 170 global _callosum, _message_queue, _import_id, _current_stage, _start_time 94 171 global _stage_start_time, _stages_run, _status_thread, _status_running ··· 142 219 action="store_true", 143 220 help="List syncable importer backends", 144 221 ) 222 + parser.add_argument( 223 + "--sync", 224 + type=str, 225 + metavar="BACKEND", 226 + help="Sync catalog from a backend (e.g., plaud). Shows status by default.", 227 + ) 228 + parser.add_argument( 229 + "--save", 230 + action="store_true", 231 + help="With --sync: download and import new files (default is dry-run)", 232 + ) 145 233 args, extra = setup_cli(parser, parse_known=True) 146 234 if extra and not args.timestamp: 147 235 args.timestamp = extra[0] ··· 156 244 print(f" {b.name}") 157 245 else: 158 246 print("No syncable backends available") 247 + return 248 + 249 + if args.sync: 250 + _run_sync(args.sync, dry_run=not args.save) 159 251 return 160 252 161 253 if not args.media:
+265 -91
think/importers/plaud.py
··· 3 3 4 4 """Plaud audio recorder API utilities and syncable backend.""" 5 5 6 + import datetime as dt 6 7 import json 8 + import logging 7 9 import os 8 10 import pathlib 9 11 import re 12 + import subprocess 10 13 import sys 11 14 import tempfile 12 15 from pathlib import Path ··· 15 18 import requests 16 19 from requests.adapters import HTTPAdapter 17 20 from urllib3.util.retry import Retry 21 + 22 + logger = logging.getLogger(__name__) 18 23 19 24 API_BASE = "https://api.plaud.ai" 20 25 ··· 172 177 return f"{bytes_size:.1f}TB" 173 178 174 179 175 - def sync_files( 176 - session: requests.Session, 177 - token: str, 178 - target_dir: pathlib.Path, 179 - dry_run: bool = True, 180 - ) -> int: 181 - """ 182 - Sync files from Plaud API to local directory. 180 + def timestamp_from_start_time(start_time: int | float) -> str: 181 + """Convert Plaud epoch milliseconds to YYYYMMDD_HHMMSS timestamp.""" 182 + # Plaud start_time is milliseconds since epoch 183 + if start_time > 1e12: 184 + start_time = start_time / 1000 185 + d = dt.datetime.fromtimestamp(start_time) 186 + return d.strftime("%Y%m%d_%H%M%S") 183 187 184 - Returns the number of files that were (or would be) downloaded. 188 + 189 + def match_existing_imports( 190 + journal_root: Path, plaud_files: list[dict[str, Any]] 191 + ) -> dict[str, str]: 192 + """Match Plaud files against existing imports by filename. 193 + 194 + Scans all imports/*/import.json for original_filename and matches against 195 + Plaud filenames using exact and sanitized comparison. 196 + 197 + Returns: 198 + Dict mapping plaud file ID -> import timestamp for matches. 185 199 """ 186 - file_list = list_files(session, token) 187 - if file_list is None: 188 - return -1 200 + imports_dir = journal_root / "imports" 201 + if not imports_dir.exists(): 202 + return {} 189 203 190 - target_dir.mkdir(parents=True, exist_ok=True) 204 + # Build index: normalized filename stem -> import timestamp 205 + filename_index: dict[str, str] = {} 206 + for import_dir in imports_dir.iterdir(): 207 + if not import_dir.is_dir(): 208 + continue 209 + meta_path = import_dir / "import.json" 210 + if not meta_path.exists(): 211 + continue 212 + try: 213 + meta = json.loads(meta_path.read_text(encoding="utf-8")) 214 + orig = meta.get("original_filename", "") 215 + if orig: 216 + # Index by exact name and by stem (without extension) 217 + filename_index[orig] = import_dir.name 218 + stem = pathlib.Path(orig).stem 219 + if stem: 220 + filename_index[stem] = import_dir.name 221 + # Also index sanitized form 222 + sanitized = sanitize_filename(stem) if stem else "" 223 + if sanitized and sanitized != stem: 224 + filename_index[sanitized] = import_dir.name 225 + except (json.JSONDecodeError, OSError): 226 + continue 191 227 192 - to_download = [] 193 - already_exist = [] 228 + # Match each Plaud file against the index 229 + matches: dict[str, str] = {} 230 + for file_info in plaud_files: 231 + file_id = file_info.get("id", "") 232 + filename = file_info.get("filename", "") 233 + fullname = file_info.get("fullname", "") 194 234 195 - print(f"\nChecking local files in: {target_dir}") 196 - print("=" * 70) 235 + # Try matching strategies in priority order 236 + candidates = [ 237 + filename, # exact display name 238 + pathlib.Path(fullname).stem if fullname else "", # hash stem 239 + sanitize_filename(filename) if filename else "", # sanitized display name 240 + ] 241 + ext = pathlib.Path(fullname).suffix if fullname else ".opus" 242 + if filename: 243 + candidates.append(f"{filename}{ext}") # display name + extension 244 + candidates.append(f"{sanitize_filename(filename)}{ext}") # sanitized + ext 197 245 198 - for file_info in file_list: 199 - file_id = file_info.get("id") 200 - filename = file_info.get("filename", "unnamed") 201 - filesize = file_info.get("filesize", 0) 202 - fullname = file_info.get("fullname", f"{file_id}.opus") 246 + for candidate in candidates: 247 + if candidate and candidate in filename_index: 248 + matches[file_id] = filename_index[candidate] 249 + break 203 250 204 - # Use the fullname (e.g., "hash.opus") or sanitize the filename 205 - # Let's use the fullname which includes extension 206 - safe_name = sanitize_filename(filename) 207 - # Get extension from fullname 208 - ext = pathlib.Path(fullname).suffix or ".opus" 209 - local_filename = f"{safe_name}{ext}" 210 - local_path = target_dir / local_filename 251 + return matches 211 252 212 - if local_path.exists(): 213 - already_exist.append((file_info, local_path)) 214 - else: 215 - to_download.append((file_info, local_path)) 216 253 217 - print(f"✓ {len(already_exist)} files already exist locally") 218 - print(f"⬇ {len(to_download)} files need to be downloaded") 254 + class PlaudBackend: 255 + """Syncable backend for Plaud audio recorder service.""" 219 256 220 - if not to_download: 221 - print("\n✓ All files are already synced!") 222 - return 0 257 + name: str = "plaud" 223 258 224 - if dry_run: 225 - print(f"\n{'DRY RUN MODE':-^70}") 226 - print("The following files would be downloaded:\n") 227 - total_size = 0 228 - for file_info, local_path in to_download: 229 - filename = file_info.get("filename", "unnamed") 230 - filesize = file_info.get("filesize", 0) 231 - total_size += filesize 232 - print(f" • {local_path.name}") 233 - print(f" Size: {format_size(filesize)}, Original: {filename}") 259 + def sync(self, journal_root: Path, *, dry_run: bool = True) -> dict[str, Any]: 260 + """Sync catalog from Plaud service. 234 261 235 - print(f"\nTotal download size: {format_size(total_size)}") 236 - print("\nTo actually download these files, run with --save flag") 237 - return len(to_download) 262 + Fetches the file list from the Plaud API, matches against existing 263 + imports, and saves sync state. When dry_run=False, downloads and 264 + imports new files through the import pipeline. 238 265 239 - # Actually download files 240 - print(f"\n{'DOWNLOADING FILES':-^70}\n") 241 - downloaded = 0 242 - failed = 0 266 + Returns: 267 + Summary dict with total, imported, available, downloaded, errors. 268 + """ 269 + from think.importers.sync import load_sync_state, save_sync_state 243 270 244 - for idx, (file_info, local_path) in enumerate(to_download, 1): 245 - file_id = file_info.get("id") 246 - filename = file_info.get("filename", "unnamed") 247 - filesize = file_info.get("filesize", 0) 271 + token = os.getenv("PLAUD_ACCESS_TOKEN") 272 + if not token: 273 + raise ValueError( 274 + "PLAUD_ACCESS_TOKEN not configured — set in Settings > API Keys" 275 + ) 248 276 249 - print(f"[{idx}/{len(to_download)}] {local_path.name} ({format_size(filesize)})") 277 + session = make_session() 250 278 251 - # Get temp URL 252 - temp_url = get_temp_url(session, token, file_id) 253 - if not temp_url: 254 - print(" ✗ Failed to get download URL", file=sys.stderr) 255 - failed += 1 256 - continue 279 + # Fetch current file list from Plaud API 280 + file_list = list_files(session, token) 281 + if file_list is None: 282 + raise RuntimeError("Failed to fetch file list from Plaud API") 257 283 258 - # Download file 259 - if download_to_file(session, temp_url, local_path): 260 - downloaded += 1 261 - print(" ✓ Downloaded") 262 - else: 263 - failed += 1 264 - print(" ✗ Download failed", file=sys.stderr) 284 + # Load existing sync state 285 + state = load_sync_state(journal_root, "plaud") or { 286 + "backend": "plaud", 287 + "files": {}, 288 + } 289 + known_files: dict[str, dict] = state.get("files", {}) 265 290 266 - print(f"\n{'SUMMARY':-^70}") 267 - print(f"✓ Downloaded: {downloaded}") 268 - if failed > 0: 269 - print(f"✗ Failed: {failed}") 270 - print(f"Total synced: {len(already_exist) + downloaded}/{len(file_list)}") 291 + # Collect files that need matching: new files + still-available files 292 + # (re-check available in case they were imported manually since last sync) 293 + needs_matching = [ 294 + f 295 + for f in file_list 296 + if f.get("id") not in known_files 297 + or known_files.get(f.get("id", ""), {}).get("status") == "available" 298 + ] 299 + matches = match_existing_imports(journal_root, needs_matching) 271 300 272 - return downloaded 301 + # Merge into state 302 + for file_info in file_list: 303 + file_id = file_info.get("id", "") 304 + if not file_id: 305 + continue 306 + 307 + if file_id in known_files: 308 + # Preserve existing status, update metadata 309 + entry = known_files[file_id] 310 + entry["filename"] = file_info.get("filename", entry.get("filename", "")) 311 + entry["filesize"] = file_info.get("filesize", entry.get("filesize", 0)) 312 + # Promote available -> imported if matched since last sync 313 + if entry.get("status") == "available" and file_id in matches: 314 + entry["status"] = "imported" 315 + entry["import_timestamp"] = matches[file_id] 316 + entry["matched_at"] = dt.datetime.now().isoformat() 317 + continue 273 318 319 + # New file — check if matched to an existing import 320 + entry: dict[str, Any] = { 321 + "filename": file_info.get("filename", "unnamed"), 322 + "fullname": file_info.get("fullname", ""), 323 + "filesize": file_info.get("filesize", 0), 324 + "start_time": file_info.get("start_time", 0), 325 + } 274 326 275 - class PlaudBackend: 276 - """Syncable backend for Plaud audio recorder service.""" 327 + if file_id in matches: 328 + entry["status"] = "imported" 329 + entry["import_timestamp"] = matches[file_id] 330 + entry["matched_at"] = dt.datetime.now().isoformat() 331 + else: 332 + entry["status"] = "available" 333 + 334 + known_files[file_id] = entry 335 + 336 + # Compute summary 337 + total = len(known_files) 338 + imported = sum(1 for f in known_files.values() if f.get("status") == "imported") 339 + available = sum( 340 + 1 for f in known_files.values() if f.get("status") == "available" 341 + ) 277 342 278 - name: str = "plaud" 343 + result: dict[str, Any] = { 344 + "total": total, 345 + "imported": imported, 346 + "available": available, 347 + "downloaded": 0, 348 + "errors": [], 349 + } 279 350 280 - def sync(self, journal_root: Path, *, dry_run: bool = True) -> dict[str, Any]: 281 - """Sync files from Plaud service. 351 + # Download and import if not dry-run 352 + if not dry_run and available > 0: 353 + to_process = [ 354 + (fid, info) 355 + for fid, info in known_files.items() 356 + if info.get("status") == "available" 357 + ] 358 + downloaded = 0 359 + errors: list[str] = [] 282 360 283 - Not yet implemented — Phase 2 will wire up actual sync. 284 - """ 285 - token = os.getenv("PLAUD_ACCESS_TOKEN") 286 - if not token: 287 - raise ValueError( 288 - "PLAUD_ACCESS_TOKEN not configured — set in Settings > API Keys" 361 + for idx, (file_id, info) in enumerate(to_process, 1): 362 + filename = info.get("filename", "unnamed") 363 + filesize = info.get("filesize", 0) 364 + start_time = info.get("start_time", 0) 365 + 366 + # Derive timestamp from Plaud recording start time 367 + if start_time: 368 + ts = timestamp_from_start_time(start_time) 369 + else: 370 + print( 371 + f" [{idx}/{len(to_process)}] {filename} — skipping " 372 + f"(no start_time)", 373 + file=sys.stderr, 374 + ) 375 + errors.append(f"{filename}: no start_time") 376 + continue 377 + 378 + fullname = info.get("fullname", f"{file_id}.opus") 379 + ext = pathlib.Path(fullname).suffix or ".opus" 380 + safe_name = f"{sanitize_filename(filename)}{ext}" 381 + 382 + print( 383 + f" [{idx}/{len(to_process)}] {filename} " 384 + f"({format_size(filesize)})" 385 + ) 386 + 387 + # Download to imports/{timestamp}/ 388 + import_dir = journal_root / "imports" / ts 389 + import_dir.mkdir(parents=True, exist_ok=True) 390 + dest_path = import_dir / safe_name 391 + 392 + # Get temp URL and download 393 + temp_url = get_temp_url(session, token, file_id) 394 + if not temp_url: 395 + msg = f"{filename}: failed to get download URL" 396 + print(f" FAILED — {msg}", file=sys.stderr) 397 + errors.append(msg) 398 + continue 399 + 400 + if not download_to_file(session, temp_url, dest_path): 401 + msg = f"{filename}: download failed" 402 + print(f" FAILED — {msg}", file=sys.stderr) 403 + errors.append(msg) 404 + continue 405 + 406 + print(f" Downloaded -> {dest_path.name}") 407 + 408 + # Run through import pipeline 409 + import_cmd = [ 410 + "sol", 411 + "import", 412 + str(dest_path), 413 + "--timestamp", 414 + ts, 415 + "--source", 416 + "plaud", 417 + "--auto", 418 + "--skip-summary", 419 + ] 420 + print(f" Importing {ts}...") 421 + try: 422 + proc = subprocess.run( 423 + import_cmd, 424 + capture_output=True, 425 + text=True, 426 + timeout=300, 427 + ) 428 + if proc.returncode == 0: 429 + info["status"] = "imported" 430 + info["import_timestamp"] = ts 431 + info["imported_at"] = dt.datetime.now().isoformat() 432 + downloaded += 1 433 + print(" Imported successfully") 434 + else: 435 + stderr_tail = ( 436 + proc.stderr.strip().split("\n")[-1] if proc.stderr else "" 437 + ) 438 + msg = f"{filename}: import failed — {stderr_tail}" 439 + print(" FAILED — import error", file=sys.stderr) 440 + logger.warning( 441 + "Import failed for %s: %s", filename, proc.stderr 442 + ) 443 + errors.append(msg) 444 + except subprocess.TimeoutExpired: 445 + msg = f"{filename}: import timed out" 446 + print(" FAILED — timed out", file=sys.stderr) 447 + errors.append(msg) 448 + 449 + result["downloaded"] = downloaded 450 + result["errors"] = errors 451 + # Update available count after processing 452 + result["imported"] = sum( 453 + 1 for f in known_files.values() if f.get("status") == "imported" 289 454 ) 290 - raise NotImplementedError("Plaud sync execution is not yet implemented") 455 + result["available"] = sum( 456 + 1 for f in known_files.values() if f.get("status") == "available" 457 + ) 458 + 459 + # Save updated state 460 + state["files"] = known_files 461 + state["last_sync"] = dt.datetime.now().isoformat() 462 + save_sync_state(journal_root, "plaud", state) 463 + 464 + return result 291 465 292 466 293 467 # Module-level backend instance for discovery