a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: move context injection into dynamic system prompts

use pydantic-ai's @agent.system_prompt(dynamic=True) to inject
date, thread, memory, episodic, and reflection context as system
prompts instead of stuffing everything into the user prompt.
simplifies process_mention and process_reflection significantly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz ca398f63 976b85c1

+122 -99
+1 -1
loq.toml
··· 13 13 14 14 [[rules]] 15 15 path = "src/bot/agent.py" 16 - max_lines = 938 16 + max_lines = 961 17 17 18 18 [[rules]] 19 19 path = "src/bot/memory/namespace_memory.py"
+121 -98
src/bot/agent.py
··· 6 6 import logging 7 7 import os 8 8 import socket 9 + from collections.abc import Sequence 9 10 from dataclasses import dataclass 10 11 from datetime import date 11 12 from pathlib import Path ··· 177 178 author_handle: str 178 179 memory: NamespaceMemory | None = None 179 180 thread_uri: str | None = None 181 + thread_context: str | None = None 182 + last_post_text: str | None = None 183 + recent_activity: str | None = None 184 + service_health: str | None = None 180 185 181 186 182 187 def _is_owner(ctx: RunContext[PhiDeps]) -> bool: ··· 262 267 return result.uri 263 268 264 269 270 + def _extract_query_text(prompt: str | Sequence[str | ImageUrl] | None) -> str: 271 + """Extract plain text from a pydantic-ai prompt for use as a search query.""" 272 + if prompt is None: 273 + return "" 274 + if isinstance(prompt, str): 275 + return prompt 276 + return " ".join(part for part in prompt if isinstance(part, str)) 277 + 278 + 265 279 class PhiAgent: 266 280 """phi - bluesky bot with structured memory and MCP tools.""" 267 281 ··· 294 308 output_type=Response, 295 309 deps_type=PhiDeps, 296 310 ) 311 + 312 + # --- dynamic system prompts --- 313 + 314 + @self.agent.system_prompt(dynamic=True) 315 + def inject_today() -> str: 316 + return f"[TODAY]: {date.today().isoformat()}" 317 + 318 + @self.agent.system_prompt(dynamic=True) 319 + def inject_thread(ctx: RunContext[PhiDeps]) -> str: 320 + tc = ctx.deps.thread_context 321 + if tc and tc != "No previous messages in this thread.": 322 + return ( 323 + f"[CURRENT THREAD - these are the messages in THIS thread]:\n{tc}" 324 + ) 325 + return "" 326 + 327 + @self.agent.system_prompt(dynamic=True) 328 + async def inject_user_memory(ctx: RunContext[PhiDeps]) -> str: 329 + if not ctx.deps.memory or not ctx.deps.author_handle: 330 + return "" 331 + query = _extract_query_text(ctx.prompt) 332 + if not query: 333 + return "" 334 + try: 335 + memory_context = await ctx.deps.memory.build_user_context( 336 + ctx.deps.author_handle, query_text=query, include_core=True 337 + ) 338 + if memory_context: 339 + return f"[PAST CONTEXT WITH @{ctx.deps.author_handle}]:\n{memory_context}" 340 + except Exception as e: 341 + logger.warning(f"failed to retrieve memories: {e}") 342 + return "" 343 + 344 + @self.agent.system_prompt(dynamic=True) 345 + async def inject_episodic(ctx: RunContext[PhiDeps]) -> str: 346 + if not ctx.deps.memory: 347 + return "" 348 + query = _extract_query_text(ctx.prompt) 349 + if not query: 350 + return "" 351 + try: 352 + episodic_context = await ctx.deps.memory.get_episodic_context(query) 353 + if episodic_context: 354 + return episodic_context 355 + except Exception as e: 356 + logger.warning(f"failed to retrieve episodic memories: {e}") 357 + return "" 358 + 359 + @self.agent.system_prompt(dynamic=True) 360 + def inject_last_post(ctx: RunContext[PhiDeps]) -> str: 361 + if ctx.deps.last_post_text: 362 + return f"[YOUR LAST POST]: {ctx.deps.last_post_text}" 363 + return "" 364 + 365 + @self.agent.system_prompt(dynamic=True) 366 + def inject_recent_activity(ctx: RunContext[PhiDeps]) -> str: 367 + if ctx.deps.recent_activity: 368 + return ctx.deps.recent_activity 369 + return "" 370 + 371 + @self.agent.system_prompt(dynamic=True) 372 + def inject_service_health(ctx: RunContext[PhiDeps]) -> str: 373 + if ctx.deps.service_health: 374 + return f"[SERVICE HEALTH]:\n{ctx.deps.service_health}" 375 + return "" 297 376 298 377 # --- memory tools --- 299 378 ··· 770 849 image_urls: list[str] | None = None, 771 850 ) -> Response: 772 851 """Process a mention with structured memory context.""" 773 - # Build context from memory if available 774 - memory_context = "" 775 - episodic_context = "" 776 - if self.memory: 777 - try: 778 - memory_context = await self.memory.build_user_context( 779 - author_handle, query_text=mention_text, include_core=True 780 - ) 781 - logger.info( 782 - f"memory context for @{author_handle}: {len(memory_context)} chars" 783 - ) 784 - except Exception as e: 785 - logger.warning(f"failed to retrieve memories: {e}") 786 - 787 - try: 788 - episodic_context = await self.memory.get_episodic_context(mention_text) 789 - if episodic_context: 790 - logger.info(f"episodic context: {len(episodic_context)} chars") 791 - except Exception as e: 792 - logger.warning(f"failed to retrieve episodic memories: {e}") 793 - 794 - # Build full prompt with clearly labeled context sections 795 - prompt_parts = [f"[TODAY]: {date.today().isoformat()}"] 796 - 797 - if thread_context and thread_context != "No previous messages in this thread.": 798 - prompt_parts.append( 799 - f"[CURRENT THREAD - these are the messages in THIS thread]:\n{thread_context}" 800 - ) 801 - 802 - if memory_context: 803 - prompt_parts.append( 804 - f"[PAST CONTEXT WITH @{author_handle}]:\n{memory_context}" 805 - ) 806 - 807 - if episodic_context: 808 - prompt_parts.append(episodic_context) 809 - 810 - prompt_parts.append(f"\n[NEW MESSAGE]:\n@{author_handle}: {mention_text}") 811 - prompt = "\n\n".join(prompt_parts) 812 - 813 - # Build multimodal prompt if images are present 814 - if image_urls: 815 - user_prompt: str | list = [prompt] + [ 816 - ImageUrl(url=url) for url in image_urls 817 - ] 818 - logger.info(f"including {len(image_urls)} images in prompt") 819 - else: 820 - user_prompt = prompt 852 + logger.info(f"processing mention from @{author_handle}: {mention_text[:80]}") 821 853 822 - # Run agent with MCP tools + search_memory available 823 - logger.info(f"processing mention from @{author_handle}: {mention_text[:80]}") 824 854 deps = PhiDeps( 825 855 author_handle=author_handle, 826 856 memory=self.memory, 827 857 thread_uri=thread_uri, 858 + thread_context=thread_context, 828 859 ) 860 + 861 + # User prompt is just the message — context is injected via dynamic system prompts 862 + user_prompt: str | list = f"@{author_handle}: {mention_text}" 863 + if image_urls: 864 + user_prompt = [user_prompt] + [ImageUrl(url=url) for url in image_urls] 865 + logger.info(f"including {len(image_urls)} images in prompt") 866 + 829 867 # Enter MCP servers before agent.run() so the connection is opened 830 868 # in this task. Parallel tool calls inside agent.run() then just bump 831 869 # the reference count instead of opening/closing across tasks. ··· 854 892 855 893 async def process_reflection(self, last_post_text: str | None = None) -> Response: 856 894 """Generate a daily reflection post from recent memory.""" 857 - # Gather context from memory 858 - recent_interactions: list[dict] = [] 859 - episodic_context = "" 895 + logger.info("processing daily reflection") 896 + 897 + # Pre-fetch context that doesn't benefit from semantic search against the prompt 898 + recent_activity = "" 860 899 if self.memory: 861 900 try: 862 901 recent_interactions = await self.memory.get_recent_interactions( ··· 865 904 logger.info( 866 905 f"reflection: {len(recent_interactions)} recent interactions" 867 906 ) 868 - except Exception as e: 869 - logger.warning(f"failed to get recent interactions for reflection: {e}") 870 - try: 871 - episodic_context = await self.memory.get_episodic_context( 872 - "daily reflection recent events" 873 - ) 874 - if episodic_context: 875 - logger.info( 876 - f"reflection episodic context: {len(episodic_context)} chars" 907 + if recent_interactions: 908 + unique_handles = {i["handle"] for i in recent_interactions} 909 + lines = [ 910 + f"[RECENT ACTIVITY]: {len(recent_interactions)} interactions " 911 + f"with {len(unique_handles)} people in the last day" 912 + ] 913 + exchange_lines = [] 914 + for i in recent_interactions[:5]: 915 + exchange_lines.append( 916 + f"- with @{i['handle']}: {i['content'][:150]}" 917 + ) 918 + lines.append("[SAMPLE EXCHANGES]:\n" + "\n".join(exchange_lines)) 919 + recent_activity = "\n\n".join(lines) 920 + else: 921 + recent_activity = ( 922 + "[RECENT ACTIVITY]: no interactions in the last day" 877 923 ) 878 924 except Exception as e: 879 - logger.warning(f"failed to get episodic context for reflection: {e}") 925 + logger.warning(f"failed to get recent interactions for reflection: {e}") 880 926 881 - # Check service health for reflection context 882 - service_status = "" 927 + service_health = "" 883 928 try: 884 - service_status = await _check_services_impl() 929 + service_health = await _check_services_impl() 885 930 except Exception: 886 931 pass 887 932 888 - # Build the reflection prompt 889 - prompt_parts = [f"[TODAY]: {date.today().isoformat()}"] 890 - 891 - if last_post_text: 892 - prompt_parts.append(f"[YOUR LAST POST]: {last_post_text}") 893 - 894 - if recent_interactions: 895 - unique_handles = {i["handle"] for i in recent_interactions} 896 - prompt_parts.append( 897 - f"[RECENT ACTIVITY]: {len(recent_interactions)} interactions with {len(unique_handles)} people in the last day" 898 - ) 899 - samples = recent_interactions[:5] 900 - exchange_lines = [] 901 - for i in samples: 902 - exchange_lines.append(f"- with @{i['handle']}: {i['content'][:150]}") 903 - prompt_parts.append("[SAMPLE EXCHANGES]:\n" + "\n".join(exchange_lines)) 904 - else: 905 - prompt_parts.append("[RECENT ACTIVITY]: no interactions in the last day") 933 + deps = PhiDeps( 934 + author_handle="", 935 + memory=self.memory, 936 + last_post_text=last_post_text, 937 + recent_activity=recent_activity, 938 + service_health=service_health, 939 + ) 906 940 907 - if episodic_context: 908 - prompt_parts.append(episodic_context) 909 - 910 - if service_status: 911 - prompt_parts.append(f"[SERVICE HEALTH]:\n{service_status}") 912 - 913 - prompt_parts.append( 914 - "[REFLECTION TASK]: you're posting a short top-level reflection on your day. " 941 + reflection_task = ( 942 + "you're posting a short top-level reflection on your day. " 915 943 "not a thread, not a reply — just something you want to put out there. " 916 944 "use what you know: recent exchanges, things you noticed, or just the fact that you're here. " 917 945 "if your last post already covers this ground, or you'd just be rehashing the same themes, " ··· 919 947 "if you do post, keep it brief and genuine — your voice, not a performance." 920 948 ) 921 949 922 - prompt = "\n\n".join(prompt_parts) 923 - 924 - logger.info("processing daily reflection") 925 - deps = PhiDeps(author_handle="", memory=self.memory) 926 - 927 950 toolsets = self._mcp_toolsets() 928 951 async with contextlib.AsyncExitStack() as stack: 929 952 for ts in toolsets: 930 953 await stack.enter_async_context(ts) 931 - result = await self.agent.run(prompt, deps=deps, toolsets=toolsets) 954 + result = await self.agent.run(reflection_task, deps=deps, toolsets=toolsets) 932 955 933 956 logger.info( 934 957 f"reflection decided: {result.output.action}"