personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add concurrency-safe save/update for detected entities

Introduce save_detected_entity and update_detected_entity, which use
fcntl file locking to serialize concurrent writers to the same
facet+day file and prevent lost updates. Both retry with randomized
backoff on transient OS errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+259 -10
+114
tests/test_entities.py
··· 30 30 parse_knowledge_graph_entities, 31 31 rename_entity_memory, 32 32 resolve_entity, 33 + save_detected_entity, 33 34 save_entities, 34 35 save_observations, 35 36 touch_entities_from_activity, 36 37 touch_entity, 37 38 unblock_journal_entity, 39 + update_detected_entity, 38 40 validate_aka_uniqueness, 39 41 ) 40 42 ··· 284 286 assert "acme" in journal_ids 285 287 assert "alice" in journal_ids 286 288 assert "beta_corp" in journal_ids 289 + 290 + 291 + def test_save_detected_entity_basic(fixture_journal, tmp_path): 292 + """Test save_detected_entity adds an entity with locking.""" 293 + os.environ["JOURNAL_PATH"] = str(tmp_path) 294 + (tmp_path / "facets" / "test_facet" / "entities").mkdir(parents=True) 295 + 296 + result = save_detected_entity("test_facet", "20250101", "Person", "Alice", "Friend") 297 + assert result["name"] == "Alice" 298 + assert result["type"] == "Person" 299 + 300 + loaded = load_entities("test_facet", "20250101") 301 + assert len(loaded) == 1 302 + assert loaded[0]["name"] == "Alice" 303 + 304 + 305 + def test_save_detected_entity_duplicate(fixture_journal, tmp_path): 306 + """Test save_detected_entity raises on duplicate.""" 307 + os.environ["JOURNAL_PATH"] = str(tmp_path) 308 + (tmp_path / "facets" / "test_facet" / "entities").mkdir(parents=True) 309 + 310 + save_detected_entity("test_facet", "20250101", "Person", "Alice", "Friend") 311 + 312 + import pytest as _pytest 313 + 314 + with _pytest.raises(ValueError, match="already detected"): 315 + save_detected_entity("test_facet", "20250101", "Person", "Alice", "Different") 316 + 317 + 318 + def test_save_detected_entity_concurrent(fixture_journal, tmp_path): 319 + """Test concurrent save_detected_entity calls don't lose data.""" 320 + import threading 321 + 322 + os.environ["JOURNAL_PATH"] = str(tmp_path) 323 + (tmp_path / "facets" / "test_facet" / "entities").mkdir(parents=True) 324 + 325 + errors = [] 326 + count = 10 327 + 328 + def detect_entity(i): 329 + try: 330 + 
save_detected_entity( 331 + "test_facet", "20250101", "Person", f"Entity{i}", f"Description {i}" 332 + ) 333 + except Exception as exc: 334 + errors.append(exc) 335 + 336 + threads = [threading.Thread(target=detect_entity, args=(i,)) for i in range(count)] 337 + for t in threads: 338 + t.start() 339 + for t in threads: 340 + t.join() 341 + 342 + assert not errors, f"Unexpected errors: {errors}" 343 + loaded = load_entities("test_facet", "20250101") 344 + assert len(loaded) == count, f"Expected {count} entities, got {len(loaded)}" 345 + 346 + names = {e["name"] for e in loaded} 347 + for i in range(count): 348 + assert f"Entity{i}" in names, f"Entity{i} missing from saved entities" 349 + 350 + 351 + def test_save_detected_entity_retry_on_error(fixture_journal, tmp_path): 352 + """Test that save_detected_entity retries on transient OSError.""" 353 + from unittest.mock import patch 354 + 355 + os.environ["JOURNAL_PATH"] = str(tmp_path) 356 + (tmp_path / "facets" / "test_facet" / "entities").mkdir(parents=True) 357 + 358 + call_count = 0 359 + original_atomic_write = __import__( 360 + "think.entities.core", fromlist=["atomic_write"] 361 + ).atomic_write 362 + 363 + def flaky_atomic_write(path, content, prefix=".tmp_"): 364 + nonlocal call_count 365 + call_count += 1 366 + if call_count == 1: 367 + raise PermissionError("Simulated transient error") 368 + return original_atomic_write(path, content, prefix) 369 + 370 + with patch("think.entities.saving.atomic_write", side_effect=flaky_atomic_write): 371 + save_detected_entity("test_facet", "20250101", "Person", "Alice", "Friend") 372 + 373 + assert call_count == 2 # First attempt failed, second succeeded 374 + loaded = load_entities("test_facet", "20250101") 375 + assert len(loaded) == 1 376 + assert loaded[0]["name"] == "Alice" 377 + 378 + 379 + def test_update_detected_entity(fixture_journal, tmp_path): 380 + """Test update_detected_entity with locking.""" 381 + os.environ["JOURNAL_PATH"] = str(tmp_path) 382 + (tmp_path 
/ "facets" / "test_facet" / "entities").mkdir(parents=True) 383 + 384 + save_detected_entity("test_facet", "20250101", "Person", "Alice", "Friend") 385 + result = update_detected_entity("test_facet", "20250101", "Alice", "Best friend") 386 + assert result["description"] == "Best friend" 387 + 388 + loaded = load_entities("test_facet", "20250101") 389 + assert loaded[0]["description"] == "Best friend" 390 + 391 + 392 + def test_update_detected_entity_not_found(fixture_journal, tmp_path): 393 + """Test update_detected_entity raises when entity missing.""" 394 + os.environ["JOURNAL_PATH"] = str(tmp_path) 395 + (tmp_path / "facets" / "test_facet" / "entities").mkdir(parents=True) 396 + 397 + import pytest as _pytest 398 + 399 + with _pytest.raises(ValueError, match="not found"): 400 + update_detected_entity("test_facet", "20250101", "Nobody", "Desc") 287 401 288 402 289 403 def test_load_all_attached_entities(fixture_journal):
+3 -9
tests/test_models.py
··· 640 640 context="test", 641 641 ) 642 642 643 - log_file = tmp_path / "tokens" / ( 644 - __import__("time").strftime("%Y%m%d") + ".jsonl" 645 - ) 643 + log_file = tmp_path / "tokens" / (__import__("time").strftime("%Y%m%d") + ".jsonl") 646 644 entry = json.loads(log_file.read_text().strip()) 647 645 assert entry["usage"]["total_tokens"] == 1200 648 646 assert entry["usage"]["input_tokens"] == 1000 ··· 663 661 context="test", 664 662 ) 665 663 666 - log_file = tmp_path / "tokens" / ( 667 - __import__("time").strftime("%Y%m%d") + ".jsonl" 668 - ) 664 + log_file = tmp_path / "tokens" / (__import__("time").strftime("%Y%m%d") + ".jsonl") 669 665 entry = json.loads(log_file.read_text().strip()) 670 666 assert entry["usage"]["total_tokens"] == 1500 671 667 ··· 688 684 context="test", 689 685 ) 690 686 691 - log_file = tmp_path / "tokens" / ( 692 - __import__("time").strftime("%Y%m%d") + ".jsonl" 693 - ) 687 + log_file = tmp_path / "tokens" / (__import__("time").strftime("%Y%m%d") + ".jsonl") 694 688 entry = json.loads(log_file.read_text().strip()) 695 689 assert entry["usage"]["cached_tokens"] == 800 696 690 assert entry["usage"]["total_tokens"] == 1200
+4
think/entities/__init__.py
··· 102 102 103 103 # Entity saving 104 104 from think.entities.saving import ( 105 + save_detected_entity, 105 106 save_entities, 107 + update_detected_entity, 106 108 update_entity_description, 107 109 ) 108 110 ··· 147 149 "load_recent_entity_names", 148 150 "parse_entity_file", 149 151 # Saving 152 + "save_detected_entity", 150 153 "save_entities", 154 + "update_detected_entity", 151 155 "update_entity_description", 152 156 # Matching 153 157 "find_matching_entity",
+138 -1
think/entities/saving.py
··· 5 5 6 6 This module handles saving entities to storage: 7 7 - save_entities: Save attached or detected entities for a facet 8 + - save_detected_entity: Concurrency-safe single entity detection with file locking 8 9 - update_entity_description: Update a single entity's description with guard 9 10 """ 10 11 12 + import fcntl 11 13 import json 14 + import random 15 + import time 12 16 13 17 from think.entities.core import EntityDict, atomic_write, entity_slug 14 18 from think.entities.journal import get_or_create_journal_entity, save_journal_entity ··· 35 39 36 40 # Format as JSONL and write atomically 37 41 content = "".join(json.dumps(e, ensure_ascii=False) + "\n" for e in sorted_entities) 38 - atomic_write(path, content, prefix=".entities_") 42 + atomic_write(path, content, prefix="entities_") 39 43 40 44 41 45 def _save_entities_attached(facet: str, entities: list[EntityDict]) -> None: ··· 150 154 _save_entities_detected(facet, entities, day) 151 155 else: 152 156 _save_entities_attached(facet, entities) 157 + 158 + 159 + def _locked_modify_detected( 160 + facet: str, 161 + day: str, 162 + modify_fn: callable, 163 + max_retries: int = 3, 164 + ) -> list[EntityDict]: 165 + """Perform a locked read-modify-write on detected entities. 166 + 167 + Acquires an exclusive file lock, loads current state, applies the 168 + mutation function, and writes back atomically. Retries with randomized 169 + backoff on transient OS errors. 170 + 171 + Args: 172 + facet: Facet name 173 + day: Day in YYYYMMDD format 174 + modify_fn: Called with current entity list, must return the new list. 175 + May raise ValueError for logical errors (not retried). 
176 + max_retries: Maximum attempts (default 3) 177 + 178 + Returns: 179 + The entity list as written 180 + 181 + Raises: 182 + ValueError: From modify_fn (logical errors, not retried) 183 + OSError: If all retries exhausted on transient errors 184 + """ 185 + path = detected_entities_path(facet, day) 186 + lock_path = path.parent / f"{path.name}.lock" 187 + 188 + last_error: Exception | None = None 189 + for attempt in range(max_retries): 190 + try: 191 + path.parent.mkdir(parents=True, exist_ok=True) 192 + with open(lock_path, "w") as lock_file: 193 + fcntl.flock(lock_file, fcntl.LOCK_EX) 194 + try: 195 + # Fresh load inside lock — sees all prior writers' changes 196 + entities = load_entities(facet, day) 197 + entities = modify_fn(entities) 198 + _save_entities_detected(facet, entities, day) 199 + return entities 200 + finally: 201 + fcntl.flock(lock_file, fcntl.LOCK_UN) 202 + except ValueError: 203 + raise # Logical errors (duplicate, not found) — don't retry 204 + except OSError as exc: 205 + last_error = exc 206 + if attempt < max_retries - 1: 207 + time.sleep(random.uniform(0.05, 0.3) * (attempt + 1)) 208 + 209 + raise last_error # type: ignore[misc] 210 + 211 + 212 + def save_detected_entity( 213 + facet: str, 214 + day: str, 215 + entity_type: str, 216 + name: str, 217 + description: str, 218 + ) -> EntityDict: 219 + """Add a single detected entity with concurrency-safe file locking. 220 + 221 + Uses exclusive file locking to serialize concurrent writers to the same 222 + facet+day file, preventing lost updates. Retries with randomized backoff 223 + on transient OS errors. 224 + 225 + Args: 226 + facet: Facet name 227 + day: Day in YYYYMMDD format 228 + entity_type: Entity type (e.g. 
"Person", "Company") 229 + name: Entity name 230 + description: Entity description 231 + 232 + Returns: 233 + The saved entity dict (with generated id) 234 + 235 + Raises: 236 + ValueError: If entity with same name already detected for this day 237 + OSError: If all retries exhausted 238 + """ 239 + new_entity: EntityDict = { 240 + "type": entity_type, 241 + "name": name, 242 + "description": description, 243 + } 244 + name_lower = name.lower() 245 + 246 + def _add_entity(entities: list[EntityDict]) -> list[EntityDict]: 247 + for e in entities: 248 + if e.get("name", "").lower() == name_lower: 249 + raise ValueError(f"Entity '{name}' already detected for {day}") 250 + entities.append(new_entity) 251 + return entities 252 + 253 + _locked_modify_detected(facet, day, _add_entity) 254 + 255 + # Return with id filled in (set by _save_entities_detected) 256 + return new_entity 257 + 258 + 259 + def update_detected_entity( 260 + facet: str, 261 + day: str, 262 + name: str, 263 + description: str, 264 + ) -> EntityDict: 265 + """Update a detected entity's description with concurrency-safe locking. 266 + 267 + Args: 268 + facet: Facet name 269 + day: Day in YYYYMMDD format 270 + name: Entity name to find 271 + description: New description 272 + 273 + Returns: 274 + The updated entity dict 275 + 276 + Raises: 277 + ValueError: If entity not found 278 + OSError: If all retries exhausted 279 + """ 280 + 281 + def _update_entity(entities: list[EntityDict]) -> list[EntityDict]: 282 + for e in entities: 283 + if e.get("name") == name: 284 + e["description"] = description 285 + return entities 286 + raise ValueError(f"Entity '{name}' not found for {day}") 287 + 288 + result = _locked_modify_detected(facet, day, _update_entity) 289 + return next(e for e in result if e.get("name") == name) 153 290 154 291 155 292 def update_entity_description(