a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add observation reconciliation to extraction pipeline

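instead of blindly appending new observations, each candidate is now
embedded and compared against similar existing observations in turbopuffer.
an LLM reconciler decides ADD/UPDATE/DELETE/NOOP for each candidate against
its closest existing match, preventing duplicates and resolving
contradictions at write time. candidates with no similar match are added
directly, and if reconciliation fails the candidate falls back to a plain
store.
inspired by mem0's consolidation pattern.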

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 715fb918 4aae5dc1

+142 -5
+142 -5
src/bot/memory/namespace_memory.py
··· 29 29 observations: list[Observation] = [] 30 30 31 31 32 + class ReconciliationAction(BaseModel): 33 + """Decision for how a new observation relates to an existing one.""" 34 + 35 + action: str = Field(description="one of: ADD, UPDATE, DELETE, NOOP") 36 + new_content: str | None = Field(default=None, description="merged content when action is UPDATE") 37 + new_tags: list[str] | None = Field(default=None, description="merged tags when action is UPDATE") 38 + reason: str = Field(description="brief explanation of the decision") 39 + 40 + 41 + class ReconciliationResult(BaseModel): 42 + """Result of reconciling a new observation against a similar existing one.""" 43 + 44 + decision: ReconciliationAction 45 + 46 + 32 47 EXTRACTION_SYSTEM_PROMPT = """\ 33 48 You extract facts about the USER from a conversation between a user and a bot. 34 49 ··· 77 92 78 93 Deduplicate against existing observations provided in the prompt. Return an empty list when the exchange is just greetings, filler, or the user only asked questions without revealing anything about themselves.""" 79 94 95 + RECONCILIATION_SYSTEM_PROMPT = """\ 96 + You reconcile a NEW observation against an EXISTING observation from memory. 97 + 98 + Decide one action: 99 + - ADD: the new observation contains genuinely different information. keep both. 100 + - UPDATE: the new observation refines, corrects, or supersedes the existing one. return merged content and tags. 101 + - DELETE: the existing observation is wrong, outdated, or fully redundant given the new one. the new one will be stored separately. 102 + - NOOP: the new observation adds nothing beyond what already exists. discard it. 103 + 104 + Corrections (e.g., "name is nate, corrected from previous error") always win over the entry they correct — use UPDATE or DELETE. 105 + When in doubt between ADD and NOOP, prefer NOOP. 
memory should be lean.""" 106 + 80 107 _extraction_agent: Agent[None, ExtractionResult] | None = None 108 + _reconciliation_agent: Agent[None, ReconciliationResult] | None = None 81 109 82 110 83 111 def get_extraction_agent() -> Agent[None, ExtractionResult]: ··· 90 118 system_prompt=EXTRACTION_SYSTEM_PROMPT, 91 119 ) 92 120 return _extraction_agent 121 + 122 + 123 + def get_reconciliation_agent() -> Agent[None, ReconciliationResult]: 124 + global _reconciliation_agent 125 + if _reconciliation_agent is None: 126 + _reconciliation_agent = Agent( 127 + name="observation-reconciler", 128 + model=f"anthropic:{settings.extraction_model}", 129 + output_type=ReconciliationResult, 130 + system_prompt=RECONCILIATION_SYSTEM_PROMPT, 131 + ) 132 + return _reconciliation_agent 93 133 94 134 EPISODIC_SCHEMA = { 95 135 "content": {"type": "string", "full_text_search": True}, ··· 252 292 schema=USER_NAMESPACE_SCHEMA, 253 293 ) 254 294 295 + async def _find_similar_observations(self, handle: str, embedding: list[float], top_k: int = 3) -> list[dict]: 296 + """Find existing observations similar to the given embedding.""" 297 + user_ns = self.get_user_namespace(handle) 298 + try: 299 + response = user_ns.query( 300 + rank_by=("vector", "ANN", embedding), 301 + top_k=top_k, 302 + filters={"kind": ["Eq", "observation"]}, 303 + include_attributes=["content", "tags", "created_at"], 304 + ) 305 + if response.rows: 306 + return [ 307 + {"id": row.id, "content": row.content, "tags": getattr(row, "tags", []), "created_at": getattr(row, "created_at", "")} 308 + for row in response.rows 309 + ] 310 + except Exception as e: 311 + if "attribute not found" in str(e) or "was not found" in str(e): 312 + return [] 313 + raise 314 + return [] 315 + 316 + async def _reconcile_observation(self, handle: str, obs: Observation) -> None: 317 + """Reconcile a single new observation against existing similar ones in turbopuffer.""" 318 + embedding = await self._get_embedding(obs.content) 319 + similar = 
await self._find_similar_observations(handle, embedding, top_k=3) 320 + 321 + if not similar: 322 + # nothing similar — just add 323 + await self._write_observation(handle, obs, embedding) 324 + logger.info(f"ADD (no similar) for @{handle}: {obs.content[:60]}") 325 + return 326 + 327 + # ask the LLM to reconcile against the most similar existing observation 328 + best_match = similar[0] 329 + prompt = ( 330 + f"EXISTING observation: {best_match['content']}\n" 331 + f"EXISTING tags: {best_match['tags']}\n\n" 332 + f"NEW observation: {obs.content}\n" 333 + f"NEW tags: {obs.tags}" 334 + ) 335 + result = await get_reconciliation_agent().run(prompt) 336 + decision = result.output.decision 337 + action = decision.action.upper() 338 + 339 + user_ns = self.get_user_namespace(handle) 340 + 341 + if action == "ADD": 342 + await self._write_observation(handle, obs, embedding) 343 + logger.info(f"ADD for @{handle}: {obs.content[:60]} ({decision.reason})") 344 + 345 + elif action == "UPDATE": 346 + # delete the old one, write the merged version 347 + user_ns.write(delete_rows=[best_match["id"]]) 348 + merged = Observation( 349 + content=decision.new_content or obs.content, 350 + tags=decision.new_tags or obs.tags, 351 + ) 352 + merged_embedding = await self._get_embedding(merged.content) 353 + await self._write_observation(handle, merged, merged_embedding) 354 + logger.info(f"UPDATE for @{handle}: '{best_match['content'][:40]}' -> '{merged.content[:40]}' ({decision.reason})") 355 + 356 + elif action == "DELETE": 357 + # delete the existing one, store the new one 358 + user_ns.write(delete_rows=[best_match["id"]]) 359 + await self._write_observation(handle, obs, embedding) 360 + logger.info(f"DELETE+ADD for @{handle}: removed '{best_match['content'][:40]}', added '{obs.content[:40]}' ({decision.reason})") 361 + 362 + elif action == "NOOP": 363 + logger.debug(f"NOOP for @{handle}: '{obs.content[:60]}' ({decision.reason})") 364 + 365 + else: 366 + # unknown action — fall back to 
ADD 367 + await self._write_observation(handle, obs, embedding) 368 + logger.warning(f"unknown reconciliation action '{action}' for @{handle}, falling back to ADD") 369 + 370 + async def _write_observation(self, handle: str, obs: Observation, embedding: list[float]) -> None: 371 + """Write a single observation to turbopuffer.""" 372 + user_ns = self.get_user_namespace(handle) 373 + entry_id = self._generate_id(f"user-{handle}", "observation", obs.content) 374 + user_ns.write( 375 + upsert_rows=[{ 376 + "id": entry_id, 377 + "vector": embedding, 378 + "kind": "observation", 379 + "content": obs.content, 380 + "tags": obs.tags, 381 + "created_at": datetime.now().isoformat(), 382 + }], 383 + distance_metric="cosine_distance", 384 + schema=USER_NAMESPACE_SCHEMA, 385 + ) 386 + 255 387 async def extract_and_store(self, handle: str, user_text: str, bot_text: str): 256 - """Extract observations from an exchange and store them. Meant to be fire-and-forget.""" 388 + """Extract observations from an exchange and reconcile against existing memory.""" 257 389 try: 258 - # fetch existing observations for dedup context 390 + # fetch existing observations for extraction context 259 391 existing = await self._get_observations(handle, top_k=20) 260 392 existing_text = "\n".join(f"- {o}" for o in existing) if existing else "none yet" 261 393 ··· 265 397 ) 266 398 result = await get_extraction_agent().run(prompt) 267 399 if result.output.observations: 268 - await self.store_observations(handle, result.output.observations) 269 - obs_summary = ", ".join(o.content[:60] for o in result.output.observations) 270 - logger.info(f"extracted {len(result.output.observations)} observations for @{handle}: {obs_summary}") 400 + # reconcile each candidate against existing memory 401 + for obs in result.output.observations: 402 + try: 403 + await self._reconcile_observation(handle, obs) 404 + except Exception as e: 405 + logger.warning(f"reconciliation failed for observation '{obs.content[:40]}': 
{e}") 406 + # fall back to direct store on reconciliation failure 407 + await self.store_observations(handle, [obs]) 271 408 else: 272 409 logger.debug(f"no new observations for @{handle}") 273 410 except Exception as e: