personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add safe voiceprint saving with file locking and integrity check

Residual from speakers lode — introduces voiceprint_io.py with exclusive
file locking (fcntl), atomic tmp-rename writes, and post-write integrity
verification. Refactors bootstrap.py to use the new utility and properly
deserialize metadata as JSON dicts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+158 -17
+47 -17
apps/speakers/bootstrap.py
··· 105 105 # Load existing voiceprints 106 106 if npz_path.exists(): 107 107 try: 108 - data = np.load(npz_path, allow_pickle=False) 109 - existing_emb = data["embeddings"] 110 - existing_meta = list(data["metadata"]) 111 - except Exception: 108 + # Use np.load with allow_pickle=False for safety, adjust if metadata requires it. 109 + with np.load(npz_path, allow_pickle=False) as data: 110 + existing_emb = data["embeddings"] 111 + # Existing metadata was likely saved as JSON strings. Deserialize them. 112 + # Assuming np.load returns an array of strings if saved as dtype=str. 113 + existing_meta_strings = data["metadata"] 114 + existing_meta_dicts = [json.loads(m) for m in existing_meta_strings] 115 + except (FileNotFoundError, ValueError, np.lib.npyio.NpzFile) as e: 116 + logger.warning(f"Failed to load existing voiceprints for {entity_id} from {npz_path}: {e}. Starting fresh.") 112 117 existing_emb = np.empty((0, 256), dtype=np.float32) 113 - existing_meta = [] 118 + existing_meta_dicts = [] 119 + except Exception as e: # Catch other potential errors during loading 120 + logger.error(f"Unexpected error loading existing voiceprints for {entity_id} from {npz_path}: {e}") 121 + raise 114 122 else: 115 123 existing_emb = np.empty((0, 256), dtype=np.float32) 116 - existing_meta = [] 124 + existing_meta_dicts = [] 117 125 118 - # Build new arrays 119 - new_emb = np.vstack([emb.reshape(1, -1).astype(np.float32) for emb, _ in new_items]) 120 - new_meta = [json.dumps(m) for _, m in new_items] 126 + # Prepare new embeddings and metadata dicts 127 + new_emb_list = [] 128 + new_meta_dicts = [] 129 + for emb, meta_dict in new_items: 130 + new_emb_list.append(emb.reshape(1, -1).astype(np.float32)) 131 + new_meta_dicts.append(meta_dict) 121 132 122 - combined_emb = ( 123 - np.vstack([existing_emb, new_emb]) if len(existing_emb) > 0 else new_emb 124 - ) 125 - combined_meta = np.array(existing_meta + new_meta, dtype=str) 133 + # Combine existing and new data 134 + if 
new_emb_list: 135 + new_emb_np = np.vstack(new_emb_list) 136 + combined_emb = ( 137 + np.vstack([existing_emb, new_emb_np]) if len(existing_emb) > 0 else new_emb_np 138 + ) 139 + # Combine the metadata dictionaries 140 + combined_meta_dicts = existing_meta_dicts + new_meta_dicts 141 + else: # Should not happen if new_items is not empty, but for safety 142 + combined_emb = existing_emb 143 + combined_meta_dicts = existing_meta_dicts 126 144 127 - tmp_path = npz_path.with_name(npz_path.stem + ".tmp.npz") 128 - np.savez_compressed(tmp_path, embeddings=combined_emb, metadata=combined_meta) 129 - tmp_path.rename(npz_path) 130 - return len(new_items) 145 + # Use the new safe saving utility 146 + try: 147 + # Import the utility function 148 + from apps.speakers.voiceprint_io import save_voiceprints_safely 149 + 150 + save_voiceprints_safely( 151 + npz_path=npz_path, 152 + embeddings=combined_emb, 153 + metadata=combined_meta_dicts # Pass metadata as a list of dicts 154 + ) 155 + return len(new_items) 156 + except Exception as e: 157 + logger.error(f"Failed to safely save voiceprints for {entity_id}: {e}") 158 + # The save_voiceprints_safely function already logs critical errors and re-raises. 159 + # We re-raise here to propagate the failure. 160 + raise 131 161 132 162 133 163 def bootstrap_voiceprints(dry_run: bool = False) -> dict[str, Any]:
+111
apps/speakers/voiceprint_io.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""
Utility functions for safely saving and verifying voiceprint .npz files.

Writes are serialized with an exclusive ``fcntl`` lock on a sidecar ``.lock``
file, staged in a temporary file, verified by reloading, and only then moved
over the destination.

NOTE: the lock is held only for the duration of the write performed by
``save_voiceprints_safely``. It does not, on its own, protect a caller's
load -> modify -> save sequence; callers that need that guarantee must
serialize the whole sequence themselves.
"""

import fcntl
import json
import logging
import os
import re
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import Any, Union

import numpy as np

logger = logging.getLogger(__name__)


def _encode_metadata(metadata: Any) -> np.ndarray:
    """Return *metadata* as a 1-D unicode array of JSON strings.

    A single mapping is treated as one record. Storing JSON text (rather than
    the Python objects themselves) keeps the array loadable with
    ``np.load(..., allow_pickle=False)``; a list of dicts would otherwise be
    stored as a pickled object array.
    """
    records = [metadata] if isinstance(metadata, Mapping) else list(metadata)
    return np.array([json.dumps(record) for record in records], dtype=str)


def _verify_voiceprint_file(path: Path, emb_shape: tuple, meta_count: int) -> None:
    """Reload *path* without pickle and check it holds what was just written.

    Raises:
        ValueError: If a key is missing, an array needs pickle to load, or the
            stored shapes differ from the expected ones.
        Exception: Whatever ``np.load`` raises for an unreadable archive.
    """
    with np.load(path, allow_pickle=False) as data:
        if "embeddings" not in data.files or "metadata" not in data.files:
            raise ValueError("Missing 'embeddings' or 'metadata' keys in loaded NPZ.")
        # Indexing forces decompression, so truncated or pickled members fail here.
        stored_emb_shape = data["embeddings"].shape
        stored_meta_count = len(data["metadata"])

    if stored_emb_shape != emb_shape:
        raise ValueError(
            f"Embeddings shape mismatch: wrote {emb_shape}, read back {stored_emb_shape}."
        )
    if stored_meta_count != meta_count:
        raise ValueError(
            f"Metadata count mismatch: wrote {meta_count}, read back {stored_meta_count}."
        )


def save_voiceprints_safely(
    npz_path: Path,
    embeddings: np.ndarray,
    metadata: Union[Sequence[Mapping[str, Any]], Mapping[str, Any]],
) -> None:
    """
    Safely save voiceprint data to an NPZ file with locking and verification.

    Steps, all performed while holding an exclusive lock on ``<name>.lock``:

    1. Write ``embeddings`` and JSON-encoded ``metadata`` to ``<stem>.tmp.npz``.
    2. Reload the temporary file with ``allow_pickle=False`` and check keys,
       embeddings shape, and metadata count.
    3. Replace ``npz_path`` with the verified temporary file.

    Because verification happens before the replace, a failed write never
    overwrites an existing voiceprint file.

    Args:
        npz_path: The final path to the .npz file.
        embeddings: The numpy array of embeddings to save.
        metadata: Metadata records (a sequence of JSON-serializable dicts), or
            a single dict, which is stored as one record. Each record is saved
            as a JSON string under the ``metadata`` key.

    Raises:
        TypeError: If a metadata record is not JSON-serializable (raised before
            anything is written).
        OSError: If the lock file cannot be opened or locked, or the write or
            replace fails.
        ValueError: If the integrity check of the written file fails.
    """
    lock_path = npz_path.with_suffix(".lock")
    tmp_path = npz_path.with_name(npz_path.stem + ".tmp.npz")

    # Encode first so that bad metadata fails before touching the filesystem.
    encoded_meta = _encode_metadata(metadata)
    emb_array = np.asarray(embeddings)

    # Ensure the directory exists
    npz_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        # 'w' mode creates the lock file if it does not exist yet.
        lock_file = open(lock_path, "w", encoding="utf-8")
    except OSError as e:
        logger.error(f"Failed to acquire or manage lock file {lock_path}: {e}")
        raise

    with lock_file:
        try:
            fcntl.flock(lock_file, fcntl.LOCK_EX)
        except OSError as e:
            logger.error(f"Failed to acquire or manage lock file {lock_path}: {e}")
            raise

        try:
            np.savez_compressed(tmp_path, embeddings=emb_array, metadata=encoded_meta)

            try:
                _verify_voiceprint_file(tmp_path, emb_array.shape, len(encoded_meta))
            except Exception as e:
                logger.error(
                    f"CRITICAL: Integrity check failed for voiceprint file {npz_path} after write: {e}"
                )
                raise

            # Same-directory replace: the destination is swapped in one step.
            tmp_path.replace(npz_path)
        except Exception:
            # Drop the staged file; the previous npz_path (if any) is untouched.
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError as rm_err:
                logger.error(f"Failed to clean up temporary file {tmp_path}: {rm_err}")
            raise
        finally:
            # Release explicitly; closing the file would also drop the lock.
            try:
                fcntl.flock(lock_file, fcntl.LOCK_UN)
            except OSError as unlock_err:
                logger.error(f"Failed to release lock on {lock_path}: {unlock_err}")

    logger.info(f"Successfully wrote and verified voiceprint file: {npz_path}")