a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

synthesize [WORKFLOW STATE] for prefect check; hoist cargo TYPE_CHECKING imports

- New core/workflow_state.py mirrors the episodic-synth pattern: pull
recent flow runs + stuck candidates from prefect REST, run a haiku
pass anchored by [NOW], and inject a per-deployment health summary
([WORKFLOW STATE]) into process_prefect_check. Phi no longer has to
aggregate timestamps in her head — the synth has already done it,
with current state defined relative to NOW.
- Two queries: recent activity (limit 100, COMPLETED/FAILED/etc) and
stuck candidates (PENDING/RUNNING with expected_start more than 1h
ago). The second catches things like a flow run stuck in Submitting
for 40 days that fall off any sane recent-activity window.
- Block named [WORKFLOW STATE] (not [PREFECT STATE]) — the workflow
tool is fungible, the surface phi sees should reflect that.
- 5min cache, same shape as the other dynamic state blocks.

While here: hoisted three TYPE_CHECKING-guarded imports of NamespaceMemory
(observations.py, discovery_pool.py, self_state.py) to direct imports.
bot.memory doesn't import from bot.core (no circular risk) and bot/agent.py
already imports from bot.memory unguarded, so the SDK weight is loaded
either way. The guards were cargo. from __future__ import annotations
removed from the same files where it had no other use.

+244 -27
+13 -9
src/bot/agent.py
··· 20 20 from bot.core.owned_feeds import get_owned_feeds_block 21 21 from bot.core.recent_operations import get_operations_block 22 22 from bot.core.self_state import get_state_block 23 + from bot.core.workflow_state import get_workflow_state_block 23 24 from bot.memory.extraction import EXTRACTION_SYSTEM_PROMPT, ExtractionResult 24 25 from bot.memory.namespace_memory import InteractionRow 25 26 from bot.memory.review import REVIEW_SYSTEM_PROMPT, ReviewResult ··· 598 599 ) 599 600 600 601 async def process_prefect_check(self) -> str: 601 - """Scheduled look at the operator's prefect instance.""" 602 + """Scheduled look at the operator's workflow automation. 603 + 604 + [WORKFLOW STATE] is pre-synthesized from raw run history so phi 605 + starts from a correct per-deployment health summary anchored to 606 + [NOW]. She can still call prefect_* tools for detail. 607 + """ 608 + workflow_block = await get_workflow_state_block() 602 609 return await self._run_scheduled( 603 610 name="prefect check", 604 611 task=( 605 - "you have a moment. you have read access to the operator's " 606 - "prefect instance — that's where their automation runs and they " 607 - "want to know when something they care about is persistently " 608 - "broken.\n\n" 609 - "transient hiccups that already self-resolved aren't news. " 610 - "persistent breakage with no path to fixing itself is — tag " 611 - "the operator in that case. silence is the right answer most " 612 - "of the time." 612 + "review [WORKFLOW STATE]. anything currently broken or stuck " 613 + "with no path to fixing itself? tag the operator in that " 614 + "case. for detail you can call the prefect_* tools. silence " 615 + "is the right answer most of the time." 613 616 ), 617 + context_blocks=[workflow_block] if workflow_block else None, 614 618 ) 615 619 616 620 async def process_extraction(self) -> int:
+2 -6
src/bot/core/discovery_pool.py
··· 10 10 from fetch+filter so a future templating swap only touches `_render`. 11 11 """ 12 12 13 - from __future__ import annotations 14 - 15 13 import logging 16 14 import time 17 - from typing import TYPE_CHECKING, TypedDict 15 + from typing import TypedDict 18 16 19 17 import httpx 20 18 21 19 from bot.config import settings 22 - 23 - if TYPE_CHECKING: 24 - from bot.memory import NamespaceMemory 20 + from bot.memory import NamespaceMemory 25 21 26 22 logger = logging.getLogger("bot.discovery_pool") 27 23
+2 -6
src/bot/core/observations.py
··· 18 18 this, what might compose with it, why it's worth attention. 19 19 """ 20 20 21 - from __future__ import annotations 22 - 23 21 import logging 24 22 from datetime import UTC, datetime 25 - from typing import TYPE_CHECKING, TypedDict 23 + from typing import TypedDict 26 24 27 25 from bot.core.atproto_client import BotClient 28 - 29 - if TYPE_CHECKING: 30 - from bot.memory import NamespaceMemory 26 + from bot.memory import NamespaceMemory 31 27 32 28 logger = logging.getLogger("bot.observations") 33 29
+1 -6
src/bot/core/self_state.py
··· 13 13 don't hammer PDS. 14 14 """ 15 15 16 - from __future__ import annotations 17 - 18 16 import logging 19 17 import time 20 - from typing import TYPE_CHECKING 21 18 22 19 from pydantic_ai import Agent 23 20 24 21 from bot.config import settings 25 22 from bot.core.atproto_client import BotClient 26 23 from bot.core.goals import list_goals as list_goal_records 24 + from bot.memory import NamespaceMemory 27 25 from bot.utils.time import relative_when 28 - 29 - if TYPE_CHECKING: 30 - from bot.memory import NamespaceMemory 31 26 32 27 logger = logging.getLogger("bot.self_state") 33 28
+226
src/bot/core/workflow_state.py
··· 1 + """[WORKFLOW STATE] — synthesized current state of the operator's workflow automation. 2 + 3 + Phi has access to raw flow run history via MCP, but reasoning about 4 + temporal currency from a 30-row table was inconsistent — sometimes she 5 + correctly identified resolved chains, sometimes she pattern-matched on 6 + "long failure history = persistent problem" and re-flagged things that 7 + had self-resolved hours ago. 8 + 9 + This pre-fetches recent flow runs + deployments, runs them through a 10 + small synth agent anchored by [NOW], and returns one line per 11 + deployment: its current health, grounded in timestamps relative to now. 12 + The synth does the temporal aggregation so phi doesn't have to. 13 + 14 + Mirrors the [RELEVANT MEMORIES] pattern: raw retrieval → small synth → 15 + coherent block. Phi can still call the prefect_* tools for detail; this 16 + gives her a correct starting picture. 17 + 18 + The naming is deliberately abstract — the workflow tool happens to be 19 + prefect today; tomorrow it could be anything else with the same surface. 20 + """ 21 + 22 + import logging 23 + import time 24 + from datetime import UTC, datetime, timedelta 25 + from typing import Any 26 + 27 + import httpx 28 + from pydantic_ai import Agent 29 + 30 + from bot.config import settings 31 + 32 + logger = logging.getLogger("bot.workflow_state") 33 + 34 + _TTL_SECONDS = 300 # 5min 35 + _cache: dict[str, Any] = {"text": "", "fetched_at": 0.0} 36 + 37 + _synth_agent: Agent | None = None 38 + 39 + 40 + def _get_synth_agent() -> Agent: 41 + global _synth_agent 42 + if _synth_agent is None: 43 + _synth_agent = Agent[None, str]( 44 + name="phi-workflow-synth", 45 + model=settings.extraction_model, 46 + system_prompt=( 47 + "You're synthesizing the current state of the operator's " 48 + "workflow automation for phi to read. You'll see [NOW] and " 49 + "the recent flow runs grouped by deployment.\n\n" 50 + "For each deployment with activity in the data, output one " 51 + "line:\n" 52 + " - <deployment-name>: <healthy|broken|stuck|degraded>. " 53 + "<one short clause grounding it in actual timestamps vs NOW>\n\n" 54 + "Definitions, anchored by NOW:\n" 55 + "- healthy: the most recent run for this deployment " 56 + "completed successfully. earlier failures, if any, are " 57 + "historical.\n" 58 + "- broken: the most recent run failed AND no later run has " 59 + "succeeded. currently unresolved.\n" 60 + "- stuck: a run has been Pending/Submitting/Running far " 61 + "longer than the deployment's typical duration.\n" 62 + "- degraded: a meaningful fraction of recent runs are " 63 + "failing while others succeed.\n\n" 64 + "Resolved incidents are not current state. Don't surface " 65 + "them unless they happened in the last hour. When you cite " 66 + 'time, cite it relative to NOW ("resolved 30h ago", ' 67 + '"failing for 5d") rather than absolute dates.\n\n' 68 + "Plain ASCII, lowercase, terse. No headers, no preamble — " 69 + "just the per-deployment lines." 70 + ), 71 + output_type=str, 72 + ) 73 + agent = _synth_agent 74 + assert agent is not None 75 + return agent 76 + 77 + 78 + def _basic_auth() -> tuple[str, str] | None: 79 + """Parse PREFECT_API_AUTH_STRING into (user, pass) for httpx basic auth.""" 80 + raw = settings.prefect_api_auth_string 81 + if not raw or ":" not in raw: 82 + return None 83 + user, _, pwd = raw.partition(":") 84 + return user, pwd 85 + 86 + 87 + async def _fetch_raw() -> dict[str, Any] | None: 88 + """Pull recent flow runs + deployments from the prefect REST API.""" 89 + auth = _basic_auth() 90 + if not auth: 91 + return None 92 + base = settings.prefect_api_url.rstrip("/") 93 + 94 + # Stuck candidates: PENDING/RUNNING runs whose expected start was 95 + # more than an hour ago. They may have been started days ago and 96 + # would fall out of the recent-activity window. The 1h floor avoids 97 + # flagging legitimately-running short jobs. 98 + stuck_cutoff = ( 99 + (datetime.now(UTC) - timedelta(hours=1)) 100 + .replace(microsecond=0) 101 + .isoformat() 102 + .replace("+00:00", "Z") 103 + ) 104 + 105 + async with httpx.AsyncClient(timeout=15, auth=auth) as client: 106 + try: 107 + # Past-activity states only — SCHEDULED rows are future-pending 108 + # placeholders that drown out the actual signal. 109 + runs_resp = await client.post( 110 + f"{base}/flow_runs/filter", 111 + json={ 112 + "limit": 100, 113 + "sort": "START_TIME_DESC", 114 + "flow_runs": { 115 + "state": { 116 + "type": { 117 + "any_": [ 118 + "COMPLETED", 119 + "FAILED", 120 + "CRASHED", 121 + "RUNNING", 122 + "CANCELLED", 123 + ] 124 + } 125 + } 126 + }, 127 + }, 128 + ) 129 + runs_resp.raise_for_status() 130 + runs = runs_resp.json() 131 + except Exception as e: 132 + logger.debug(f"workflow_state: failed to fetch runs: {e}") 133 + return None 134 + 135 + try: 136 + stuck_resp = await client.post( 137 + f"{base}/flow_runs/filter", 138 + json={ 139 + "limit": 20, 140 + "sort": "START_TIME_ASC", 141 + "flow_runs": { 142 + "state": {"type": {"any_": ["PENDING", "RUNNING"]}}, 143 + "expected_start_time": {"before_": stuck_cutoff}, 144 + }, 145 + }, 146 + ) 147 + stuck_resp.raise_for_status() 148 + stuck = stuck_resp.json() 149 + except Exception as e: 150 + logger.debug(f"workflow_state: failed to fetch stuck candidates: {e}") 151 + stuck = [] 152 + 153 + try: 154 + deps_resp = await client.post( 155 + f"{base}/deployments/filter", 156 + json={"limit": 100}, 157 + ) 158 + deps_resp.raise_for_status() 159 + deployments = deps_resp.json() 160 + except Exception as e: 161 + logger.debug(f"workflow_state: failed to fetch deployments: {e}") 162 + deployments = [] 163 + 164 + return {"runs": runs, "stuck": stuck, "deployments": deployments} 165 + 166 + 167 + async def get_workflow_state_block() -> str: 168 + """Compose [WORKFLOW STATE] — per-deployment health, anchored by NOW.""" 169 + now = time.time() 170 + if _cache["text"] and now - _cache["fetched_at"] < _TTL_SECONDS: 171 + return _cache["text"] 172 + 173 + raw = await _fetch_raw() 174 + if not raw: 175 + return "" 176 + 177 + runs = raw.get("runs") or [] 178 + stuck = raw.get("stuck") or [] 179 + deployments = raw.get("deployments") or [] 180 + if not runs and not stuck: 181 + return "" 182 + 183 + dep_names = {d["id"]: d.get("name", "?") for d in deployments} 184 + now_iso = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") 185 + 186 + def _line(r: dict) -> str: 187 + dep = dep_names.get(r.get("deployment_id"), "<no-deployment>") 188 + state_type = r.get("state_type") or r.get("state", {}).get("type", "?") 189 + state_name = r.get("state_name") or r.get("state", {}).get("name", "") 190 + state = f"{state_type}/{state_name}" if state_name else state_type 191 + name = r.get("name", "?") 192 + start = r.get("start_time") or r.get("expected_start_time", "") 193 + end = r.get("end_time", "") 194 + return f"- {dep} | run={name} | state={state} | start={start} | end={end}" 195 + 196 + sections = [f"[NOW]: {now_iso}"] 197 + if runs: 198 + sections.append( 199 + "recent flow runs (most recent first, max 100):\n" 200 + + "\n".join(_line(r) for r in runs) 201 + ) 202 + if stuck: 203 + sections.append( 204 + "stuck candidates (PENDING/RUNNING with expected_start more than now):\n" 205 + + "\n".join(_line(r) for r in stuck) 206 + ) 207 + payload = "\n\n".join(sections) 208 + 209 + try: 210 + result = await _get_synth_agent().run(payload) 211 + text = (result.output or "").strip() 212 + except Exception as e: 213 + logger.warning(f"workflow state synth failed: {e}") 214 + return "" 215 + 216 + if not text: 217 + return "" 218 + 219 + block = ( 220 + "[WORKFLOW STATE — synthesized current health of the operator's " 221 + f"workflow automation, refreshed every {_TTL_SECONDS // 60}min, " 222 + f"anchored by [NOW]. for detail call the prefect_* tools.]\n{text}" 223 + ) 224 + _cache["text"] = block 225 + _cache["fetched_at"] = now 226 + return block