a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

phi monitors prefect flow runs — same shape as relay checks

phi now watches the prefect-server via the prefect MCP on an hourly cadence
and notifies nate when a flow fails persistently. closes the loop that
made the last 6 days of broken ingest silent: a single failure wouldn't
have triggered a notification, but the second one would have.

architecture (mirrors relay watching):
- prefect MCP: https://prefect-by-zzstoatzz.fastmcp.app/mcp, same per-request
header auth pattern pdsx uses (x-prefect-api-url +
x-prefect-api-auth-string)
- tool_prefix="prefect" so tools become prefect_get_flow_runs,
prefect_get_flow_run_logs, etc — 13 tools from the MCP
- scheduled entry point process_flow_check, cadence ~1h (more time-
sensitive than relay's 3h)
- de-dup via the active observations pool: first failure → silent
observe(); same flow failing again → post + tag nate. one-off blips
don't wake anyone up; persistent problems do.

files:
- config: prefect_mcp_url, prefect_api_url, prefect_api_auth_string (from
fly secret), flow_check_interval_polls=360 (~1h at 10s poll interval)
- agent.py _mcp_toolsets: appends prefect MCP when auth is configured
(graceful degrade for dev/local without the secret)
- agent.py process_flow_check: new entry point, task prompt tells phi
to check [ACTIVE OBSERVATIONS] first and escalate on repeat
- message_handler.check_flows: thin wrapper that passes recent posts
- notification_poller: _polls_since_last_flow_check counter, _should_
check_flows / _maybe_check_flows gate (scheduled independently from
relay checks)

fly secret PREFECT_API_AUTH_STRING already set, pulled from the k8s
prefect-auth secret so both consumers share auth. 102 tests pass.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

+148 -2
+72 -1
src/bot/agent.py
··· 395 395 396 396 def _mcp_toolsets(self) -> list[MCPServerStreamableHTTP]: 397 397 """Create fresh MCP server instances for a single agent run.""" 398 - return [ 398 + toolsets: list[MCPServerStreamableHTTP] = [ 399 399 MCPServerStreamableHTTP( 400 400 url="https://pdsx-by-zzstoatzz.fastmcp.app/mcp", 401 401 timeout=30, ··· 410 410 tool_prefix="pub", 411 411 ), 412 412 ] 413 + # Prefect MCP — only included when auth is configured, so phi degrades 414 + # gracefully in dev/local without the secret set. 415 + if settings.prefect_api_auth_string: 416 + toolsets.append( 417 + MCPServerStreamableHTTP( 418 + url=settings.prefect_mcp_url, 419 + timeout=30, 420 + tool_prefix="prefect", 421 + headers={ 422 + "x-prefect-api-url": settings.prefect_api_url, 423 + "x-prefect-api-auth-string": settings.prefect_api_auth_string, 424 + }, 425 + ) 426 + ) 427 + return toolsets 413 428 414 429 async def process_notifications( 415 430 self, ··· 690 705 691 706 summary = result.output or "" 692 707 logger.info(f"relay check finished: {summary[:200]}") 708 + return summary 709 + 710 + async def process_flow_check(self, recent_posts: list[str] | None = None) -> str: 711 + """Scheduled prefect flow check. Same pattern as relay check. 712 + 713 + Uses the prefect MCP tools to see recent flow run states. Observes 714 + failures into the active attention pool on first sight; only posts + 715 + tags @owner when a given flow has failed repeatedly. The active- 716 + observations block in phi's prompt naturally does the de-dup: first 717 + failure = silently noticed; same flow failing again = persistent 718 + problem worth raising. 719 + """ 720 + logger.info("processing flow check") 721 + 722 + recent_activity = "" 723 + if recent_posts: 724 + posts_text = "\n".join(f"- {p[:200]}" for p in recent_posts) 725 + recent_activity = f"[YOUR RECENT POSTS — avoid repeating]:\n{posts_text}" 726 + 727 + deps = PhiDeps( 728 + author_handle="", 729 + memory=self.memory, 730 + recent_activity=recent_activity, 731 + ) 732 + 733 + flow_task = ( 734 + "scheduled prefect flow check. call prefect_get_flow_runs with " 735 + "a filter for state.type in [FAILED, CRASHED] and start_time " 736 + "after the last hour or so, to see what's broken recently.\n\n" 737 + "for each failed flow you see, check your [ACTIVE OBSERVATIONS] " 738 + "block first. if you already have an active observation for the " 739 + "same flow failing, that means it's failed at least twice in a " 740 + "row — post in your voice with the flow name, failure count, " 741 + f"and a short snippet of the last error, and tag @{settings.owner_handle}. " 742 + "pull the error with prefect_get_flow_run_logs if you need it.\n\n" 743 + "if the failure is NEW (not already in your active observations), " 744 + "just call observe() with the flow name + state + the error " 745 + "message. don't post yet — one-off failures happen. the next " 746 + "flow check will see it in your active pool and know to escalate " 747 + "if it happens again.\n\n" 748 + "if nothing's failing, silent is fine. nothing to do." 749 + ) 750 + 751 + toolsets = self._mcp_toolsets() 752 + try: 753 + async with contextlib.AsyncExitStack() as stack: 754 + for ts in toolsets: 755 + await stack.enter_async_context(ts) 756 + result = await self.agent.run(flow_task, deps=deps, toolsets=toolsets) 757 + except Exception as e: 758 + err_type = type(e).__name__ 759 + logger.exception(f"agent.run failed during flow check: {err_type}") 760 + return f"flow check failed: {err_type}: {str(e)[:200]}" 761 + 762 + summary = result.output or "" 763 + logger.info(f"flow check finished: {summary[:200]}") 693 764 return summary 694 765 695 766 async def process_extraction(self) -> int:
+23
src/bot/config.py
··· 145 145 description="Min polls between scheduled relay checks (~3h at default poll interval)", 146 146 ) 147 147 148 + # Prefect flow monitoring — phi polls the prefect-server via the prefect 149 + # MCP to notice failed/crashed flows (ingest, brief, compact, etc.) and 150 + # flag persistent failures to the operator. Same pattern as relays. 151 + prefect_mcp_url: str = Field( 152 + default="https://prefect-by-zzstoatzz.fastmcp.app/mcp", 153 + description="URL of the prefect MCP server (fastmcp.app deployment)", 154 + ) 155 + prefect_api_url: str = Field( 156 + default="https://prefect-server.waow.tech/api", 157 + description="Prefect OSS API URL (passed to MCP via x-prefect-api-url header)", 158 + ) 159 + prefect_api_auth_string: str | None = Field( 160 + default=None, 161 + description=( 162 + "Basic auth string 'user:pass' for prefect OSS. Passed to MCP via " 163 + "x-prefect-api-auth-string header. Set via fly secret." 164 + ), 165 + ) 166 + flow_check_interval_polls: int = Field( 167 + default=360, # 360 polls * 10s = 3600s = 1h 168 + description="Min polls between scheduled prefect flow checks (~1h)", 169 + ) 170 + 148 171 # Discovery pool — generic agents endpoint serving authors the operator 149 172 # has been liking. Currently lives on hub.waow.tech as part of the 150 173 # prefect-server side; consumers (phi here) read it as opaque JSON.
+20
src/bot/services/message_handler.py
··· 358 358 except Exception as e: 359 359 logger.exception(f"relay check failed: {e}") 360 360 361 + async def check_flows(self): 362 + """Run a scheduled prefect flow check — same shape as relay check.""" 363 + with logfire.span("flow check"): 364 + recent_posts: list[str] = [] 365 + try: 366 + feed = await self.client.get_own_posts(limit=10) 367 + for item in feed: 368 + if hasattr(item.post.record, "text"): 369 + recent_posts.append(item.post.record.text) 370 + except Exception as e: 371 + logger.warning(f"failed to fetch recent posts for flow check: {e}") 372 + 373 + try: 374 + summary = await self.agent.process_flow_check( 375 + recent_posts=recent_posts or None, 376 + ) 377 + logger.info(f"flow check: {summary[:200]}") 378 + except Exception as e: 379 + logger.exception(f"flow check failed: {e}") 380 + 361 381 async def review_memories(self): 362 382 """Run the dream/distill pass — review observations with distance.""" 363 383 with logfire.span("memory review"):
+33 -1
src/bot/services/notification_poller.py
··· 32 32 self._last_thought_date: date | None = None 33 33 self._semaphore = asyncio.Semaphore(MAX_CONCURRENT) 34 34 self._background_tasks: set[asyncio.Task] = set() 35 - # scheduled monitor check state 35 + # scheduled monitor check state (relay + prefect flows are 36 + # independently scheduled — different cadences, different sources) 36 37 self._polls_since_last_monitor_check: int = 0 38 + self._polls_since_last_flow_check: int = 0 37 39 38 40 async def start(self) -> asyncio.Task: 39 41 """Start polling for notifications.""" ··· 124 126 125 127 while self._running: 126 128 self._polls_since_last_monitor_check += 1 129 + self._polls_since_last_flow_check += 1 127 130 128 131 try: 129 132 await self._check_notifications() ··· 156 159 task.add_done_callback(self._background_tasks.discard) 157 160 except Exception as e: 158 161 logger.error(f"monitor check error: {e}", exc_info=settings.debug) 162 + 163 + try: 164 + if self._should_check_flows(): 165 + task = asyncio.create_task(self._maybe_check_flows()) 166 + self._background_tasks.add(task) 167 + task.add_done_callback(self._background_tasks.discard) 168 + except Exception as e: 169 + logger.error(f"flow check error: {e}", exc_info=settings.debug) 159 170 160 171 try: 161 172 await asyncio.sleep(settings.notification_poll_interval) ··· 310 321 await self.handler.check_relays() 311 322 except Exception as e: 312 323 logger.error(f"monitor check error: {e}", exc_info=settings.debug) 324 + 325 + # --- scheduled prefect flow check --- 326 + 327 + def _should_check_flows(self) -> bool: 328 + """Check if it's time for a scheduled prefect flow check.""" 329 + if bot_status.paused: 330 + return False 331 + if not settings.prefect_api_auth_string: 332 + return False # no creds, no check 333 + if self._polls_since_last_flow_check < settings.flow_check_interval_polls: 334 + return False 335 + return True 336 + 337 + async def _maybe_check_flows(self): 338 + """Run a scheduled prefect flow check.""" 339 + self._polls_since_last_flow_check = 0 340 + logger.info("triggering flow check") 341 + try: 342 + await self.handler.check_flows() 343 + except Exception as e: 344 + logger.error(f"flow check error: {e}", exc_info=settings.debug)