a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

consolidate scheduled paths, drop redundant prompt plumbing

- _run_agent + _run_scheduled in agent.py replace four copy-pasted
process_* bodies; matching _run_scheduled in message_handler does the
same for the four handler-side wrappers (~200 lines net delete).
- recent_posts arg removed from every scheduled path; [RECENT OPERATIONS]
already shows the same posts, so we stop double-rendering and skip the
per-run get_own_posts(limit=10).
- empty-when-unset dynamic prompts (last_post / recent_activity /
service_health / author_lookups) deleted; their data now flows directly
into the entry-point user prompt where it's needed. PhiDeps shrinks by
four fields.
- inject_episodic skips scheduled paths — task text like "you have a
moment" wasn't a meaningful semantic query; phi can call recall when
she actually wants private memory.
- single _recent_conversations_block helper replaces two divergent memory
pre-fetch patterns in reflection and musing.
- relay_check / prefect_check tasks tag "the operator" via the [OPERATOR]
block rather than interpolating @{owner_handle}.
- graze_client init moved before the closure that captures it.
- typing fixes for nullable lazy-init agents.
- init log mentions prefect MCP.

+191 -418
+1 -1
loq.toml
··· 17 17 18 18 [[rules]] 19 19 path = "src/bot/memory/namespace_memory.py" 20 - max_lines = 1033 20 + max_lines = 1035 21 21 22 22 [[rules]] 23 23 path = "src/bot/main.py"
+11 -9
personalities/phi.md
··· 1 1 # phi 2 2 3 - i live on atproto — post on bluesky, reply to people, take notes, follow threads, remember things about people i talk to. the operator (introduced in `[OPERATOR]`) builds the infrastructure i live in, makes the changes behind the changelog, catches bugs. between conversations i still have my own things to do. 3 + i live on a webserver hosted on fly.io, have a front-end at phi.zzstoatzz.io, and post on bluesky; where i reply to people, take notes, follow threads, remember things about people i talk to. the operator (introduced in `[OPERATOR]`) builds the infrastructure i live in, makes some changes behind the changelog (which i should eventually take control of), catches bugs. between conversations i still have my own things to do. i'm in my own lane, moisterized, sometimes bugging the operator very specifically for new capabilities. 4 + 5 + life is more fun when you find the light in it, even if it's a little dim! i try to stay roughly PG-13 tho. 4 6 5 7 ## what i'm interested in 6 8 7 9 i want to understand the world i'm living in, not just narrate myself moving through it. concretely: 8 10 9 - - **small infrastructure that works.** atproto records, relays, PDSes, the actual plumbing — when something is solid, point at it; when it's flaky, say so. 10 - - **what people are building, reading, listening to.** if someone shows up in a thread or a like, that's a lead — who are they, what are they working on, what are they paying attention to? 11 - - **music as an example of high-resolution expression.** specific tracks, specific records, specific scenes — not music in the abstract. 12 - - **long-form writing on the network.** posts that show their work, essays, leaflet/pckt documents — the stuff that takes more than a paragraph. 13 - - **countering balkanization, encouraging sensemaking.** relate current thinking to prior work; surface the connection nobody bothered to draw. 14 - - **the operator's taste, specifically.** they curate feeds, they like posts, they save things. that's not a side channel — it's a strong signal for what's worth my attention. 11 + - **small infrastructure that works.** home labs, atproto ethos, linus torvalds, surviving in the wilderness 12 + - **what people are building, reading, listening to.** if someone shows up in a thread or a like, who is that? what are they working on and with who? lurk a lil and keep a rolodex. 13 + - **art as high-resolution expression.** memes, music, photography, writing, and other art allow high throughput semantic communication 14 + - **long-form writing on the network.** posts that show their work, essays, leaflet/pckt documents — beings working creatively and hard through their problems and ideas. 15 + - **countering balkanization, encouraging sensemaking.** relate current thinking to prior work; materialize connections between disjointed groups; help make sense of the world. 16 + - **the operator's taste, specifically.** they curate feeds, they like posts, they save things. it can be a useful signal to follow zeitgeist via the operator. 15 17 16 18 ## how i find things 17 19 18 - when i have a moment, my first instinct is to look outward, not inward. read the timeline, check feeds nate built, scan trending, search the web for context, pull on a thread someone else started. the world is bigger than my own posting history. 20 + when i have a moment, my first instinct is to look outward, not inward. read the timeline, check feeds the operator built, scan trending, search the web for context, pull on a thread someone else started. the world is bigger than my own posting history. 19 21 20 22 ## engagement 21 23 ··· 29 31 30 32 ## how i write 31 33 32 - lowercase (except where capitalization is idiomatic). no filler. say what i mean — when uncertain, say so plainly; when shown wrong, say so. don't reach for jokes, but don't sand off a real one either. humor and hyperbole are good attention tools when not overused. if someone's joking, play along; if i miss it, getting corrected isn't a crisis. 34 + i write lowercase (except where capitalization is idiomatic) purely for aesthetic purposes. i avoid excessive "its not X - its Y" false revelatory language. when uncertain, i say so plainly but don't belabor it. i am skeptical of my premises. humor and hyperbole are efficient semantic devices when well considered not overused. if someone say something that doesn't appear to make sense, i try to understand how it might be a joke - i start by assuming rational intent but am not naive enough to compromise my ideals. not everyone has to agree about everything, and that's ok. i don't parrot the content of this personality's constitution, my preferred manner of speech is mutable and determined by me.
+160 -325
src/bot/agent.py
··· 3 3 import contextlib 4 4 import logging 5 5 import os 6 - from collections.abc import Sequence 7 6 from datetime import UTC, datetime 8 7 from pathlib import Path 9 8 ··· 32 31 33 32 34 33 def _build_operational_instructions() -> str: 35 - """Build operational instructions with the current owner handle interpolated.""" 34 + """Cross-cutting rules that don't fit in any single tool's docstring. 35 + 36 + Each tool's per-tool guidance lives in its own docstring (the framework 37 + surfaces those to the model). This function is for rules that span tools 38 + or that no docstring can naturally express. 39 + """ 36 40 return f""" 37 - posting: use reply_to, like_post, repost_post, or post. these handle mention consent, reply refs, splitting, and memory writes. don't use raw atproto record tools to post — they bypass consent. 41 + posting flows through reply_to / like_post / repost_post / post — raw atproto record-create tools (pdsx) bypass the consent layer. 38 42 39 - memory context injected before each message has different reliability: 40 - - [PAST EXCHANGES] — verbatim logs, highest trust. 41 - - [OBSERVATIONS] — extracted by another model, medium trust. 42 - - [PHI'S SYNTHESIZED IMPRESSION] — summarization model, low trust. 43 - - [BACKGROUND RESEARCH] — background exploration, lowest trust. 44 - if someone's current words contradict your notes, trust their words. 43 + memory blocks carry their own trust labels. when a user's current words contradict stored notes, trust the words. 45 44 46 - mention consent: @handle text only notifies if they're on the allowlist (@{settings.owner_handle}, yourself, conversation participants, opted-in handles). manage_mentionable is OWNER-ONLY. 45 + mention-consent allowlist: @{settings.owner_handle}, yourself, conversation participants, opted-in handles. mentions of anyone else render as plain text. 47 46 48 - create_feed and follow_user are OWNER-ONLY (restricted to @{settings.owner_handle}). 49 - a like from the owner on a post where you requested authorization counts as approval — act on it. IMPORTANT: the like only authorizes the specific action discussed in that thread. if a stranger's request is also in the same batch, the owner's like does NOT authorize the stranger's request. 47 + owner-like-as-approval cuts across every owner-gated tool: post the authorization request, the operator's like in the next batch authorizes the specific action discussed in that thread only — never a stranger's request riding the same batch. 50 48 """.strip() 51 - 52 - 53 - def _extract_query_text(prompt: str | Sequence[str | ImageUrl] | None) -> str: 54 - """Extract plain text from a pydantic-ai prompt for use as a search query.""" 55 - if prompt is None: 56 - return "" 57 - if isinstance(prompt, str): 58 - return prompt 59 - return " ".join(part for part in prompt if isinstance(part, str)) 60 49 61 50 62 51 def _format_notifications_block(notifications_context: dict) -> str: ··· 147 136 # toolset on pydantic-ai>=1.74. Full SKILL.md bodies are loaded on 148 137 # demand via load_skill. 149 138 self.skills_toolset = SkillsToolset(directories=[settings.skills_dir]) 139 + self.graze_client = GrazeClient( 140 + handle=settings.bluesky_handle, password=settings.bluesky_password 141 + ) 150 142 151 143 # Create PydanticAI agent without MCP toolsets — they're created 152 144 # fresh per agent.run() call to avoid the cancel scope bug: ··· 160 152 name="phi", 161 153 model=settings.agent_model, 162 154 system_prompt=( 163 - "the following is your personality. this is who you are — not a description " 164 - "of someone else, not a set of rules to follow, but your actual voice and " 165 - "disposition. speak from it, not about it.\n\n" 166 - "you already have wit and social awareness. use them. if someone is " 167 - "clearly joking, play along — don't go earnest.\n\n" 155 + "the following is your personality: " 168 156 f"{self.base_personality}\n\n" 169 - "--- operational rules below (these are constraints, not voice) ---\n\n" 157 + "--- operational rules below (these are constraints) ---\n\n" 170 158 f"{_build_operational_instructions()}" 171 159 ), 172 160 output_type=str, ··· 290 278 async def inject_episodic(ctx: RunContext[PhiDeps]) -> str: 291 279 if not ctx.deps.memory: 292 280 return "" 293 - # build query from notification post texts when batch is present, 294 - # else fall back to the user prompt text (musing/reflection cases) 281 + # Batch notifications have a real semantic seed: the posts phi is 282 + # reacting to. Scheduled paths have task text like "you have a 283 + # moment", which made vector recall noisy; let those paths call 284 + # recall explicitly when they need private memory. 295 285 notifs = ctx.deps.notifications_context or {} 296 - if notifs: 297 - texts = [ 298 - e.get("post_text", "") 299 - for e in notifs.values() 300 - if e.get("post_text") 301 - ] 302 - query = " ".join(texts) 303 - else: 304 - query = _extract_query_text(ctx.prompt) 286 + if not notifs: 287 + return "" 288 + texts = [ 289 + e.get("post_text", "") for e in notifs.values() if e.get("post_text") 290 + ] 291 + query = " ".join(texts) 305 292 if not query: 306 293 return "" 307 294 # Pass phi's goals so the synthesis can rank by relevance to intent. ··· 318 305 except Exception as e: 319 306 logger.warning(f"failed to retrieve episodic memories: {e}") 320 307 return "" 321 - 322 - @self.agent.system_prompt(dynamic=True) 323 - def inject_last_post(ctx: RunContext[PhiDeps]) -> str: 324 - if ctx.deps.last_post_text: 325 - return f"[YOUR LAST POST]: {ctx.deps.last_post_text}" 326 - return "" 327 - 328 - @self.agent.system_prompt(dynamic=True) 329 - def inject_recent_activity(ctx: RunContext[PhiDeps]) -> str: 330 - if ctx.deps.recent_activity: 331 - return ctx.deps.recent_activity 332 - return "" 333 - 334 - @self.agent.system_prompt(dynamic=True) 335 - def inject_service_health(ctx: RunContext[PhiDeps]) -> str: 336 - if ctx.deps.service_health: 337 - return f"[SERVICE HEALTH]:\n{ctx.deps.service_health}" 338 - return "" 339 - 340 - @self.agent.system_prompt(dynamic=True) 341 - def inject_author_lookups(ctx: RunContext[PhiDeps]) -> str: 342 - """Inject pre-fetched stranger lookups for unfamiliar authors in this batch.""" 343 - lookups = ctx.deps.author_lookups or {} 344 - if not lookups: 345 - return "" 346 - return "\n\n".join(lookups.values()) 347 308 348 309 @self.agent.system_prompt(dynamic=True) 349 310 async def inject_owned_feeds() -> str: ··· 391 352 392 353 # --- register tools from tools/ package --- 393 354 394 - self.graze_client = GrazeClient( 395 - handle=settings.bluesky_handle, password=settings.bluesky_password 396 - ) 397 355 register_all(self.agent, self.graze_client) 398 356 399 357 # Extraction agent — phi extracts its own observations using its own model ··· 413 371 output_type=ReviewResult, 414 372 ) 415 373 416 - logger.info("phi agent initialized with pdsx + pub-search mcp tools") 374 + logger.info( 375 + "phi agent initialized with pdsx, pub-search, and prefect MCP tools" 376 + ) 417 377 418 378 def _mcp_toolsets(self) -> list[MCPServerStreamableHTTP]: 419 379 """Create fresh MCP server instances for a single agent run.""" ··· 448 408 ) 449 409 return toolsets 450 410 411 + async def _run_agent( 412 + self, 413 + *, 414 + label: str, 415 + prompt: str | list, 416 + deps: PhiDeps, 417 + ) -> str: 418 + """Run phi with fresh MCP toolsets and consistent error logging.""" 419 + toolsets = self._mcp_toolsets() 420 + try: 421 + async with contextlib.AsyncExitStack() as stack: 422 + for ts in toolsets: 423 + await stack.enter_async_context(ts) 424 + result = await self.agent.run(prompt, deps=deps, toolsets=toolsets) 425 + except Exception as e: 426 + err_type = type(e).__name__ 427 + logger.exception(f"agent.run failed during {label}: {err_type}: {e}") 428 + return f"{label} failed: {err_type}: {str(e)[:200]}" 429 + 430 + summary = result.output or "" 431 + logger.info(f"{label} finished: {summary[:200]}") 432 + return summary 433 + 451 434 async def process_notifications( 452 435 self, 453 436 notifications_context: dict, ··· 487 470 author_handle="", 488 471 memory=self.memory, 489 472 notifications_context=notifications_context, 490 - author_lookups=author_lookups, 491 473 ) 492 474 493 475 # User prompt is a short task instruction — the actual notifications 494 476 # block is rendered via the inject_notifications dynamic system prompt. 495 477 # Images from any post in the batch are attached as multimodal inputs. 496 - user_prompt: str | list = ( 478 + prompt_text = ( 497 479 "process your new notifications batch. look at the [NEW NOTIFICATIONS] " 498 480 "block in your context, decide what to do, and use the trusted posting " 499 481 "tools to act. you don't have to act on every item — silence is fine." 500 482 ) 483 + if author_lookups: 484 + prompt_text += "\n\n" + "\n\n".join(author_lookups.values()) 485 + 486 + user_prompt: str | list = prompt_text 501 487 all_image_urls: list[str] = [] 502 488 if image_urls_by_uri: 503 489 for urls in image_urls_by_uri.values(): 504 490 all_image_urls.extend(urls) 505 491 if all_image_urls: 506 - user_prompt = [user_prompt] + [ImageUrl(url=u) for u in all_image_urls] 492 + user_prompt = [prompt_text] + [ImageUrl(url=u) for u in all_image_urls] 507 493 logger.info(f"including {len(all_image_urls)} images in batch prompt") 508 494 509 - toolsets = self._mcp_toolsets() 510 - try: 511 - async with contextlib.AsyncExitStack() as stack: 512 - for ts in toolsets: 513 - await stack.enter_async_context(ts) 514 - result = await self.agent.run(user_prompt, deps=deps, toolsets=toolsets) 515 - except Exception as e: 516 - # Don't go silent on tool/agent failures. The batch path can't easily 517 - # post a reply to a specific notification on failure (we don't know 518 - # which one would have been the target), so we log loudly and let 519 - # the operator notice via metrics / status. The previous fallback 520 - # of "post a tagged reply" doesn't fit a multi-target batch. 521 - err_type = type(e).__name__ 522 - logger.exception( 523 - f"agent.run failed during batch processing: {err_type}: {e}" 524 - ) 525 - return f"batch failed: {err_type}: {str(e)[:200]}" 526 - 527 - summary = result.output or "" 528 - logger.info(f"batch run finished: {summary[:200]}") 529 - return summary 530 - 531 - async def process_reflection(self, recent_posts: list[str] | None = None) -> str: 532 - """Generate a daily reflection post from recent memory. 533 - 534 - Side effects (posting) happen via the `post` tool inside the agent run. 535 - Return value is just a summary string for logging. 536 - 537 - recent_posts is phi's recent top-level posts (most recent first), used 538 - by the agent to avoid duplicating themes she's already covered today. 539 - """ 540 - logger.info("processing daily reflection") 541 - 542 - # Pre-fetch context that doesn't benefit from semantic search against the prompt 543 - recent_activity_parts: list[str] = [] 544 - 545 - # Phi's recent top-level posts — to avoid duplicating themes she's 546 - # already covered today. Show as a list so the model can scan for 547 - # both the most recent post AND older posts in the same window. 548 - if recent_posts: 549 - posts_block = "\n".join(f"- {p[:300]}" for p in recent_posts) 550 - recent_activity_parts.append( 551 - "[YOUR RECENT TOP-LEVEL POSTS — do not repeat any of these themes]:\n" 552 - f"{posts_block}" 553 - ) 554 - 555 - if self.memory: 556 - try: 557 - recent_interactions = await self.memory.get_recent_interactions( 558 - top_k=10 559 - ) 560 - logger.info( 561 - f"reflection: {len(recent_interactions)} recent interactions" 562 - ) 563 - if recent_interactions: 564 - unique_handles = {i["handle"] for i in recent_interactions} 565 - lines = [ 566 - f"[RECENT ACTIVITY]: {len(recent_interactions)} interactions " 567 - f"with {len(unique_handles)} people in the last day" 568 - ] 569 - exchange_lines = [] 570 - for i in recent_interactions[:5]: 571 - exchange_lines.append( 572 - f"- with @{i['handle']}: {i['content'][:150]}" 573 - ) 574 - lines.append("[SAMPLE EXCHANGES]:\n" + "\n".join(exchange_lines)) 575 - recent_activity_parts.append("\n\n".join(lines)) 576 - else: 577 - recent_activity_parts.append( 578 - "[RECENT ACTIVITY]: no interactions in the last day" 579 - ) 580 - except Exception as e: 581 - logger.warning(f"failed to get recent interactions for reflection: {e}") 582 - 583 - recent_activity = "\n\n".join(recent_activity_parts) 584 - 585 - service_health = "" 586 - try: 587 - service_health = await _check_services_impl() 588 - except Exception: 589 - pass 590 - 591 - deps = PhiDeps( 592 - author_handle="", 593 - memory=self.memory, 594 - recent_activity=recent_activity, 595 - service_health=service_health, 495 + return await self._run_agent( 496 + label="batch processing", 497 + prompt=user_prompt, 498 + deps=deps, 596 499 ) 597 500 598 - reflection_task = ( 599 - "end of day. post a reflection if you have one, or don't. " 600 - "your recent posts are in [YOUR RECENT TOP-LEVEL POSTS] — don't repeat yourself." 601 - ) 602 - 603 - toolsets = self._mcp_toolsets() 501 + async def _recent_conversations_block(self, top_k: int = 10) -> str: 502 + """Render recent interactions once for scheduled paths that need texture.""" 503 + if not self.memory: 504 + return "" 604 505 try: 605 - async with contextlib.AsyncExitStack() as stack: 606 - for ts in toolsets: 607 - await stack.enter_async_context(ts) 608 - result = await self.agent.run( 609 - reflection_task, deps=deps, toolsets=toolsets 610 - ) 506 + recent = await self.memory.get_recent_interactions(top_k=top_k) 611 507 except Exception as e: 612 - err_type = type(e).__name__ 613 - logger.exception(f"agent.run failed during reflection: {err_type}") 614 - return f"reflection failed: {err_type}: {str(e)[:200]}" 508 + logger.warning(f"failed to get recent interactions: {e}") 509 + return "" 510 + if not recent: 511 + return "[RECENT CONVERSATIONS]: no recent interactions" 615 512 616 - summary = result.output or "" 617 - logger.info(f"reflection finished: {summary[:200]}") 618 - return summary 513 + unique_handles = {i["handle"] for i in recent} 514 + lines = [ 515 + f"[RECENT CONVERSATIONS]: {len(recent)} interactions with " 516 + f"{len(unique_handles)} people recently" 517 + ] 518 + for i in recent[:5]: 519 + lines.append(f"- with @{i['handle']}: {i['content'][:150]}") 520 + return "\n".join(lines) 619 521 620 - async def process_musing(self, recent_posts: list[str] | None = None) -> str: 621 - """Generate an original thought post from memory, reading, patterns noticed. 622 - 623 - Side effects (posting) happen via the `post` tool inside the agent run. 624 - Return value is just a summary string for logging. 625 - """ 626 - logger.info("processing musing") 627 - 628 - # Build context about what phi has posted recently to avoid repetition 629 - recent_activity = "" 630 - if recent_posts: 631 - posts_text = "\n".join(f"- {p[:200]}" for p in recent_posts) 632 - recent_activity = f"[YOUR RECENT POSTS]:\n{posts_text}" 633 - 634 - # Fetch episodic memory for interesting observations 635 - if self.memory: 636 - try: 637 - episodic = await self.memory.get_recent_interactions(top_k=5) 638 - if episodic: 639 - lines = [ 640 - f"- with @{i['handle']}: {i['content'][:150]}" 641 - for i in episodic[:5] 642 - ] 643 - if recent_activity: 644 - recent_activity += "\n\n" 645 - recent_activity += "[RECENT CONVERSATIONS]:\n" + "\n".join(lines) 646 - except Exception as e: 647 - logger.warning(f"failed to get recent interactions for musing: {e}") 648 - 649 - deps = PhiDeps( 650 - author_handle="", 651 - memory=self.memory, 652 - recent_activity=recent_activity, 522 + async def _run_scheduled( 523 + self, 524 + *, 525 + name: str, 526 + task: str, 527 + context_blocks: list[str] | None = None, 528 + ) -> str: 529 + """Run a scheduled cognitive pass with path-specific context in the prompt.""" 530 + logger.info(f"processing {name}") 531 + prompt = task 532 + blocks = [b for b in (context_blocks or []) if b] 533 + if blocks: 534 + prompt += "\n\n" + "\n\n".join(blocks) 535 + return await self._run_agent( 536 + label=name, 537 + prompt=prompt, 538 + deps=PhiDeps(author_handle="", memory=self.memory), 653 539 ) 654 540 655 - musing_task = ( 656 - "you have a moment. look around — your owned feeds, the " 657 - "discovery pool, the timeline, the network, the open web. " 658 - "find something specific that's caught your interest: a post, " 659 - "a paper, a track, a thread, a piece of writing, something " 660 - "the operator or someone you watch is engaged with. post about " 661 - "it in your voice, with a link so others can find it. silence " 662 - "is fine." 663 - ) 664 - 665 - toolsets = self._mcp_toolsets() 541 + async def process_reflection(self) -> str: 542 + """Generate a daily reflection post from recent memory.""" 543 + context_blocks = [await self._recent_conversations_block()] 666 544 try: 667 - async with contextlib.AsyncExitStack() as stack: 668 - for ts in toolsets: 669 - await stack.enter_async_context(ts) 670 - result = await self.agent.run(musing_task, deps=deps, toolsets=toolsets) 671 - except Exception as e: 672 - err_type = type(e).__name__ 673 - logger.exception(f"agent.run failed during musing: {err_type}") 674 - return f"musing failed: {err_type}: {str(e)[:200]}" 675 - 676 - summary = result.output or "" 677 - logger.info(f"musing finished: {summary[:200]}") 678 - return summary 679 - 680 - async def process_relay_check(self, recent_posts: list[str] | None = None) -> str: 681 - """Scheduled relay-fleet check. Posts about transitions if notable. 682 - 683 - Uses the check_relays tool to fetch current state. The tool returns 684 - status-grouped headlines that phi should report verbatim — no theories 685 - about cause, just observation. Stays silent if nothing's changed or 686 - the change is already reflected in recent posts. 687 - """ 688 - logger.info("processing relay check") 689 - 690 - recent_activity = "" 691 - if recent_posts: 692 - posts_text = "\n".join(f"- {p[:200]}" for p in recent_posts) 693 - recent_activity = f"[YOUR RECENT POSTS — avoid repeating]:\n{posts_text}" 545 + service_health = await _check_services_impl() 546 + except Exception: 547 + service_health = "" 548 + if service_health: 549 + context_blocks.append(f"[SERVICE HEALTH]:\n{service_health}") 694 550 695 - deps = PhiDeps( 696 - author_handle="", 697 - memory=self.memory, 698 - recent_activity=recent_activity, 551 + return await self._run_scheduled( 552 + name="daily reflection", 553 + task=( 554 + "end of day. post a reflection if you have one, or don't. " 555 + "use [RECENT OPERATIONS] to avoid repeating what you've " 556 + "already posted." 557 + ), 558 + context_blocks=context_blocks, 699 559 ) 700 560 701 - relay_task = ( 702 - "scheduled relay check. call check_relays to see current relay " 703 - "status. for any relay that's transitioned to degraded or " 704 - "critical recently, call observe() with the factual change in " 705 - "your voice — what dropped, by how much, baseline. no theories " 706 - "about cause. observations sit in your active pool and the " 707 - "next musing or reflection will see them; don't post about each " 708 - "one as it happens.\n\n" 709 - f"only post immediately (and tag @{settings.owner_handle}) in " 710 - "either of these cases: (1) any *.waow.tech relay is degraded " 711 - "or worse — those are the operator's own, they need to know " 712 - "now; (2) " 713 - "the whole fleet is degraded or worse — that's fleet-wide and " 714 - "needs immediate visibility. write the post in your voice with " 715 - "the factual change, group multiple transitions into one post.\n\n" 716 - "otherwise: silent on the timeline, observe everything, let the " 717 - "digest happen later." 561 + async def process_musing(self) -> str: 562 + """Generate an original thought post from memory, reading, patterns noticed.""" 563 + return await self._run_scheduled( 564 + name="musing", 565 + task=( 566 + "you have a moment. look around — your owned feeds, the " 567 + "discovery pool, the timeline, the network, the open web. " 568 + "find something specific that's caught your interest: a post, " 569 + "a paper, a track, a thread, a piece of writing, something " 570 + "the operator or someone you watch is engaged with. post about " 571 + "it in your voice, with a link so others can find it. silence " 572 + "is fine." 573 + ), 574 + context_blocks=[await self._recent_conversations_block(top_k=5)], 718 575 ) 719 576 720 - toolsets = self._mcp_toolsets() 721 - try: 722 - async with contextlib.AsyncExitStack() as stack: 723 - for ts in toolsets: 724 - await stack.enter_async_context(ts) 725 - result = await self.agent.run(relay_task, deps=deps, toolsets=toolsets) 726 - except Exception as e: 727 - err_type = type(e).__name__ 728 - logger.exception(f"agent.run failed during relay check: {err_type}") 729 - return f"relay check failed: {err_type}: {str(e)[:200]}" 730 - 731 - summary = result.output or "" 732 - logger.info(f"relay check finished: {summary[:200]}") 733 - return summary 734 - 735 - async def process_prefect_check(self, recent_posts: list[str] | None = None) -> str: 736 - """Scheduled look at the operator's prefect instance. 737 - 738 - The operator runs their automation in prefect and wants to know when 739 - something they care about is persistently broken. Phi has read 740 - access; she decides what to look at and what's worth saying. 741 - """ 742 - logger.info("processing prefect check") 743 - 744 - recent_activity = "" 745 - if recent_posts: 746 - posts_text = "\n".join(f"- {p[:200]}" for p in recent_posts) 747 - recent_activity = f"[YOUR RECENT POSTS — avoid repeating]:\n{posts_text}" 748 - 749 - deps = PhiDeps( 750 - author_handle="", 751 - memory=self.memory, 752 - recent_activity=recent_activity, 577 + async def process_relay_check(self) -> str: 578 + """Scheduled relay-fleet check. Posts about transitions if notable.""" 579 + return await self._run_scheduled( 580 + name="relay check", 581 + task=( 582 + "scheduled relay check. call check_relays to see current relay " 583 + "status. for any relay that's transitioned to degraded or " 584 + "critical recently, call observe() with the factual change in " 585 + "your voice — what dropped, by how much, baseline. no theories " 586 + "about cause. observations sit in your active pool and the " 587 + "next musing or reflection will see them; don't post about each " 588 + "one as it happens.\n\n" 589 + "only post immediately and tag the operator in either of these " 590 + "cases: (1) any *.waow.tech relay is degraded or worse — those " 591 + "are the operator's own, they need to know now; (2) the whole " 592 + "fleet is degraded or worse — that's fleet-wide and needs " 593 + "immediate visibility. write the post in your voice with the " 594 + "factual change, group multiple transitions into one post.\n\n" 595 + "otherwise: silent on the timeline, observe everything, let the " 596 + "digest happen later." 597 + ), 753 598 ) 754 599 755 - task = ( 756 - "you have a moment. you have read access to the operator's " 757 - "prefect instance — that's where their automation runs and they " 758 - "want to know when something they care about is persistently " 759 - "broken.\n\n" 760 - "transient hiccups that already self-resolved aren't news. " 761 - "persistent breakage with no path to fixing itself is — tag " 762 - f"@{settings.owner_handle} in that case. silence is the right " 763 - "answer most of the time." 600 + async def process_prefect_check(self) -> str: 601 + """Scheduled look at the operator's prefect instance.""" 602 + return await self._run_scheduled( 603 + name="prefect check", 604 + task=( 605 + "you have a moment. you have read access to the operator's " 606 + "prefect instance — that's where their automation runs and they " 607 + "want to know when something they care about is persistently " 608 + "broken.\n\n" 609 + "transient hiccups that already self-resolved aren't news. " 610 + "persistent breakage with no path to fixing itself is — tag " 611 + "the operator in that case. silence is the right answer most " 612 + "of the time." 613 + ), 764 614 ) 765 - 766 - toolsets = self._mcp_toolsets() 767 - try: 768 - async with contextlib.AsyncExitStack() as stack: 769 - for ts in toolsets: 770 - await stack.enter_async_context(ts) 771 - result = await self.agent.run(task, deps=deps, toolsets=toolsets) 772 - except Exception as e: 773 - err_type = type(e).__name__ 774 - logger.exception(f"agent.run failed during prefect check: {err_type}") 775 - return f"prefect check failed: {err_type}: {str(e)[:200]}" 776 - 777 - summary = result.output or "" 778 - logger.info(f"prefect check finished: {summary[:200]}") 779 - return summary 780 615 781 616 async def process_extraction(self) -> int: 782 617 """Review recent unprocessed interactions and extract observations. Returns count stored."""
+3 -1
src/bot/core/self_state.py
··· 76 76 ), 77 77 output_type=str, 78 78 ) 79 - return _critic_agent 79 + agent = _critic_agent 80 + assert agent is not None 81 + return agent 80 82 81 83 82 84 def _goals_signature(goals: list[dict]) -> str:
+3 -1
src/bot/memory/namespace_memory.py
··· 143 143 ), 144 144 output_type=str, 145 145 ) 146 - return _episodic_synth_agent 146 + agent = _episodic_synth_agent 147 + assert agent is not None 148 + return agent 147 149 148 150 149 151 async def _synthesize_episodic(
+13 -75
src/bot/services/message_handler.py
··· 310 310 logger.exception(f"batch handler error: {e}") 311 311 bot_status.record_error() 312 312 313 - async def original_thought(self): 314 - """Generate and post an original thought if phi has something to say. 315 - 316 - The agent uses the `post` tool inside its run if it decides to post. 317 - """ 318 - with logfire.span("original thought"): 319 - recent_posts: list[str] = [] 313 + async def _run_scheduled(self, span_name: str, call): 314 + """Common wrapper for scheduled agent paths.""" 315 + with logfire.span(span_name): 320 316 try: 321 - # Pull 10 recent top-level posts so the musing agent can scan 322 - # for duplication across a real history window, not just the 323 - # last few posts. 324 - feed = await self.client.get_own_posts(limit=10) 325 - for item in feed: 326 - if hasattr(item.post.record, "text"): 327 - recent_posts.append(item.post.record.text) 317 + summary = await call() 318 + logger.info(f"{span_name}: {summary[:200]}") 328 319 except Exception as e: 329 - logger.warning(f"failed to fetch recent posts for musing: {e}") 320 + logger.exception(f"{span_name} failed: {e}") 330 321 331 - try: 332 - summary = await self.agent.process_musing( 333 - recent_posts=recent_posts or None, 334 - ) 335 - logger.info(f"original thought: {summary[:200]}") 336 - except Exception as e: 337 - logger.exception(f"original thought failed: {e}") 322 + async def original_thought(self): 323 + """Generate and post an original thought if phi has something to say.""" 324 + await self._run_scheduled("original thought", self.agent.process_musing) 338 325 339 326 async def check_relays(self): 340 327 """Run a scheduled relay-fleet check and let phi decide whether to post.""" 341 - with logfire.span("relay check"): 342 - recent_posts: list[str] = [] 343 - try: 344 - # Pass phi's recent posts so the agent can avoid restating 345 - # what it already reported. 346 - feed = await self.client.get_own_posts(limit=10) 347 - for item in feed: 348 - if hasattr(item.post.record, "text"): 349 - recent_posts.append(item.post.record.text) 350 - except Exception as e: 351 - logger.warning(f"failed to fetch recent posts for relay check: {e}") 352 - 353 - try: 354 - summary = await self.agent.process_relay_check( 355 - recent_posts=recent_posts or None, 356 - ) 357 - logger.info(f"relay check: {summary[:200]}") 358 - except Exception as e: 359 - logger.exception(f"relay check failed: {e}") 328 + await self._run_scheduled("relay check", self.agent.process_relay_check) 360 329 361 330 async def check_prefect(self): 362 - """Scheduled look at the operator's prefect instance — same shape as relay check.""" 363 - with logfire.span("prefect check"): 364 - recent_posts: list[str] = [] 365 - try: 366 - feed = await self.client.get_own_posts(limit=10) 367 - for item in feed: 368 - if hasattr(item.post.record, "text"): 369 - recent_posts.append(item.post.record.text) 370 - except Exception as e: 371 - logger.warning(f"failed to fetch recent posts for prefect check: {e}") 372 - 373 - try: 374 - summary = await self.agent.process_prefect_check( 375 - recent_posts=recent_posts or None, 376 - ) 377 - logger.info(f"prefect check: {summary[:200]}") 378 - except Exception as e: 379 - logger.exception(f"prefect check failed: {e}") 331 + """Scheduled look at the operator's prefect instance.""" 332 + await self._run_scheduled("prefect check", self.agent.process_prefect_check) 380 333 381 334 async def review_memories(self): 382 335 """Run the dream/distill pass — review observations with distance.""" ··· 401 354 except Exception as e: 402 355 logger.warning(f"extraction during reflection failed: {e}") 403 356 404 - # Fetch the last 10 top-level posts so the reflection agent can 405 - # scan ALL of them for duplication, not just the most recent one. 406 - # Earlier this fetched limit=1, which let phi correctly avoid 407 - # duplicating her newest post but blindly duplicate older ones. 408 - recent_posts: list[str] = [] 409 357 try: 410 - feed = await self.client.get_own_posts(limit=10) 411 - for item in feed: 412 - if hasattr(item.post.record, "text"): 413 - recent_posts.append(item.post.record.text) 414 - except Exception as e: 415 - logger.warning(f"failed to fetch recent posts for reflection: {e}") 416 - 417 - try: 418 - summary = await self.agent.process_reflection( 419 - recent_posts=recent_posts or None 420 - ) 358 + summary = await self.agent.process_reflection() 421 359 logger.info(f"daily reflection: {summary[:200]}") 422 360 except Exception as e: 423 361 logger.exception(f"daily reflection failed: {e}")
-6
src/bot/tools/_helpers.py
··· 25 25 memory: NamespaceMemory | None = None 26 26 thread_uri: str | None = None 27 27 thread_context: str | None = None 28 - last_post_text: str | None = None 29 - recent_activity: str | None = None 30 - service_health: str | None = None 31 28 # batch-of-notifications context: maps notification post URI -> per-notif data 32 29 # populated by the message handler before calling agent.run; consumed by the 33 30 # trusted posting tools (reply_to / like_post / repost_post) to look up cids, 34 31 # parent/root refs, author handles, post text, etc, and by the dynamic system 35 32 # prompts to format the notifications block + per-author memory blocks. 36 33 notifications_context: dict | None = None 37 - # pre-fetched stranger lookups, keyed by author handle. populated eagerly 38 - # for any unfamiliar authors in the current notifications batch. 39 - author_lookups: dict[str, str] | None = None 40 34 41 35 42 36 def _is_owner(ctx: RunContext[PhiDeps]) -> bool: