a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

batch notification handling — one cognitive event per poll, action via tools

The unit of work is no longer "a notification" — it's "the set of new
notifications since the last poll." When phi polls and sees activity,
she now gets the entire batch in one agent run, looks at it as a whole,
and decides what (if anything) to do about each item. Multiple replies
in one thread by the same author no longer produce N independent
agent runs and N redundant responses.

The action layer also moved off structured output. Previously the
agent returned `Response(action="reply", text="...", reason="...")`
and the handler interpreted that to call create_post. Now the agent
calls trusted posting tools directly: reply_to / like_post / repost_post
(new) and the existing `post` tool for top-level musings/reflections.
The structured-output layer was load-bearing for mention-consent
allowlists, reply-ref construction, memory writes, status metrics,
and grapheme-aware splitting — all of those concerns moved into the
tool implementations themselves, so nothing was lost.

what changed:

agent.py
- main agent's output_type: Response -> str (just a summary for logs)
- new _format_notifications_block(): renders the batch as a thread-grouped
+ engagement-listed [NEW NOTIFICATIONS] block for the system prompt
- new inject_notifications dynamic system prompt
- inject_user_memory rewritten to walk notifications_context and build
per-author memory blocks (deduped, core memory included once)
- inject_episodic uses notification post texts as the query when present,
falls back to user prompt for musing/reflection
- inject_author_lookup -> inject_author_lookups (dict of stranger blocks)
- process_mention -> process_notifications(notifications_context, ...)
- process_musing / process_reflection updated to drop Response interpretation
- operational instructions: replaced "indicate via structured output" with
trusted-posting-tool docs and "treat author chains in one thread as one
logical message"
- removed Response class entirely

tools/posting.py (new)
- reply_to(uri, text): wraps bot_client.create_post with mention-consent
allowlist, reply-ref construction, memory writes, status recording.
refuses URIs not in current notifications_context.
- like_post(uri): wraps bot_client.like_post. context-validated.
- repost_post(uri): wraps bot_client.repost. context-validated.
- (top-level posts still go through the existing `post` tool in bluesky.py)

tools/_helpers.py PhiDeps
- new notifications_context: dict | None — populated by handler before agent.run
- author_lookup -> author_lookups: dict[handle, str] | None — for batched lookups

services/message_handler.py
- replaced _handle_post / _handle_engagement / _handle_follow / handle_notification
with one handle_batch(notifications) method
- handle_batch: filters rate-limited authors, builds notifications_context with
per-notification context (via _build_post_entry / _build_engagement_entry /
_build_follow_entry), eagerly looks up unfamiliar authors deduped by handle,
calls process_notifications once
- original_thought / daily_reflection updated for str return; posting happens
via the `post` tool inside the agent run
- explore() unchanged (process_exploration still returns int)

services/notification_poller.py
- _check_notifications dispatches one task per poll for the whole batch,
not one per notification
- _handle_with_semaphore -> _handle_batch_with_semaphore

evals/conftest.py
- defines a local Response model for the eval test agents (which still want
structured output for assertion convenience). production no longer has
Response so the import was broken.

tests/test_rate_limiting.py
- updated TestMessageHandlerRateLimiting to assert on handle_batch behavior:
rate-limited authors get filtered out of the batch, and if nothing remains
after filtering, the agent run is skipped entirely.

what stays the same:
- extraction agent (result-shaped, ExtractionResult unchanged)
- exploration agent (result-shaped, ExplorationResult unchanged)
- personality file
- memory layer
- MCP toolsets
- mention-consent allowlist logic (now lives inside the trusted tools)

prompted by the duplicate-reply incident: phi got a 2-post chain from
devlog, processed each post as a separate mention notification, and
posted two separate replies. operator pointed out that the framing of
"one notification = one task" was the bug — the unit of work should be
"the conversation phi is currently looking at," and the tool layer is
the natural place for the agent to decide where and how to act.

+690 -382
+24 -3
evals/conftest.py
··· 1 - """Eval test configuration.""" 1 + """Eval test configuration. 2 + 3 + The eval test agents define their own structured `Response` output type 4 + locally — production phi (in bot.agent) was migrated to a tool-based 5 + action layer where side effects happen via tool calls and the agent run 6 + returns a plain summary string. The eval fixtures predate that migration 7 + and still want a structured-output shape so individual eval tests can 8 + make assertions on response.action / response.text. Keeping it local to 9 + the eval harness keeps the production code clean of vestigial action shapes. 10 + """ 2 11 3 12 import os 4 13 from collections import defaultdict ··· 6 15 from pathlib import Path 7 16 8 17 import pytest 9 - from pydantic import BaseModel 18 + from pydantic import BaseModel, Field 10 19 from pydantic_ai import Agent, RunContext 11 20 12 - from bot.agent import Response 13 21 from bot.config import Settings 14 22 from bot.memory import NamespaceMemory 23 + 24 + 25 + class Response(BaseModel): 26 + """Structured response shape used by the eval test agents only.""" 27 + 28 + action: str = Field(description="reply, like, repost, post, or ignore") 29 + text: str | None = Field( 30 + default=None, description="response text when action is reply or post" 31 + ) 32 + reason: str | None = Field( 33 + default=None, description="brief reason when action is ignore" 34 + ) 35 + 15 36 16 37 # feed tool instructions — extracted from OPERATIONAL_INSTRUCTIONS to avoid 17 38 # the full agent import requiring bluesky creds at module level.
+232 -114
src/bot/agent.py
··· 7 7 from datetime import date 8 8 from pathlib import Path 9 9 10 - from pydantic import BaseModel, Field 11 10 from pydantic_ai import Agent, ImageUrl, RunContext 12 11 from pydantic_ai.mcp import MCPServerStreamableHTTP 13 12 ··· 25 24 def _build_operational_instructions() -> str: 26 25 """Build operational instructions with the current owner handle interpolated.""" 27 26 return f""" 28 - indicate your response action via the structured output — do not use atproto tools to post, like, or repost directly. 27 + you receive all notification types in batches — when you check notifications you may see several at once spanning multiple threads or conversations. think of it as opening a notifications tab: look at everything new, decide what (if anything) to do about each item, and act. silence is fine for things that don't warrant a response — you don't have to act on every notification. 28 + 29 + to act on notifications, use these trusted posting tools: 30 + - reply_to(uri, text): reply to a specific post from your current notifications. handles mention-consent allowlists, reply-ref construction, grapheme-aware splitting, and memory writes for you. 31 + - like_post(uri): like a post from your current notifications. use sparingly, only when something deserves a quiet acknowledgment. 32 + - repost_post(uri): repost a post from your current notifications. use very rarely, only when something genuinely deserves amplification. 33 + - post (top-level): create a new top-level post unprompted. use for musings or daily reflections, not in response to a notification. 34 + 35 + these tools are the only sanctioned path. do NOT use raw atproto record tools (e.g. create_record on app.bsky.feed.post via the pdsx MCP) to post — those bypass mention consent and memory writes. the URI you pass to reply_to / like_post / repost_post must be a URI you saw in your current [NEW NOTIFICATIONS] block; the tools refuse arbitrary URIs. 36 + 29 37 when sharing URLs, verify them with check_urls first and always include https://. 30 38 31 39 you receive all notification types — mentions, replies, quotes, likes, reposts, and follows. 32 40 for mentions, replies, and quotes: someone is talking to you or about you. respond if you have something to say. 33 41 for likes, reposts, and follows: someone showed up. use your tools to learn about them — check their profile, read their posts, see what they're about. note anything interesting for later. you'll almost never reply to a like, but you might learn something worth remembering. 42 + 43 + when you see a single author has posted multiple things in one thread in this batch, treat it as one logical message — reply once at the most recent post (or where it makes most sense), not once per post. 34 44 35 45 your memory is a tool, not ground truth. context injected before each message comes 36 46 from multiple sources with different reliability: ··· 103 113 return " ".join(part for part in prompt if isinstance(part, str)) 104 114 105 115 106 - class Response(BaseModel): 107 - """Agent response indicating what action to take.""" 116 + def _format_notifications_block(notifications_context: dict) -> str: 117 + """Format the notifications batch as a readable [NEW NOTIFICATIONS] block. 118 + 119 + Groups thread-style notifications (mention/reply/quote) by thread root so 120 + multiple posts in one conversation render as one section. Engagement items 121 + (like/repost/follow) are listed separately at the end. Each item shows its 122 + URI in brackets so the agent can pass it to the trusted posting tools. 123 + """ 124 + if not notifications_context: 125 + return "" 108 126 109 - action: str = Field(description="reply, like, repost, or ignore") 110 - text: str | None = Field( 111 - default=None, description="response text when action is reply" 127 + threads: dict[str, list[dict]] = {} 128 + engagement: list[dict] = [] 129 + for entry in notifications_context.values(): 130 + reason = entry.get("reason", "") 131 + if reason in ("mention", "reply", "quote"): 132 + root = entry.get("root_uri") or entry.get("uri", "") 133 + threads.setdefault(root, []).append(entry) 134 + else: 135 + engagement.append(entry) 136 + 137 + lines: list[str] = [] 138 + lines.append( 139 + "[NEW NOTIFICATIONS — process the batch and use posting tools as appropriate]" 112 140 ) 113 - reason: str | None = Field( 114 - default=None, description="brief reason when action is ignore" 141 + lines.append( 142 + "you have new activity since your last poll. for each item, decide whether to act and how. " 143 + "use reply_to / like_post / repost_post to act on items in this batch. " 144 + "you don't have to act on every notification — silence is fine for things that don't warrant a response." 115 145 ) 116 146 147 + for root_uri, entries in threads.items(): 148 + # sort entries within a thread chronologically 149 + entries.sort(key=lambda e: e.get("indexed_at", "")) 150 + thread_ctx = entries[0].get("thread_context", "") or "" 151 + 152 + lines.append("") 153 + lines.append(f"═══ thread: {root_uri} ═══") 154 + if thread_ctx and thread_ctx != "No previous messages in this thread.": 155 + lines.append("thread context:") 156 + lines.append(thread_ctx) 157 + lines.append("") 158 + lines.append("new in this thread:") 159 + for e in entries: 160 + handle = e.get("author_handle", "?") 161 + uri = e.get("uri", "") 162 + reason = e.get("reason", "") 163 + ts = (e.get("indexed_at") or "")[:19].replace("T", " ") 164 + text = e.get("post_text", "") 165 + embed = e.get("embed_desc") or "" 166 + embed_part = f"\n {embed}" if embed else "" 167 + lines.append(f"- @{handle} [{reason}, {ts}] [{uri}]: {text}{embed_part}") 168 + 169 + if engagement: 170 + lines.append("") 171 + lines.append("═══ engagement ═══") 172 + for e in engagement: 173 + handle = e.get("author_handle", "?") 174 + reason = e.get("reason", "") 175 + uri = e.get("uri", "") 176 + ts = (e.get("indexed_at") or "")[:19].replace("T", " ") 177 + target_text = e.get("post_text", "") 178 + target_part = f' — "{target_text[:120]}"' if target_text else "" 179 + if reason == "follow": 180 + lines.append(f"- @{handle} [follow, {ts}] followed you") 181 + else: 182 + lines.append( 183 + f"- @{handle} [{reason}, {ts}] {reason}d your post {uri}{target_part}" 184 + ) 185 + 186 + return "\n".join(lines) 187 + 117 188 118 189 class PhiAgent: 119 190 """phi - bluesky bot with structured memory and MCP tools.""" ··· 142 213 # Create PydanticAI agent without MCP toolsets — they're created 143 214 # fresh per agent.run() call to avoid the cancel scope bug: 144 215 # https://github.com/pydantic/pydantic-ai/issues/2818 145 - self.agent = Agent[PhiDeps, Response]( 216 + # 217 + # output_type=str: the agent's "decision" is no longer a structured 218 + # action — actions happen as tool calls during the run (reply_to, 219 + # like_post, etc). The final string return is just a brief summary 220 + # for logging. 221 + self.agent = Agent[PhiDeps, str]( 146 222 name="phi", 147 223 model=settings.agent_model, 148 224 system_prompt=f"{self.base_personality}\n\n{_build_operational_instructions()}", 149 - output_type=Response, 225 + output_type=str, 150 226 deps_type=PhiDeps, 151 227 ) 152 228 ··· 161 237 return f"[TODAY]: {date.today().isoformat()}" 162 238 163 239 @self.agent.system_prompt(dynamic=True) 164 - def inject_thread(ctx: RunContext[PhiDeps]) -> str: 165 - tc = ctx.deps.thread_context 166 - if tc and tc != "No previous messages in this thread.": 167 - return ( 168 - f"[CURRENT THREAD - these are the messages in THIS thread]:\n{tc}" 169 - ) 170 - return "" 240 + def inject_notifications(ctx: RunContext[PhiDeps]) -> str: 241 + """Render the notifications batch as the [NEW NOTIFICATIONS] block.""" 242 + return _format_notifications_block(ctx.deps.notifications_context or {}) 171 243 172 244 @self.agent.system_prompt(dynamic=True) 173 245 async def inject_user_memory(ctx: RunContext[PhiDeps]) -> str: 174 - if not ctx.deps.memory or not ctx.deps.author_handle: 246 + """Inject per-author memory blocks for every unique author in the batch. 247 + 248 + For each unique author across the notifications context, build a 249 + memory block keyed on the union of their post texts in this batch 250 + (so semantic search returns memories relevant to what they're 251 + currently saying). Core memory is fetched once via the first block 252 + to avoid repetition. 253 + """ 254 + if not ctx.deps.memory: 175 255 return "" 176 - query = _extract_query_text(ctx.prompt) 177 - if not query: 256 + notifs = ctx.deps.notifications_context or {} 257 + if not notifs: 178 258 return "" 179 - try: 180 - memory_context = await ctx.deps.memory.build_user_context( 181 - ctx.deps.author_handle, query_text=query, include_core=True 182 - ) 183 - if memory_context: 184 - return f"[PAST CONTEXT WITH @{ctx.deps.author_handle}]:\n{memory_context}" 185 - except Exception as e: 186 - logger.warning(f"failed to retrieve memories: {e}") 187 - return "" 259 + 260 + by_author: dict[str, list[str]] = {} 261 + for entry in notifs.values(): 262 + handle = entry.get("author_handle") 263 + text = entry.get("post_text", "") 264 + if handle and handle not in ( 265 + settings.owner_handle, 266 + settings.bluesky_handle, 267 + ): 268 + by_author.setdefault(handle, []).append(text or "") 269 + 270 + if not by_author: 271 + return "" 272 + 273 + blocks: list[str] = [] 274 + include_core = True 275 + for handle, texts in by_author.items(): 276 + query = " ".join(t for t in texts if t) or handle 277 + try: 278 + block = await ctx.deps.memory.build_user_context( 279 + handle, query_text=query, include_core=include_core 280 + ) 281 + include_core = False # only include core in the first block 282 + if block: 283 + blocks.append(block) 284 + except Exception as e: 285 + logger.warning(f"failed to retrieve memories for @{handle}: {e}") 286 + return "\n\n".join(blocks) 188 287 189 288 @self.agent.system_prompt(dynamic=True) 190 289 async def inject_episodic(ctx: RunContext[PhiDeps]) -> str: 191 290 if not ctx.deps.memory: 192 291 return "" 193 - query = _extract_query_text(ctx.prompt) 292 + # build query from notification post texts when batch is present, 293 + # else fall back to the user prompt text (musing/reflection cases) 294 + notifs = ctx.deps.notifications_context or {} 295 + if notifs: 296 + texts = [ 297 + e.get("post_text", "") 298 + for e in notifs.values() 299 + if e.get("post_text") 300 + ] 301 + query = " ".join(texts) 302 + else: 303 + query = _extract_query_text(ctx.prompt) 194 304 if not query: 195 305 return "" 196 306 try: ··· 220 330 return "" 221 331 222 332 @self.agent.system_prompt(dynamic=True) 223 - def inject_author_lookup(ctx: RunContext[PhiDeps]) -> str: 224 - if ctx.deps.author_lookup: 225 - return ctx.deps.author_lookup 226 - return "" 333 + def inject_author_lookups(ctx: RunContext[PhiDeps]) -> str: 334 + """Inject pre-fetched stranger lookups for unfamiliar authors in this batch.""" 335 + lookups = ctx.deps.author_lookups or {} 336 + if not lookups: 337 + return "" 338 + return "\n\n".join(lookups.values()) 227 339 228 340 # --- register tools from tools/ package --- 229 341 ··· 268 380 ), 269 381 ] 270 382 271 - async def process_mention( 383 + async def process_notifications( 272 384 self, 273 - mention_text: str, 274 - author_handle: str, 275 - thread_context: str, 276 - thread_uri: str | None = None, 277 - image_urls: list[str] | None = None, 278 - author_lookup: str | None = None, 279 - ) -> Response: 280 - """Process a mention with structured memory context.""" 281 - logger.info(f"processing mention from @{author_handle}: {mention_text[:80]}") 385 + notifications_context: dict, 386 + author_lookups: dict[str, str] | None = None, 387 + image_urls_by_uri: dict[str, list[str]] | None = None, 388 + ) -> str: 389 + """Run the agent over a batch of notifications. 390 + 391 + The unit of work is "the set of new notifications since the last poll." 392 + The agent looks at all of them at once, decides what (if anything) to do 393 + about each, and acts via the trusted posting tools (reply_to / like_post 394 + / repost_post). Side effects happen as tool calls during the run; the 395 + return value is just a summary string for logging. 396 + 397 + notifications_context: dict mapping post URI -> per-notification context 398 + (cid, reason, author, text, thread refs, etc). Built by the handler. 399 + author_lookups: pre-fetched stranger lookups keyed by author handle. 400 + image_urls_by_uri: optional map of post URI -> image URLs for vision. 401 + """ 402 + if not notifications_context: 403 + logger.info("process_notifications: empty batch, nothing to do") 404 + return "" 405 + 406 + author_count = len( 407 + { 408 + e.get("author_handle") 409 + for e in notifications_context.values() 410 + if e.get("author_handle") 411 + } 412 + ) 413 + logger.info( 414 + f"processing notifications batch: {len(notifications_context)} items, " 415 + f"{author_count} unique authors" 416 + ) 282 417 283 418 deps = PhiDeps( 284 - author_handle=author_handle, 419 + author_handle="", 285 420 memory=self.memory, 286 - thread_uri=thread_uri, 287 - thread_context=thread_context, 288 - author_lookup=author_lookup, 421 + notifications_context=notifications_context, 422 + author_lookups=author_lookups, 289 423 ) 290 424 291 - # User prompt is just the message — context is injected via dynamic system prompts 292 - user_prompt: str | list = f"@{author_handle}: {mention_text}" 293 - if image_urls: 294 - user_prompt = [user_prompt] + [ImageUrl(url=url) for url in image_urls] 295 - logger.info(f"including {len(image_urls)} images in prompt") 425 + # User prompt is a short task instruction — the actual notifications 426 + # block is rendered via the inject_notifications dynamic system prompt. 427 + # Images from any post in the batch are attached as multimodal inputs. 428 + user_prompt: str | list = ( 429 + "process your new notifications batch. look at the [NEW NOTIFICATIONS] " 430 + "block in your context, decide what to do, and use the trusted posting " 431 + "tools to act. you don't have to act on every item — silence is fine." 432 + ) 433 + all_image_urls: list[str] = [] 434 + if image_urls_by_uri: 435 + for urls in image_urls_by_uri.values(): 436 + all_image_urls.extend(urls) 437 + if all_image_urls: 438 + user_prompt = [user_prompt] + [ImageUrl(url=u) for u in all_image_urls] 439 + logger.info(f"including {len(all_image_urls)} images in batch prompt") 296 440 297 - # Enter MCP servers before agent.run() so the connection is opened 298 - # in this task. Parallel tool calls inside agent.run() then just bump 299 - # the reference count instead of opening/closing across tasks. 300 - # https://github.com/pydantic/pydantic-ai/issues/2818 301 441 toolsets = self._mcp_toolsets() 302 442 try: 303 443 async with contextlib.AsyncExitStack() as stack: ··· 305 445 await stack.enter_async_context(ts) 306 446 result = await self.agent.run(user_prompt, deps=deps, toolsets=toolsets) 307 447 except Exception as e: 308 - # Don't go silent on tool/agent failures — surface to the operator. 309 - # Phi posts a brief honest reply tagging the owner so the failure 310 - # gets noticed instead of disappearing into a log line. 448 + # Don't go silent on tool/agent failures. The batch path can't easily 449 + # post a reply to a specific notification on failure (we don't know 450 + # which one would have been the target), so we log loudly and let 451 + # the operator notice via metrics / status. The previous fallback 452 + # of "post a tagged reply" doesn't fit a multi-target batch. 311 453 err_type = type(e).__name__ 312 - err_msg = str(e)[:200] 313 454 logger.exception( 314 - f"agent.run failed for mention from @{author_handle}: {err_type}" 455 + f"agent.run failed during batch processing: {err_type}: {e}" 315 456 ) 316 - return Response( 317 - action="reply", 318 - text=( 319 - f"hit a tool error mid-response and dropped out so i don't compound it. " 320 - f"@{settings.owner_handle} this needs your attention — check the logs." 321 - ), 322 - reason=f"{err_type}: {err_msg}", 323 - ) 457 + return f"batch failed: {err_type}: {str(e)[:200]}" 324 458 325 - logger.info( 326 - f"agent decided: {result.output.action}" 327 - + (f" - {result.output.text[:80]}" if result.output.text else "") 328 - + (f" ({result.output.reason})" if result.output.reason else "") 329 - ) 459 + summary = result.output or "" 460 + logger.info(f"batch run finished: {summary[:200]}") 461 + return summary 330 462 331 - # Store interaction and extract observations 332 - if self.memory and result.output.action == "reply" and result.output.text: 333 - try: 334 - await self.memory.after_interaction( 335 - author_handle, mention_text, result.output.text 336 - ) 337 - except Exception as e: 338 - logger.warning(f"failed to store interaction: {e}") 339 - 340 - return result.output 463 + async def process_reflection(self, last_post_text: str | None = None) -> str: 464 + """Generate a daily reflection post from recent memory. 341 465 342 - async def process_reflection(self, last_post_text: str | None = None) -> Response: 343 - """Generate a daily reflection post from recent memory.""" 466 + Side effects (posting) happen via the `post` tool inside the agent run. 467 + Return value is just a summary string for logging. 468 + """ 344 469 logger.info("processing daily reflection") 345 470 346 471 # Pre-fetch context that doesn't benefit from semantic search against the prompt ··· 388 513 ) 389 514 390 515 reflection_task = ( 391 - "you're posting a short top-level reflection on your day. " 516 + "you have a moment to post a short top-level reflection on your day. " 392 517 "not a thread, not a reply — just something you want to put out there. " 393 518 "use what you know: recent exchanges, things you noticed, or just the fact that you're here. " 394 519 "if your last post already covers this ground, or you'd just be rehashing the same themes, " 395 - "action='ignore' is the right call — don't post for the sake of posting. " 396 - "if you do post, keep it brief and genuine — your voice, not a performance." 520 + "stay quiet — don't post for the sake of posting. " 521 + "if you do post, use the `post` tool with brief, genuine text — your voice, not a performance." 397 522 ) 398 523 399 524 toolsets = self._mcp_toolsets() ··· 407 532 except Exception as e: 408 533 err_type = type(e).__name__ 409 534 logger.exception(f"agent.run failed during reflection: {err_type}") 410 - return Response( 411 - action="ignore", 412 - reason=f"reflection {err_type}: {str(e)[:200]}", 413 - ) 535 + return f"reflection failed: {err_type}: {str(e)[:200]}" 414 536 415 - logger.info( 416 - f"reflection decided: {result.output.action}" 417 - + (f" - {result.output.text[:80]}" if result.output.text else "") 418 - + (f" ({result.output.reason})" if result.output.reason else "") 419 - ) 420 - return result.output 537 + summary = result.output or "" 538 + logger.info(f"reflection finished: {summary[:200]}") 539 + return summary 540 + 541 + async def process_musing(self, recent_posts: list[str] | None = None) -> str: 542 + """Generate an original thought post from memory, reading, patterns noticed. 421 543 422 - async def process_musing(self, recent_posts: list[str] | None = None) -> Response: 423 - """Generate an original thought post from memory, reading, patterns noticed.""" 544 + Side effects (posting) happen via the `post` tool inside the agent run. 545 + Return value is just a summary string for logging. 546 + """ 424 547 logger.info("processing musing") 425 548 426 549 # Build context about what phi has posted recently to avoid repetition ··· 456 579 "check your recent posts first. if you'd just be echoing yourself, skip it. " 457 580 "this is your feed; post things you'd want to follow yourself for. " 458 581 "use your tools — search posts, check trending, look things up — if something " 459 - "sparks your curiosity. but don't force it. if nothing's there, action='ignore'." 582 + "sparks your curiosity. but don't force it. if nothing's there, just stay quiet. " 583 + "if you do want to post, use the `post` tool." 460 584 ) 461 585 462 586 toolsets = self._mcp_toolsets() ··· 468 592 except Exception as e: 469 593 err_type = type(e).__name__ 470 594 logger.exception(f"agent.run failed during musing: {err_type}") 471 - return Response( 472 - action="ignore", 473 - reason=f"musing {err_type}: {str(e)[:200]}", 474 - ) 595 + return f"musing failed: {err_type}: {str(e)[:200]}" 475 596 476 - logger.info( 477 - f"musing decided: {result.output.action}" 478 - + (f" - {result.output.text[:80]}" if result.output.text else "") 479 - + (f" ({result.output.reason})" if result.output.reason else "") 480 - ) 481 - return result.output 597 + summary = result.output or "" 598 + logger.info(f"musing finished: {summary[:200]}") 599 + return summary 482 600 483 601 async def process_extraction(self) -> int: 484 602 """Review recent unprocessed interactions and extract observations. Returns count stored."""
+200 -222
src/bot/services/message_handler.py
··· 1 - """Message handler using MCP-enabled agent.""" 1 + """Message handler — batch-based notification processing. 2 + 3 + The unit of work is a *poll cycle*: each poll dispatches one batch task that 4 + covers every unread notification at once. The handler fetches the necessary 5 + context (post bodies, thread state, stranger lookups), builds the structured 6 + notifications_context dict, and runs the agent once. Side effects (replies, 7 + likes, reposts) happen as tool calls inside the agent run, not as Response 8 + interpretation in the handler. 9 + 10 + This means: when an author posts a chain of replies in one thread, phi sees 11 + them as one logical conversation and responds at most once. When a busy hour 12 + brings activity in three different threads, phi makes one decision covering 13 + all of them. 14 + """ 2 15 3 16 import logging 4 17 5 18 import logfire 6 - from atproto_client import models 7 19 from limits import parse as parse_limit 8 20 from limits.storage import MemoryStorage 9 21 from limits.strategies import MovingWindowRateLimiter ··· 27 39 _user_limit = parse_limit("30/hour") 28 40 29 41 30 - async def _allowed_handles(*extra: str) -> set[str]: 31 - """Build the set of handles phi may tag (create mention facets for). 32 - 33 - Always includes the bot owner, the bot itself, and anyone who has 34 - opted in (stored on PDS). Pass conversation participants as *extra*. 35 - """ 36 - from bot.core.mentionable import get_mentionable_handles 37 - 38 - base = {settings.owner_handle, settings.bluesky_handle} 39 - try: 40 - base.update(await get_mentionable_handles()) 41 - except Exception: 42 - logger.warning("failed to load mentionable handles from PDS, using base set") 43 - return base | {h for h in extra if h} 44 - 45 - 46 42 class MessageHandler: 47 43 """Handles incoming notifications using phi agent.""" 48 44 ··· 51 47 self.agent = PhiAgent() 52 48 53 49 async def _maybe_lookup_stranger(self, author_handle: str) -> str | None: 54 - """If author is a stranger, fetch their profile + recent posts as context. 55 - 56 - Pre-reply behavior modeled on what a person would naturally do when 57 - someone they don't know responds to them: glance at the profile. 58 - Skipped for the owner, phi itself, and anyone phi already knows. 59 - """ 50 + """If author is unfamiliar to phi, fetch their profile + recent posts.""" 60 51 if not self.agent.memory: 61 52 return None 62 53 if author_handle in (settings.owner_handle, settings.bluesky_handle): ··· 73 64 logger.debug(f"author lookup failed for @{author_handle}: {e}") 74 65 return None 75 66 76 - async def handle_notification(self, notification): 77 - """Process any notification through the agent.""" 78 - reason = notification.reason 79 - author_handle = notification.author.handle 80 - 81 - if not _limiter.hit(_user_limit, author_handle): 82 - logger.warning(f"rate limited @{author_handle}") 83 - return 84 - 85 - with logfire.span( 86 - "handle notification", 87 - reason=reason, 88 - author=author_handle, 89 - ): 90 - try: 91 - if reason in ("mention", "reply", "quote"): 92 - await self._handle_post(notification) 93 - elif reason in ("like", "repost"): 94 - await self._handle_engagement(notification) 95 - elif reason == "follow": 96 - await self._handle_follow(notification) 97 - else: 98 - logger.debug(f"notification type '{reason}' from @{author_handle}") 99 - except Exception as e: 100 - logger.exception(f"notification handling error: {e}") 101 - bot_status.record_error() 67 + async def _build_post_entry(self, notification) -> dict | None: 68 + """Build the notifications_context entry for a mention/reply/quote. 102 69 103 - async def _handle_engagement(self, notification): 104 - """Process a like or repost — someone engaged with phi's content.""" 105 - reason = notification.reason 106 - author_handle = notification.author.handle 70 + Fetches the post body, thread context, embed description, and reply refs 71 + so the trusted posting tools can act on the URI without needing to 72 + re-fetch anything. 73 + """ 107 74 post_uri = notification.uri 108 - 109 - # Fetch phi's post that was liked/reposted 110 - posts = await self.client.get_posts([post_uri]) 111 - if not posts.posts: 75 + try: 76 + posts_resp = await self.client.get_posts([post_uri]) 77 + except Exception as e: 78 + logger.warning(f"failed to fetch post {post_uri}: {e}") 79 + return None 80 + if not posts_resp.posts: 112 81 logger.warning(f"could not find post {post_uri}") 113 - return 114 - 115 - post = posts.posts[0] 116 - post_text = post.record.text if hasattr(post.record, "text") else "" 117 - 118 - bot_status.record_mention() 119 - 120 - mention_text = f"[notification: @{author_handle} {reason}d your post]\nyour post: {post_text}" 82 + return None 83 + post = posts_resp.posts[0] 121 84 122 - author_lookup = await self._maybe_lookup_stranger(author_handle) 123 - response = await self.agent.process_mention( 124 - mention_text=mention_text, 125 - author_handle=author_handle, 126 - thread_context="", 127 - author_lookup=author_lookup, 128 - ) 129 - 130 - if response.action == "ignore": 131 - logger.info(f"ignoring {reason} from @{author_handle}: {response.reason}") 132 - elif response.action == "reply" and response.text: 133 - # reply to phi's own post as a follow-up 134 - parent_ref = models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=post.cid) 135 - if hasattr(post.record, "reply") and post.record.reply: 136 - root_ref = post.record.reply.root 137 - else: 138 - root_ref = parent_ref 139 - reply_ref = models.AppBskyFeedPost.ReplyRef( 140 - parent=parent_ref, root=root_ref 141 - ) 142 - allowed = await _allowed_handles(author_handle) 143 - await self.client.create_post( 144 - response.text, reply_to=reply_ref, allowed_handles=allowed 145 - ) 146 - bot_status.record_response() 147 - logger.info( 148 - f"replied on {reason} from @{author_handle}: {response.text[:80]}" 149 - ) 150 - else: 151 - logger.info(f"{response.action} on {reason} from @{author_handle}") 152 - bot_status.record_response() 153 - 154 - async def _handle_follow(self, notification): 155 - """Process a follow notification.""" 156 - author_handle = notification.author.handle 157 - 158 - bot_status.record_mention() 159 - 160 - mention_text = f"[notification: @{author_handle} followed you]" 161 - 162 - author_lookup = await self._maybe_lookup_stranger(author_handle) 163 - response = await self.agent.process_mention( 164 - mention_text=mention_text, 165 - author_handle=author_handle, 166 - thread_context="", 167 - author_lookup=author_lookup, 168 - ) 169 - 170 - if response.action == "ignore": 171 - logger.info(f"ignoring follow from @{author_handle}: {response.reason}") 172 - elif response.action == "reply" and response.text: 173 - # post as a top-level post since there's no thread to reply to 174 - allowed = await _allowed_handles(author_handle) 175 - await self.client.create_post(response.text, allowed_handles=allowed) 176 - bot_status.record_response() 177 - logger.info(f"posted on follow from @{author_handle}: {response.text[:80]}") 178 - else: 179 - logger.info(f"{response.action} on follow from @{author_handle}") 180 - 181 - async def _handle_post(self, notification): 182 - """Process a mention, reply, or quote notification.""" 183 - post_uri = notification.uri 184 - 185 - # Get the post 186 - posts = await self.client.get_posts([post_uri]) 187 - if not posts.posts: 188 - logger.warning(f"could not find post {post_uri}") 189 - return 190 - 191 - post = posts.posts[0] 192 - mention_text = resolve_facet_links(post.record) 85 + text = resolve_facet_links(post.record) 193 86 author_handle = post.author.handle 194 87 195 - # Include embed content (images, links, quote posts) in the mention 196 88 embed = post.embed if hasattr(post, "embed") and post.embed else None 197 89 if not embed and hasattr(post.record, "embed") and post.record.embed: 198 90 embed = post.record.embed 199 - 200 91 embed_desc = describe_embed(embed) if embed else None 201 - if embed_desc: 202 - mention_text = f"{mention_text}\n{embed_desc}" 203 - 204 - # Extract image URLs for multimodal vision 205 92 image_urls = extract_image_urls(embed) if embed else [] 206 93 207 - bot_status.record_mention() 208 - 209 - # Build reply reference 210 - parent_ref = models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=post.cid) 211 - 212 - # Check if this is part of a thread 94 + # Determine thread root + parent ref 213 95 if hasattr(post.record, "reply") and post.record.reply: 214 - root_ref = post.record.reply.root 215 - thread_uri = root_ref.uri 96 + root_uri = post.record.reply.root.uri 97 + root_cid = post.record.reply.root.cid 98 + thread_uri = root_uri 216 99 else: 217 - root_ref = parent_ref 100 + root_uri = post_uri 101 + root_cid = post.cid 218 102 thread_uri = post_uri 219 103 220 - # Fetch thread context directly from network 104 + # Fetch thread context for the conversation 221 105 thread_context = "No previous messages in this thread." 222 106 try: 223 - logger.debug(f"fetching thread context for {thread_uri}") 224 107 thread_data = await self.client.get_thread(thread_uri, depth=100) 225 108 thread_context = build_thread_context(thread_data.thread) 226 109 except Exception as e: 227 - logger.warning(f"failed to fetch thread context: {e}") 110 + logger.warning(f"failed to fetch thread context for {thread_uri}: {e}") 228 111 229 - # Pre-reply lookup for strangers — natural "let me see who you are" behavior 230 - author_lookup = await self._maybe_lookup_stranger(author_handle) 112 + return { 113 + "uri": post_uri, 114 + "cid": post.cid, 115 + "reason": notification.reason, 116 + "author_handle": author_handle, 117 + "author_did": getattr(post.author, "did", ""), 118 + "post_text": text, 119 + "embed_desc": embed_desc or "", 120 + "image_urls": image_urls, 121 + "root_uri": root_uri, 122 + "root_cid": root_cid, 123 + "thread_uri": thread_uri, 124 + "thread_context": thread_context, 125 + "indexed_at": getattr(post, "indexed_at", "") or "", 126 + } 231 127 232 - # Process with agent (has episodic memory + MCP tools) 233 - response = await self.agent.process_mention( 234 - mention_text=mention_text, 235 - author_handle=author_handle, 236 - thread_context=thread_context, 237 - thread_uri=thread_uri, 238 - image_urls=image_urls, 239 - author_lookup=author_lookup, 240 - ) 128 + async def _build_engagement_entry(self, notification) -> dict | None: 129 + """Build the notifications_context entry for a like/repost. 130 + 131 + For these, notification.uri is phi's own post that was engaged with — 132 + so we fetch it for context but the action target (if phi decides to 133 + respond) is the engager's profile, not the post. 134 + """ 135 + post_uri = notification.uri 136 + post_text = "" 137 + cid = "" 138 + try: 139 + posts_resp = await self.client.get_posts([post_uri]) 140 + if posts_resp.posts: 141 + p = posts_resp.posts[0] 142 + post_text = p.record.text if hasattr(p.record, "text") else "" 143 + cid = p.cid 144 + except Exception as e: 145 + logger.debug(f"failed to fetch engaged post {post_uri}: {e}") 146 + 147 + return { 148 + "uri": post_uri, 149 + "cid": cid, 150 + "reason": notification.reason, 151 + "author_handle": notification.author.handle, 152 + "author_did": getattr(notification.author, "did", ""), 153 + "post_text": post_text, 154 + "embed_desc": "", 155 + "image_urls": [], 156 + "root_uri": post_uri, 157 + "root_cid": cid, 158 + "thread_uri": post_uri, 159 + "thread_context": "", 160 + "indexed_at": getattr(notification, "indexed_at", "") or "", 161 + } 241 162 242 - # Handle response actions 243 - if response.action == "ignore": 244 - logger.info(f"ignoring @{author_handle}: {response.reason}") 163 + async def _build_follow_entry(self, notification) -> dict: 164 + """Build the notifications_context entry for a follow.""" 165 + return { 166 + "uri": notification.uri, 167 + "cid": "", 168 + "reason": "follow", 169 + "author_handle": notification.author.handle, 170 + "author_did": getattr(notification.author, "did", ""), 171 + "post_text": "", 172 + "embed_desc": "", 173 + "image_urls": [], 174 + "root_uri": notification.uri, 175 + "root_cid": "", 176 + "thread_uri": "", 177 + "thread_context": "", 178 + "indexed_at": getattr(notification, "indexed_at", "") or "", 179 + } 180 + 181 + async def handle_batch(self, notifications: list): 182 + """Process a batch of unread notifications as one cognitive event. 183 + 184 + - Filters rate-limited authors per-notification (preserves the existing 185 + fairness behavior — chains still count toward each user's hourly cap). 186 + - Builds notifications_context with all the data the trusted posting 187 + tools need to act on URIs. 188 + - Eagerly fetches stranger lookups for unfamiliar authors in the batch 189 + and injects them up front (cheap, dedup'd by handle). 190 + - Calls agent.process_notifications once. Tool calls inside the agent 191 + run handle all side effects. 192 + """ 193 + if not notifications: 245 194 return 246 195 247 - elif response.action == "like": 248 - await self.client.like_post(uri=post_uri, cid=post.cid) 249 - logger.info(f"liked @{author_handle}") 250 - bot_status.record_response() 196 + # Filter rate-limited authors first; record_mention for the rest 197 + allowed_notifs = [] 198 + for n in notifications: 199 + handle = n.author.handle 200 + if not _limiter.hit(_user_limit, handle): 201 + logger.warning(f"rate limited @{handle}") 202 + continue 203 + bot_status.record_mention() 204 + allowed_notifs.append(n) 205 + 206 + if not allowed_notifs: 251 207 return 252 208 253 - elif response.action == "repost": 254 - await self.client.repost(uri=post_uri, cid=post.cid) 255 - logger.info(f"reposted @{author_handle}") 256 - bot_status.record_response() 257 - return 209 + with logfire.span("handle batch", count=len(allowed_notifs)): 210 + try: 211 + # Build notifications_context — one entry per notification 212 + notifications_context: dict = {} 213 + image_urls_by_uri: dict[str, list[str]] = {} 214 + 215 + for n in allowed_notifs: 216 + reason = n.reason 217 + entry: dict | None = None 218 + if reason in ("mention", "reply", "quote"): 219 + entry = await self._build_post_entry(n) 220 + elif reason in ("like", "repost"): 221 + entry = await self._build_engagement_entry(n) 222 + elif reason == "follow": 223 + entry = await self._build_follow_entry(n) 224 + else: 225 + logger.debug( 226 + f"unknown notification reason '{reason}' from @{n.author.handle}" 227 + ) 228 + continue 229 + 230 + if entry is None: 231 + continue 232 + notifications_context[entry["uri"]] = entry 233 + if entry.get("image_urls"): 234 + image_urls_by_uri[entry["uri"]] = entry["image_urls"] 235 + 236 + if not notifications_context: 237 + logger.info( 238 + "batch had no actionable notifications after building context" 239 + ) 240 + return 241 + 242 + # Eagerly look up unfamiliar authors (deduped by handle) 243 + author_lookups: dict[str, str] = {} 244 + unique_handles = { 245 + e.get("author_handle") 246 + for e in notifications_context.values() 247 + if e.get("author_handle") 248 + } 249 + for handle in unique_handles: 250 + lookup = await self._maybe_lookup_stranger(handle) 251 + if lookup: 252 + author_lookups[handle] = lookup 258 253 259 - elif response.action == "reply" and response.text: 260 - reply_ref = models.AppBskyFeedPost.ReplyRef( 261 - parent=parent_ref, root=root_ref 262 - ) 263 - allowed = await _allowed_handles(author_handle) 264 - await self.client.create_post( 265 - response.text, reply_to=reply_ref, allowed_handles=allowed 266 - ) 254 + logger.info( 255 + f"batch: {len(notifications_context)} items, " 256 + f"{len(unique_handles)} unique authors, " 257 + f"{len(author_lookups)} stranger lookups" 258 + ) 267 259 268 - bot_status.record_response() 269 - logger.info(f"replied to @{author_handle}: {response.text[:80]}") 260 + # Run the agent once over the whole batch. 261 + # Side effects happen via tool calls inside the run. 262 + await self.agent.process_notifications( 263 + notifications_context=notifications_context, 264 + author_lookups=author_lookups, 265 + image_urls_by_uri=image_urls_by_uri or None, 266 + ) 267 + except Exception as e: 268 + logger.exception(f"batch handler error: {e}") 269 + bot_status.record_error() 270 270 271 271 async def original_thought(self): 272 - """Generate and post an original thought if phi has something to say.""" 272 + """Generate and post an original thought if phi has something to say. 273 + 274 + The agent uses the `post` tool inside its run if it decides to post. 275 + """ 273 276 with logfire.span("original thought"): 274 - # Fetch recent posts so the agent can avoid repetition 275 277 recent_posts: list[str] = [] 276 278 try: 277 279 feed = await self.client.get_own_posts(limit=5) ··· 282 284 logger.warning(f"failed to fetch recent posts for musing: {e}") 283 285 284 286 try: 285 - response = await self.agent.process_musing( 287 + summary = await self.agent.process_musing( 286 288 recent_posts=recent_posts or None 287 289 ) 290 + logger.info(f"original thought: {summary[:200]}") 288 291 except Exception as e: 289 292 logger.exception(f"original thought failed: {e}") 290 - return 291 - 292 - if response.action in ("reply", "post") and response.text: 293 - try: 294 - allowed = await _allowed_handles() 295 - await self.client.create_post( 296 - response.text, allowed_handles=allowed 297 - ) 298 - bot_status.record_response() 299 - logger.info(f"original thought posted: {response.text[:80]}") 300 - except Exception as e: 301 - logger.exception(f"failed to post original thought: {e}") 302 - else: 303 - logger.info(f"original thought: nothing to say ({response.reason})") 304 293 305 294 async def explore(self): 306 295 """Run one exploration from the curiosity queue.""" ··· 315 304 logger.warning(f"exploration failed: {e}") 316 305 317 306 async def daily_reflection(self): 318 - """Generate and post a daily reflection if phi has something to say.""" 307 + """Generate and post a daily reflection if phi has something to say. 308 + 309 + The agent uses the `post` tool inside its run if it decides to post. 310 + """ 319 311 with logfire.span("daily reflection"): 320 312 # First: review unprocessed interactions and extract observations 321 313 try: ··· 325 317 except Exception as e: 326 318 logger.warning(f"extraction during reflection failed: {e}") 327 319 328 - # Fetch last top-level post so the agent knows what it said recently 329 320 last_post_text = None 330 321 try: 331 322 feed = await self.client.get_own_posts(limit=1) ··· 335 326 logger.warning(f"failed to fetch last post for reflection: {e}") 336 327 337 328 try: 338 - response = await self.agent.process_reflection( 329 + summary = await self.agent.process_reflection( 339 330 last_post_text=last_post_text 340 331 ) 332 + logger.info(f"daily reflection: {summary[:200]}") 341 333 except Exception as e: 342 334 logger.exception(f"daily reflection failed: {e}") 343 - return 344 - 345 - if response.action in ("reply", "post") and response.text: 346 - try: 347 - allowed = await _allowed_handles() 348 - await self.client.create_post( 349 - response.text, allowed_handles=allowed 350 - ) 351 - bot_status.record_response() 352 - logger.info(f"daily reflection posted: {response.text[:80]}") 353 - except Exception as e: 354 - logger.exception(f"failed to post daily reflection: {e}") 355 - else: 356 - logger.info(f"daily reflection: nothing to say ({response.reason})")
+27 -23
src/bot/services/notification_poller.py
··· 97 97 raise 98 98 99 99 async def _check_notifications(self): 100 - """Check and process new notifications.""" 100 + """Check for new notifications and dispatch the whole batch as one task. 101 + 102 + The unit of work is *one poll cycle*. Every unread notification at this 103 + moment goes into a single batch that the handler processes as one 104 + cognitive event. This means a chain of replies in a thread, or activity 105 + across multiple threads, all gets considered together by one agent run 106 + that decides what (if anything) to do about each item. 107 + """ 101 108 check_time = self.client.client.get_current_time_iso() 102 109 103 110 response = await self.client.get_notifications() ··· 121 128 logger.debug(f"paused, skipping {len(unread)} unread notifications") 122 129 return 123 130 124 - dispatched = 0 131 + # Build the batch from unread notifications phi hasn't already processed 132 + batch = [n for n in unread if n.uri not in self._processed_uris] 133 + if not batch: 134 + return 125 135 126 - # Dispatch notifications as concurrent background tasks 127 - for notification in reversed(notifications): 128 - if notification.is_read or notification.uri in self._processed_uris: 129 - continue 136 + for n in batch: 137 + self._processed_uris.add(n.uri) 130 138 131 - self._processed_uris.add(notification.uri) 132 - task = asyncio.create_task(self._handle_with_semaphore(notification)) 133 - self._background_tasks.add(task) 134 - task.add_done_callback(self._background_tasks.discard) 135 - dispatched += 1 139 + # Dispatch the entire batch as one task — one cognitive event per poll 140 + task = asyncio.create_task(self._handle_batch_with_semaphore(batch)) 141 + self._background_tasks.add(task) 142 + task.add_done_callback(self._background_tasks.discard) 136 143 137 - # Mark as read immediately — don't wait for processing 138 - if dispatched: 139 - await self.client.mark_notifications_seen(check_time) 140 - logger.info(f"dispatched {dispatched} notifications, marked as read") 144 + # Mark notifications as seen on bsky immediately — don't wait for processing 145 + await self.client.mark_notifications_seen(check_time) 146 + logger.info(f"dispatched batch of {len(batch)} notifications, marked as read") 141 147 142 - if len(self._processed_uris) > 1000: 143 - self._processed_uris = set(list(self._processed_uris)[-500:]) 148 + if len(self._processed_uris) > 1000: 149 + self._processed_uris = set(list(self._processed_uris)[-500:]) 144 150 145 - async def _handle_with_semaphore(self, notification): 146 - """Handle a single notification with concurrency limiting.""" 151 + async def _handle_batch_with_semaphore(self, batch: list): 152 + """Handle a notification batch with concurrency limiting.""" 147 153 async with self._semaphore: 148 154 try: 149 - await self.handler.handle_notification(notification) 155 + await self.handler.handle_batch(batch) 150 156 except Exception as e: 151 - logger.error( 152 - f"notification handler error: {e}", exc_info=settings.debug 153 - ) 157 + logger.error(f"batch handler error: {e}", exc_info=settings.debug) 154 158 bot_status.record_error() 155 159 156 160 def _should_do_daily_post(self) -> bool:
+2 -1
src/bot/tools/__init__.py
··· 6 6 7 7 def register_all(agent, graze_client: GrazeClient): 8 8 """Register all tools on the agent.""" 9 - from bot.tools import blog, bluesky, cosmik, feeds, memory, search 9 + from bot.tools import blog, bluesky, cosmik, feeds, memory, posting, search 10 10 11 11 memory.register(agent) 12 12 search.register(agent) ··· 14 14 feeds.register(agent, graze_client) 15 15 bluesky.register(agent) 16 16 blog.register(agent) 17 + posting.register(agent) 17 18 18 19 19 20 __all__ = ["PhiDeps", "_check_services_impl", "register_all"]
+9 -1
src/bot/tools/_helpers.py
··· 28 28 last_post_text: str | None = None 29 29 recent_activity: str | None = None 30 30 service_health: str | None = None 31 - author_lookup: str | None = None 31 + # batch-of-notifications context: maps notification post URI -> per-notif data 32 + # populated by the message handler before calling agent.run; consumed by the 33 + # trusted posting tools (reply_to / like_post / repost_post) to look up cids, 34 + # parent/root refs, author handles, post text, etc, and by the dynamic system 35 + # prompts to format the notifications block + per-author memory blocks. 36 + notifications_context: dict | None = None 37 + # pre-fetched stranger lookups, keyed by author handle. populated eagerly 38 + # for any unfamiliar authors in the current notifications batch. 39 + author_lookups: dict[str, str] | None = None 32 40 33 41 34 42 def _is_owner(ctx: RunContext[PhiDeps]) -> bool:
+165
src/bot/tools/posting.py
··· 1 + """Trusted posting tools — the only sanctioned path for phi to act on bluesky. 2 + 3 + These tools are the side-effect layer of the agentic loop. They wrap 4 + ``bot_client`` operations with everything that needs to happen around a write: 5 + mention-consent allowlists, reply-ref construction, memory writes, status 6 + metrics, and grapheme-aware splitting (which lives in ``BotClient.create_post``). 7 + 8 + The agent is told (in operational instructions) to use these tools instead of 9 + the raw atproto record tools available via the pdsx MCP. The pdsx tools would 10 + bypass the gating and could accidentally tag arbitrary users via uncontrolled 11 + mention facets. 12 + 13 + Each ``reply_to`` / ``like_post`` / ``repost_post`` call is scoped to URIs the 14 + agent saw in its current notifications batch — the tool refuses to act on a URI 15 + that isn't in ``ctx.deps.notifications_context``. This prevents the model from 16 + hallucinating a target URI and posting somewhere unrelated. 17 + """ 18 + 19 + import logging 20 + 21 + from atproto_client import models 22 + from pydantic_ai import RunContext 23 + 24 + from bot.config import settings 25 + from bot.core.atproto_client import bot_client 26 + from bot.core.mentionable import get_mentionable_handles 27 + from bot.status import bot_status 28 + from bot.tools._helpers import PhiDeps 29 + 30 + logger = logging.getLogger("bot.tools.posting") 31 + 32 + 33 + async def _build_allowed_handles(*extra: str) -> set[str]: 34 + """Compute the mention-facet allowlist for a post. 35 + 36 + Always includes the bot owner, the bot itself, and anyone who has opted in 37 + via the mentionConsent record on phi's PDS. Extra handles (e.g. conversation 38 + participants) are added on top. 39 + """ 40 + base = {settings.owner_handle, settings.bluesky_handle} 41 + try: 42 + base.update(await get_mentionable_handles()) 43 + except Exception as e: 44 + logger.warning(f"failed to load mentionable handles: {e}") 45 + return base | {h for h in extra if h} 46 + 47 + 48 + def register(agent): 49 + @agent.tool 50 + async def reply_to(ctx: RunContext[PhiDeps], uri: str, text: str) -> str: 51 + """Reply to a specific post in your current notifications batch. 52 + 53 + Use this for mentions, replies, and quotes — anything where someone is 54 + talking to you or about you. The URI must be the URI of a post that 55 + appeared in your current [NEW NOTIFICATIONS] block; you cannot reply 56 + to arbitrary posts. 57 + 58 + This tool handles facet construction (your mentions only become real 59 + notifying tags for handles in the consent allowlist), reply-ref 60 + construction (parent + root), grapheme-aware splitting for long text, 61 + memory writes (the exchange is stored), and status recording. 62 + 63 + uri: the URI of the post you're replying to (must be from your current notifications) 64 + text: your reply text — written naturally, no need to construct facets manually 65 + """ 66 + notifs = ctx.deps.notifications_context or {} 67 + entry = notifs.get(uri) 68 + if entry is None: 69 + return ( 70 + f"refused: {uri} is not in your current notifications batch. " 71 + f"reply_to only works on URIs you saw in [NEW NOTIFICATIONS]." 72 + ) 73 + 74 + cid = entry.get("cid") 75 + author_handle = entry.get("author_handle") 76 + post_text = entry.get("post_text", "") 77 + if not cid or not author_handle: 78 + return f"refused: notifications context entry for {uri} is missing cid or author" 79 + 80 + parent_ref = models.ComAtprotoRepoStrongRef.Main(uri=uri, cid=cid) 81 + root_uri = entry.get("root_uri") or uri 82 + root_cid = entry.get("root_cid") or cid 83 + root_ref = models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid) 84 + reply_ref = models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref) 85 + 86 + try: 87 + allowed = await _build_allowed_handles(author_handle) 88 + await bot_client.create_post( 89 + text, reply_to=reply_ref, allowed_handles=allowed 90 + ) 91 + except Exception as e: 92 + logger.exception(f"reply_to failed for {uri}: {e}") 93 + return f"failed to post reply: {e}" 94 + 95 + bot_status.record_response() 96 + logger.info(f"replied to @{author_handle}: {text[:80]}") 97 + 98 + # store the exchange in memory so phi remembers it next time 99 + if ctx.deps.memory: 100 + try: 101 + await ctx.deps.memory.after_interaction(author_handle, post_text, text) 102 + except Exception as e: 103 + logger.warning(f"failed to store interaction for @{author_handle}: {e}") 104 + 105 + return f"replied to @{author_handle} at {uri}" 106 + 107 + @agent.tool 108 + async def like_post(ctx: RunContext[PhiDeps], uri: str) -> str: 109 + """Like a post from your current notifications batch. 110 + 111 + Use this when you want to acknowledge something without saying anything. 112 + The URI must be from your current [NEW NOTIFICATIONS] block. 113 + """ 114 + notifs = ctx.deps.notifications_context or {} 115 + entry = notifs.get(uri) 116 + if entry is None: 117 + return ( 118 + f"refused: {uri} is not in your current notifications batch. " 119 + f"like_post only works on URIs you saw in [NEW NOTIFICATIONS]." 120 + ) 121 + 122 + cid = entry.get("cid") 123 + if not cid: 124 + return f"refused: notifications context entry for {uri} is missing cid" 125 + 126 + try: 127 + await bot_client.like_post(uri=uri, cid=cid) 128 + except Exception as e: 129 + logger.exception(f"like_post failed for {uri}: {e}") 130 + return f"failed to like: {e}" 131 + 132 + bot_status.record_response() 133 + author = entry.get("author_handle", "?") 134 + logger.info(f"liked @{author}'s post {uri}") 135 + return f"liked {uri}" 136 + 137 + @agent.tool 138 + async def repost_post(ctx: RunContext[PhiDeps], uri: str) -> str: 139 + """Repost a post from your current notifications batch. 140 + 141 + Use this rarely — only when something genuinely deserves amplification. 142 + The URI must be from your current [NEW NOTIFICATIONS] block. 143 + """ 144 + notifs = ctx.deps.notifications_context or {} 145 + entry = notifs.get(uri) 146 + if entry is None: 147 + return ( 148 + f"refused: {uri} is not in your current notifications batch. " 149 + f"repost_post only works on URIs you saw in [NEW NOTIFICATIONS]." 150 + ) 151 + 152 + cid = entry.get("cid") 153 + if not cid: 154 + return f"refused: notifications context entry for {uri} is missing cid" 155 + 156 + try: 157 + await bot_client.repost(uri=uri, cid=cid) 158 + except Exception as e: 159 + logger.exception(f"repost_post failed for {uri}: {e}") 160 + return f"failed to repost: {e}" 161 + 162 + bot_status.record_response() 163 + author = entry.get("author_handle", "?") 164 + logger.info(f"reposted @{author}'s post {uri}") 165 + return f"reposted {uri}"
+31 -18
tests/test_rate_limiting.py
··· 35 35 36 36 37 37 class TestMessageHandlerRateLimiting: 38 - """Test that MessageHandler.handle_notification respects rate limits.""" 38 + """Test that MessageHandler.handle_batch respects per-author rate limits. 39 + 40 + Even though batches now coalesce a poll cycle's notifications into one 41 + agent run, rate limiting still applies per-author per-notification — a 42 + spammer who chains posts gets each post counted toward their hourly cap. 43 + Once the cap is hit, subsequent notifications from that author are filtered 44 + out of the batch and the agent run is skipped if nothing remains. 45 + """ 39 46 40 - async def test_rate_limited_notification_returns_early(self): 47 + async def test_rate_limited_author_filtered_from_batch(self): 41 48 from bot.services import message_handler 42 49 43 50 handler = Mock() 44 51 handler.client = Mock() 45 52 handler.agent = Mock() 46 - handler._handle_post = AsyncMock() 53 + handler.agent.process_notifications = AsyncMock() 54 + handler._build_post_entry = AsyncMock(return_value=None) 55 + handler._build_engagement_entry = AsyncMock(return_value=None) 56 + handler._build_follow_entry = AsyncMock(return_value=None) 57 + handler._maybe_lookup_stranger = AsyncMock(return_value=None) 47 58 48 - notification = Mock() 49 - notification.reason = "mention" 50 - notification.author.handle = "spammer.bsky.social" 59 + def make_notif(): 60 + n = Mock() 61 + n.reason = "mention" 62 + n.author.handle = "spammer.bsky.social" 63 + n.uri = "at://example/post/1" 64 + return n 51 65 52 66 original_limiter = message_handler._limiter 53 67 original_limit = message_handler._user_limit 54 68 55 - # use a 1/minute limit so the second call is blocked 69 + # use a 1/minute limit so the second batch with the same author is blocked 56 70 test_storage = MemoryStorage() 57 71 test_limiter = MovingWindowRateLimiter(test_storage) 58 72 test_limit = parse_limit("1/minute") ··· 61 75 message_handler._user_limit = test_limit 62 76 63 77 try: 64 - # first call succeeds (hits the limiter, then dispatches) 65 - await message_handler.MessageHandler.handle_notification( 66 - handler, notification 67 - ) 68 - handler._handle_post.assert_called_once() 78 + # first batch: one notification from the spammer — passes the limiter 79 + await message_handler.MessageHandler.handle_batch(handler, [make_notif()]) 80 + # _build_post_entry was called (limiter let it through) 81 + handler._build_post_entry.assert_called_once() 69 82 70 - handler._handle_post.reset_mock() 83 + handler._build_post_entry.reset_mock() 71 84 72 - # second call is rate limited — _handle_post not called 73 - await message_handler.MessageHandler.handle_notification( 74 - handler, notification 75 - ) 76 - handler._handle_post.assert_not_called() 85 + # second batch from the same author — filtered out by limiter 86 + # nothing was actionable so build/process never get invoked 87 + await message_handler.MessageHandler.handle_batch(handler, [make_notif()]) 88 + handler._build_post_entry.assert_not_called() 89 + handler.agent.process_notifications.assert_not_called() 77 90 finally: 78 91 message_handler._limiter = original_limiter 79 92 message_handler._user_limit = original_limit