a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

phi extracts its own observations during reflection, not haiku after each conversation

- after_interaction() now only stores the raw exchange (ground truth)
- removed standalone haiku extraction agent (get_extraction_agent)
- added get_unprocessed_interactions() with timestamp heuristic
- phi reviews unprocessed interactions in batch during daily_reflection via process_extraction()
- extraction uses phi's own model (sonnet) with its personality + extraction prompt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+121 -44
+49
src/bot/agent.py
··· 13 13 14 14 from bot.config import settings 15 15 from bot.core.graze_client import GrazeClient 16 + from bot.memory.extraction import EXTRACTION_SYSTEM_PROMPT, ExtractionResult 16 17 from bot.tools import PhiDeps, _check_services_impl, register_all 17 18 18 19 logger = logging.getLogger("bot.agent") ··· 208 209 ) 209 210 register_all(self.agent, self.graze_client) 210 211 212 + # Extraction agent — phi extracts its own observations using its own model 213 + self._extraction_agent = Agent[None, ExtractionResult]( 214 + name="phi-extractor", 215 + model=settings.agent_model, 216 + system_prompt=f"{self.base_personality}\n\n{EXTRACTION_SYSTEM_PROMPT}", 217 + output_type=ExtractionResult, 218 + ) 219 + 211 220 logger.info("phi agent initialized with pdsx + pub-search mcp tools") 212 221 213 222 def _mcp_toolsets(self) -> list[MCPServerStreamableHTTP]: ··· 400 409 + (f" ({result.output.reason})" if result.output.reason else "") 401 410 ) 402 411 return result.output 412 + 413 + async def process_extraction(self) -> int: 414 + """Review recent unprocessed interactions and extract observations. Returns count stored.""" 415 + if not self.memory: 416 + return 0 417 + 418 + unprocessed = await self.memory.get_unprocessed_interactions(top_k=20) 419 + if not unprocessed: 420 + logger.info("extraction: no unprocessed interactions") 421 + return 0 422 + 423 + logger.info( 424 + f"extraction: reviewing {len(unprocessed)} unprocessed interactions" 425 + ) 426 + 427 + # group by handle 428 + by_handle: dict[str, list[dict]] = {} 429 + for interaction in unprocessed: 430 + by_handle.setdefault(interaction["handle"], []).append(interaction) 431 + 432 + total_stored = 0 433 + for handle, interactions in by_handle.items(): 434 + exchange_texts = [i["content"] for i in interactions] 435 + prompt = f"recent exchanges with @{handle}:\n\n" + "\n\n---\n\n".join( 436 + exchange_texts 437 + ) 438 + 439 + try: 440 + result = await self._extraction_agent.run(prompt) 441 + if result.output.observations: 442 + for obs in result.output.observations: 443 + try: 444 + await self.memory._reconcile_observation(handle, obs) 445 + total_stored += 1 446 + except Exception as e: 447 + logger.warning(f"reconciliation failed: {e}") 448 + except Exception as e: 449 + logger.warning(f"extraction failed for @{handle}: {e}") 450 + 451 + return total_stored
-15
src/bot/memory/extraction.py
··· 114 114 Corrections (e.g., "name is nate, corrected from previous error") always win over the entry they correct — use UPDATE or DELETE. 115 115 When in doubt between ADD and NOOP, prefer NOOP. memory should be lean.""" 116 116 117 - _extraction_agent: Agent[None, ExtractionResult] | None = None 118 117 _reconciliation_agent: Agent[None, ReconciliationResult] | None = None 119 - 120 - 121 - def get_extraction_agent() -> Agent[None, ExtractionResult]: 122 - global _extraction_agent 123 - if _extraction_agent is None: 124 - _extraction_agent = Agent[None, ExtractionResult]( 125 - name="observation-extractor", 126 - model=f"anthropic:{settings.extraction_model}", 127 - output_type=ExtractionResult, 128 - system_prompt=EXTRACTION_SYSTEM_PROMPT, 129 - ) 130 - agent = _extraction_agent 131 - assert agent is not None 132 - return agent 133 118 134 119 135 120 def get_reconciliation_agent() -> Agent[None, ReconciliationResult]:
+64 -29
src/bot/memory/namespace_memory.py
··· 14 14 EPISODIC_SCHEMA, 15 15 USER_NAMESPACE_SCHEMA, 16 16 Observation, 17 - get_extraction_agent, 18 17 get_reconciliation_agent, 19 18 ) 20 19 ··· 317 316 distance_metric="cosine_distance", 318 317 schema=USER_NAMESPACE_SCHEMA, 319 318 ) 320 - 321 - async def extract_and_store(self, handle: str, user_text: str, bot_text: str): 322 - """Extract observations from an exchange and reconcile against existing memory. 323 - 324 - Extraction runs blind — no existing observations in the prompt. This breaks 325 - the feedback loop where the extraction model pattern-matches off existing 326 - (potentially bad) observations. Deduplication happens in reconciliation. 327 - """ 328 - try: 329 - prompt = f"new exchange:\nuser: {user_text}\nbot: {bot_text}" 330 - result = await get_extraction_agent().run(prompt) 331 - if result.output.observations: 332 - # reconcile each candidate against existing memory 333 - for obs in result.output.observations: 334 - try: 335 - await self._reconcile_observation(handle, obs) 336 - except Exception as e: 337 - logger.warning( 338 - f"reconciliation failed for observation '{obs.content[:40]}': {e}" 339 - ) 340 - # fall back to direct store on reconciliation failure 341 - await self.store_observations(handle, [obs]) 342 - else: 343 - logger.debug(f"no new observations for @{handle}") 344 - except Exception as e: 345 - logger.warning(f"observation extraction failed for @{handle}: {e}") 346 319 347 320 async def get_relationship_summary(self, handle: str) -> str | None: 348 321 """Get the compacted relationship summary for a user, if one exists.""" ··· 751 724 results.sort(key=lambda r: r.get("created_at", ""), reverse=True) 752 725 return results[:top_k] 753 726 727 + async def get_unprocessed_interactions(self, top_k: int = 20) -> list[dict]: 728 + """Get recent interactions that haven't been reviewed for observation extraction. 729 + 730 + Uses a timestamp heuristic: interactions newer than the most recent 731 + observation in each user namespace are considered unprocessed. 732 + """ 733 + user_prefix = f"{self.NAMESPACES['users']}-" 734 + results: list[dict] = [] 735 + try: 736 + page = self.client.namespaces(prefix=user_prefix) 737 + for ns_summary in page.namespaces: 738 + handle = ns_summary.id.removeprefix(user_prefix).replace("_", ".") 739 + user_ns = self.client.namespace(ns_summary.id) 740 + 741 + # find the latest observation timestamp 742 + latest_obs_time = "" 743 + try: 744 + obs_response = user_ns.query( 745 + rank_by=("created_at", "desc"), 746 + top_k=1, 747 + filters=[ 748 + "And", 749 + [ 750 + ["kind", "Eq", "observation"], 751 + ["status", "NotEq", "superseded"], 752 + ], 753 + ], 754 + include_attributes=["created_at"], 755 + ) 756 + if obs_response.rows: 757 + latest_obs_time = ( 758 + getattr(obs_response.rows[0], "created_at", "") or "" 759 + ) 760 + except Exception: 761 + pass 762 + 763 + # get interactions newer than that 764 + try: 765 + int_response = user_ns.query( 766 + rank_by=("created_at", "desc"), 767 + top_k=5, 768 + filters={"kind": ["Eq", "interaction"]}, 769 + include_attributes=["content", "created_at"], 770 + ) 771 + if int_response.rows: 772 + for row in int_response.rows: 773 + created = getattr(row, "created_at", "") or "" 774 + if created > latest_obs_time: 775 + results.append( 776 + { 777 + "handle": handle, 778 + "content": row.content, 779 + "created_at": created, 780 + } 781 + ) 782 + except Exception: 783 + pass 784 + except Exception as e: 785 + logger.warning(f"failed to get unprocessed interactions: {e}") 786 + 787 + results.sort(key=lambda r: r.get("created_at", ""), reverse=True) 788 + return results[:top_k] 789 + 754 790 async def after_interaction(self, handle: str, user_text: str, bot_text: str): 755 - """Post-interaction hook: store interaction then extract observations.""" 791 + """Post-interaction hook: store the raw exchange as ground truth.""" 756 792 await self.store_interaction(handle, user_text, bot_text) 757 - await self.extract_and_store(handle, user_text, bot_text)
+8
src/bot/services/message_handler.py
··· 273 273 async def daily_reflection(self): 274 274 """Generate and post a daily reflection if phi has something to say.""" 275 275 with logfire.span("daily reflection"): 276 + # First: review unprocessed interactions and extract observations 277 + try: 278 + extracted = await self.agent.process_extraction() 279 + if extracted: 280 + logger.info(f"daily reflection: extracted {extracted} observations") 281 + except Exception as e: 282 + logger.warning(f"extraction during reflection failed: {e}") 283 + 276 284 # Fetch last top-level post so the agent knows what it said recently 277 285 last_post_text = None 278 286 try: