personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(observer): atomic writes for observer JSON metadata

Route both observer metadata writers (save_observer, increment_stat)
through atomic_write so a crash between truncate and rename cannot leave
the observer JSON empty or partial. The observer JSON holds the remote
pairing auth key; truncation there bricks sync until the remote device is
re-enrolled. Final-path 0o600 permissions and the existing return /
exception contracts are preserved.

Co-Authored-By: OpenAI Codex <codex@openai.com>

+58 -4
+55
apps/observer/tests/test_utils.py
··· 218 218 increment_stat("nonexistent", "segments_observed") 219 219 220 220 221 + class TestAtomicWriteCrashSafety: 222 + """Tests for atomic write crash safety.""" 223 + 224 + def test_save_observer_crash_preserves_existing_file( 225 + self, storage_env, monkeypatch 226 + ): 227 + """save_observer leaves prior observer data intact on replace failure.""" 228 + observer = { 229 + "key": "testkey123456789", 230 + "name": "original", 231 + "stats": {}, 232 + } 233 + assert save_observer(observer) is True 234 + 235 + def raising_stub(*args, **kwargs): 236 + raise OSError("simulated crash") 237 + 238 + monkeypatch.setattr("think.entities.core.os.replace", raising_stub) 239 + 240 + updated_observer = { 241 + "key": "testkey123456789", 242 + "name": "updated", 243 + "stats": {}, 244 + } 245 + assert save_observer(updated_observer) is False 246 + 247 + loaded = load_observer("testkey123456789") 248 + assert loaded is not None 249 + assert loaded["name"] == "original" 250 + assert list(storage_env.observers_dir.glob(".tmp_*")) == [] 251 + 252 + def test_increment_stat_crash_preserves_existing_file( 253 + self, storage_env, monkeypatch 254 + ): 255 + """increment_stat leaves prior observer data intact on replace failure.""" 256 + observer = { 257 + "key": "testkey123456789", 258 + "name": "test", 259 + "stats": {"events_received": 5}, 260 + } 261 + assert save_observer(observer) is True 262 + 263 + def raising_stub(*args, **kwargs): 264 + raise OSError("simulated crash") 265 + 266 + monkeypatch.setattr("think.entities.core.os.replace", raising_stub) 267 + 268 + increment_stat("testkey1", "events_received") 269 + 270 + loaded = load_observer("testkey123456789") 271 + assert loaded is not None 272 + assert loaded["stats"]["events_received"] == 5 273 + assert list(storage_env.observers_dir.glob(".tmp_*")) == [] 274 + 275 + 221 276 class TestFindSegmentBySha256: 222 277 """Tests for find_segment_by_sha256.""" 223 278
+3 -4
apps/observer/utils.py
··· 15 15 from pathlib import Path 16 16 17 17 from apps.utils import get_app_storage_path 18 + from think.entities.core import atomic_write 18 19 19 20 logger = logging.getLogger(__name__) 20 21 ··· 78 79 observers_dir = get_observers_dir() 79 80 observer_path = observers_dir / f"{key[:8]}.json" 80 81 try: 81 - with open(observer_path, "w") as f: 82 - json.dump(data, f, indent=2) 82 + atomic_write(observer_path, json.dumps(data, indent=2)) 83 83 os.chmod(observer_path, 0o600) 84 84 return True 85 85 except OSError: ··· 179 179 180 180 data["stats"][stat_name] = data["stats"].get(stat_name, 0) + 1 181 181 182 - with open(observer_path, "w") as f: 183 - json.dump(data, f, indent=2) 182 + atomic_write(observer_path, json.dumps(data, indent=2)) 184 183 os.chmod(observer_path, 0o600) 185 184 except (json.JSONDecodeError, OSError, KeyError) as e: 186 185 logger.warning(f"Failed to update {stat_name} for {key_prefix}: {e}")