personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix concurrent write safety for sol call commands

Add file locking to todo operations (add/done/cancel) using fcntl.flock
with retry+backoff, matching the existing detected entities pattern.

Refactor attached entity commands (update/aka/attach) to write only the
affected entity's files instead of the load-all/save-all pattern that
could lose concurrent updates to different entities in the same facet.

Add file locking to entity observation writes to prevent lost updates
when multiple agents observe the same entity concurrently.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+171 -53
+48 -30
apps/entities/call.py
··· 10 10 11 11 import typer 12 12 13 - from think.entities.core import is_valid_entity_type 13 + from think.entities.core import entity_slug, is_valid_entity_type 14 + from think.entities.journal import get_or_create_journal_entity, save_journal_entity 14 15 from think.entities.loading import load_entities 15 16 from think.entities.matching import resolve_entity, validate_aka_uniqueness 16 17 from think.entities.observations import ( ··· 18 19 add_observation, 19 20 load_observations, 20 21 ) 22 + from think.entities.relationships import ( 23 + load_facet_relationship, 24 + save_facet_relationship, 25 + ) 21 26 from think.entities.saving import ( 22 27 save_detected_entity, 23 - save_entities, 24 28 update_detected_entity, 25 29 ) 26 30 from think.facets import log_call_action ··· 158 162 return 159 163 160 164 name = entity 161 - existing = load_entities( 162 - facet, day=None, include_detached=True, include_blocked=True 165 + now = now_ms() 166 + entity_id = entity_slug(name) 167 + 168 + # Create journal entity (identity record) if it doesn't exist 169 + get_or_create_journal_entity( 170 + entity_id=entity_id, 171 + name=name, 172 + entity_type=type_, 163 173 ) 164 - now = now_ms() 165 - existing.append( 174 + 175 + # Create facet relationship (per-entity file, no load-all needed) 176 + save_facet_relationship( 177 + facet, 178 + entity_id, 166 179 { 167 - "type": type_, 168 - "name": name, 180 + "entity_id": entity_id, 169 181 "description": description, 170 182 "attached_at": now, 171 183 "updated_at": now, 172 - } 184 + }, 173 185 ) 174 - save_entities(facet, existing, day=None) 175 186 176 187 log_call_action( 177 188 facet=facet, ··· 202 213 if day is None: 203 214 resolved = _resolve_or_exit(facet, entity) 204 215 resolved_name = resolved.get("name", entity) 205 - entities = load_entities( 206 - facet, day=None, include_detached=True, include_blocked=True 207 - ) 208 - 209 - target = None 210 - for e in entities: 211 - if not e.get("detached") and e.get("name") == resolved_name: 212 - target = e 213 - break 216 + entity_id = resolved.get("id", entity_slug(resolved_name)) 214 217 215 - if target is None: 218 + # Load and update only the target entity's relationship file 219 + relationship = load_facet_relationship(facet, entity_id) 220 + if relationship is None: 216 221 typer.echo(f"Error: Entity '{resolved_name}' not found.", err=True) 217 222 raise typer.Exit(1) 218 223 219 - target["description"] = description 220 - target["updated_at"] = now_ms() 221 - save_entities(facet, entities, day=None) 224 + relationship["description"] = description 225 + relationship["updated_at"] = now_ms() 226 + save_facet_relationship(facet, entity_id, relationship) 222 227 log_call_action( 223 228 facet=facet, 224 229 action="entity_update", ··· 275 280 typer.echo(f"Alias '{aka_value}' already exists for '{resolved_name}'.") 276 281 return 277 282 283 + # Validate uniqueness across all entities in facet 278 284 entities = load_entities( 279 285 facet, day=None, include_detached=True, include_blocked=True 280 286 ) ··· 287 293 ) 288 294 raise typer.Exit(1) 289 295 290 - for e in entities: 291 - if e.get("name") == resolved_name: 292 - aka_list.append(aka_value) 293 - e["aka"] = aka_list 294 - e["updated_at"] = now_ms() 295 - break 296 + entity_id = resolved.get("id", entity_slug(resolved_name)) 297 + aka_list.append(aka_value) 298 + 299 + # Update journal entity aka (identity-level) 300 + from think.entities.journal import load_journal_entity 301 + 302 + journal_entity = load_journal_entity(entity_id) 303 + if journal_entity: 304 + existing_aka = set(journal_entity.get("aka", [])) 305 + existing_aka.add(aka_value) 306 + journal_entity["aka"] = sorted(existing_aka) 307 + save_journal_entity(journal_entity) 308 + 309 + # Update facet relationship (per-entity file) 310 + relationship = load_facet_relationship(facet, entity_id) 311 + if relationship is not None: 312 + relationship["aka"] = aka_list 313 + relationship["updated_at"] = now_ms() 314 + save_facet_relationship(facet, entity_id, relationship) 296 315 297 - save_entities(facet, entities, day=None) 298 316 log_call_action( 299 317 facet=facet, 300 318 action="entity_add_aka",
+19 -6
apps/todos/call.py
··· 112 112 raise typer.Exit(1) 113 113 114 114 try: 115 - checklist = todo.TodoChecklist.load(day, facet) 116 - item = checklist.append_entry(text) 115 + 116 + def _add(checklist: todo.TodoChecklist) -> todo.TodoChecklist: 117 + checklist.append_entry(text) 118 + return checklist 119 + 120 + checklist = todo.TodoChecklist.locked_modify(day, facet, _add) 121 + item = checklist.items[-1] 117 122 log_call_action( 118 123 facet=facet, 119 124 action="todo_add", ··· 144 149 facet = resolve_sol_facet(facet) 145 150 146 151 try: 147 - checklist = todo.TodoChecklist.load(day, facet) 148 - item = checklist.mark_done(line_number) 152 + 153 + def _done(checklist: todo.TodoChecklist) -> tuple: 154 + item = checklist.mark_done(line_number) 155 + return checklist, item 156 + 157 + checklist, item = todo.TodoChecklist.locked_modify(day, facet, _done) 149 158 log_call_action( 150 159 facet=facet, 151 160 action="todo_done", ··· 179 188 facet = resolve_sol_facet(facet) 180 189 181 190 try: 182 - checklist = todo.TodoChecklist.load(day, facet) 183 - item = checklist.cancel_entry(line_number) 191 + 192 + def _cancel(checklist: todo.TodoChecklist) -> tuple: 193 + item = checklist.cancel_entry(line_number) 194 + return checklist, item 195 + 196 + checklist, item = todo.TodoChecklist.locked_modify(day, facet, _cancel) 184 197 log_call_action( 185 198 facet=facet, 186 199 action="todo_cancel",
+56
apps/todos/todo.py
··· 9 9 10 10 from __future__ import annotations 11 11 12 + import fcntl 12 13 import json 13 14 import logging 15 + import random 14 16 import re 17 + import time 15 18 from dataclasses import dataclass 16 19 from datetime import datetime 17 20 from pathlib import Path ··· 179 182 exists = False 180 183 181 184 return cls(day=day, facet=facet, path=path, items=items, exists=exists) 185 + 186 + @classmethod 187 + def locked_modify( 188 + cls, 189 + day: str, 190 + facet: str, 191 + modify_fn: Any, 192 + max_retries: int = 3, 193 + ) -> Any: 194 + """Perform a locked load-modify-save on a todo checklist. 195 + 196 + Acquires an exclusive file lock, loads current state, applies the 197 + mutation function, and returns the result. The modify_fn receives a 198 + fresh ``TodoChecklist`` and should call save-triggering methods 199 + (``append_entry``, ``mark_done``, ``cancel_entry``) which persist 200 + inside the lock. Retries with randomized backoff on transient OS 201 + errors. 202 + 203 + Args: 204 + day: Journal day in ``YYYYMMDD`` format. 205 + facet: Facet name. 206 + modify_fn: Called with a fresh TodoChecklist. Must return the 207 + value to propagate to the caller. 208 + max_retries: Maximum attempts (default 3). 209 + 210 + Returns: 211 + Whatever ``modify_fn`` returns. 212 + 213 + Raises: 214 + OSError: If all retries exhausted on transient errors. 215 + """ 216 + path = todo_file_path(day, facet) 217 + lock_path = path.parent / f"{path.name}.lock" 218 + 219 + last_error: Exception | None = None 220 + for attempt in range(max_retries): 221 + try: 222 + path.parent.mkdir(parents=True, exist_ok=True) 223 + with open(lock_path, "w") as lock_file: 224 + fcntl.flock(lock_file, fcntl.LOCK_EX) 225 + try: 226 + checklist = cls.load(day, facet) 227 + return modify_fn(checklist) 228 + finally: 229 + fcntl.flock(lock_file, fcntl.LOCK_UN) 230 + except (IndexError, TodoError): 231 + raise # Logical errors — don't retry 232 + except OSError as exc: 233 + last_error = exc 234 + if attempt < max_retries - 1: 235 + time.sleep(random.uniform(0.05, 0.3) * (attempt + 1)) 236 + 237 + raise last_error # type: ignore[misc] 182 238 183 239 def save(self) -> None: 184 240 """Persist the checklist back to disk, creating parent directories if needed."""
+48 -17
think/entities/observations.py
··· 10 10 and biographical facts that help with future interactions. 11 11 """ 12 12 13 + import fcntl 13 14 import json 15 + import random 16 + import time 14 17 from pathlib import Path 15 18 from typing import Any 16 19 ··· 110 113 content: str, 111 114 observation_number: int, 112 115 source_day: str | None = None, 116 + max_retries: int = 3, 113 117 ) -> dict[str, Any]: 114 - """Add an observation to an entity with guard validation. 118 + """Add an observation to an entity with guard validation and file locking. 115 119 116 - Requires the caller to provide the expected next observation number 117 - (current count + 1) to prevent stale writes. 120 + Acquires an exclusive file lock to serialize concurrent writes to the 121 + same entity's observations file. Requires the caller to provide the 122 + expected next observation number (current count + 1) to prevent stale 123 + writes. 118 124 119 125 Args: 120 126 facet: Facet name ··· 122 128 content: The observation text 123 129 observation_number: Expected next number; must be current_count + 1 124 130 source_day: Optional day (YYYYMMDD) when observation was made 131 + max_retries: Maximum attempts on transient OS errors (default 3) 125 132 126 133 Returns: 127 134 Dictionary with updated observations list and count ··· 129 136 Raises: 130 137 ObservationNumberError: If observation_number doesn't match expected 131 138 ValueError: If content is empty 139 + OSError: If all retries exhausted 132 140 133 141 Example: 134 142 >>> add_observation("work", "Alice", "Prefers morning meetings", 1, "20250113") ··· 138 146 if not content: 139 147 raise ValueError("Observation content cannot be empty") 140 148 141 - observations = load_observations(facet, name) 142 - expected = len(observations) + 1 149 + path = observations_file_path(facet, name) 150 + lock_path = path.parent / f"{path.name}.lock" 143 151 144 - if observation_number != expected: 145 - raise ObservationNumberError(expected, observation_number) 152 + last_error: Exception | None = None 153 + for attempt in range(max_retries): 154 + try: 155 + path.parent.mkdir(parents=True, exist_ok=True) 156 + with open(lock_path, "w") as lock_file: 157 + fcntl.flock(lock_file, fcntl.LOCK_EX) 158 + try: 159 + # Fresh load inside lock 160 + observations = load_observations(facet, name) 161 + expected = len(observations) + 1 146 162 147 - # Create new observation 148 - observation: dict[str, Any] = { 149 - "content": content, 150 - "observed_at": now_ms(), 151 - } 152 - if source_day: 153 - observation["source_day"] = source_day 163 + if observation_number != expected: 164 + raise ObservationNumberError(expected, observation_number) 154 165 155 - observations.append(observation) 156 - save_observations(facet, name, observations) 166 + observation: dict[str, Any] = { 167 + "content": content, 168 + "observed_at": now_ms(), 169 + } 170 + if source_day: 171 + observation["source_day"] = source_day 172 + 173 + observations.append(observation) 174 + save_observations(facet, name, observations) 175 + 176 + return { 177 + "observations": observations, 178 + "count": len(observations), 179 + } 180 + finally: 181 + fcntl.flock(lock_file, fcntl.LOCK_UN) 182 + except (ValueError, ObservationNumberError): 183 + raise # Logical errors — don't retry 184 + except OSError as exc: 185 + last_error = exc 186 + if attempt < max_retries - 1: 187 + time.sleep(random.uniform(0.05, 0.3) * (attempt + 1)) 157 188 158 - return {"observations": observations, "count": len(observations)} 189 + raise last_error # type: ignore[misc]