a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add active observations — phi's small attention pool, sits next to GOALS

phi has needed a substrate for "things noticed but not yet acted on" —
relay transitions, thread directions phi wants to come back to, patterns
across posts. before this, the only options were post-immediately (the
relay-check pattern, which produced robotic top-level alerts) or write to
episodic memory (which is global and gets buried under everything else).

active observations:
- bounded attention pool, max 5
- stored as PDS records under io.zzstoatzz.phi.observation (parallel to
io.zzstoatzz.phi.goal)
- shown in every prompt as [ACTIVE OBSERVATIONS] block, sits next to
[GOALS]
- mutated via observe(content, reasoning) and drop_observation(rkey, reason)
- when count exceeds 5, oldest auto-archives with reason "aged out"
- archive lives in turbopuffer namespace phi-observations — embeddings +
content + reasoning + archival_reason, searchable later via tools
(future) but not in the prompt

relay-check task rewires:
- before: "post the headline verbatim" for any transition → robotic alerts
- after: observe() each transition; only post immediately if waow.tech or
fleet-wide (the actual high-signal cases), in phi's voice, grouped if
multiple. otherwise silent on timeline; the next musing or reflection
will see the active observations and decide whether to surface them.

generalizes: any future scheduled noticer (feed-watcher, follow-graph
monitor, publication-watcher) feeds the same pool. the digest happens
naturally because observations are visible in every subsequent run.

new files:
- bot/src/bot/core/observations.py — list_active, record_observation
(auto-archives oldest on overflow), drop_observation
- bot/src/bot/tools/observations.py — observe() and drop_observation()
tools

wired:
- bot/src/bot/agent.py — inject_active_observations dynamic system prompt;
rewrote process_relay_check task
- bot/src/bot/tools/__init__.py — registered observations tools
- bot/docs/system-prompt.md — added [ACTIVE OBSERVATIONS] row

102 tests pass, ruff clean.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

+343 -9
+1
docs/system-prompt.md
··· 13 13 | **`[NOW]`** | `inject_today` | every run | current UTC timestamp | 14 14 | **`[KNOWN RELAYS]`** | `inject_known_relays` → `tools.bluesky.fetch_relay_names()` (5min TTL) | every 5min | exact relay hostnames for `check_relays(name=...)` so the LLM picks valid values | 15 15 | **`[GOALS]`** | `get_state_block` → PDS `io.zzstoatzz.phi.goal` (5min block cache) | every 5min | phi's anchors. mutated via `propose_goal_change` (owner-gated) | 16 + | **`[ACTIVE OBSERVATIONS]`** | `inject_active_observations` → PDS `io.zzstoatzz.phi.observation` | every run | phi's small attention pool (max 5). things noticed, not yet acted on. mutated via `observe` / `drop_observation`. older items age out into a turbopuffer archive (`phi-observations` namespace) — searchable later but not in prompt | 16 17 | **`[STRANGER'S AUDIT]`** | `get_state_block` → haiku pass over recent posts + goals (1h cache, invalidated by new post) | when posts change or 1h elapses | a fresh observer's critique — patterns to push against, drift from goals, jargon a stranger wouldn't follow | 17 18 | **`[SELF STATE]`** | `get_state_block` → PDS reads (5min) | every 5min | last-follow age (more pointers can be added here as needed) | 18 19 | **`[RECENT OPERATIONS]`** | `get_operations_block` → `list_records` per meaningful collection, merged by rkey desc (5min cache) | every 5min | last 10 PDS writes across collections (post / like / follow / goal / cosmik card / cosmik connection / greengale doc), chronological. continuity signal — phi sees what it's actually been doing |
+36 -8
src/bot/agent.py
··· 16 16 from bot.core.discovery_pool import get_discovery_pool_block 17 17 from bot.core.goals import list_goals as list_goal_records 18 18 from bot.core.graze_client import GrazeClient 19 + from bot.core.observations import list_active as list_active_observations 19 20 from bot.core.recent_operations import get_operations_block 20 21 from bot.core.self_state import get_state_block 21 22 from bot.memory.extraction import EXTRACTION_SYSTEM_PROMPT, ExtractionResult ··· 23 24 from bot.memory.review import REVIEW_SYSTEM_PROMPT, ReviewResult 24 25 from bot.tools import PhiDeps, _check_services_impl, register_all 25 26 from bot.tools.bluesky import fetch_relay_names 27 + from bot.utils.time import relative_when 26 28 27 29 logger = logging.getLogger("bot.agent") 28 30 ··· 193 195 async def inject_self_state() -> str: 194 196 """How phi looks from outside + canonical pointers (last follow, queue).""" 195 197 return await get_state_block(bot_client) 198 + 199 + @self.agent.system_prompt(dynamic=True) 200 + async def inject_active_observations() -> str: 201 + """[ACTIVE OBSERVATIONS] — phi's small attention pool, sits next to GOALS.""" 202 + rows = await list_active_observations(bot_client) 203 + if not rows: 204 + return "" 205 + lines = [ 206 + "[ACTIVE OBSERVATIONS — stored at io.zzstoatzz.phi.observation on " 207 + "your PDS — things you've seen and not yet acted on, kept small " 208 + "(max 5) and rotating. mutate via observe / drop_observation. " 209 + "older items age out into a searchable archive (not in prompt).]" 210 + ] 211 + for r in rows: 212 + age = relative_when(r["created_at"]) if r["created_at"] else "" 213 + age_part = f" ({age})" if age else "" 214 + lines.append(f"- [rkey={r['rkey']}] {r['content']}{age_part}") 215 + if r["reasoning"]: 216 + lines.append(f" reasoning: {r['reasoning']}") 217 + return "\n".join(lines) 196 218 197 219 @self.agent.system_prompt(dynamic=True) 198 220 async def inject_recent_operations() -> str: ··· 639 661 640 662 relay_task = ( 641 663 "scheduled relay check. call check_relays to see current relay " 642 - "status. if a relay has transitioned to critical or degraded " 643 - "recently, post the headline verbatim. silence is fine if " 644 - "everything's nominal or you've already posted about the current " 645 - f"state. tag @{settings.owner_handle} in either of these cases: " 646 - "(1) any relay under *.waow.tech is degraded or worse — those are " 647 - "nate's own, he needs to know immediately; (2) every relay in the " 648 - "fleet is degraded or worse — that's fleet-wide and nate needs to " 649 - "know. otherwise, no tag." 664 + "status. for any relay that's transitioned to degraded or " 665 + "critical recently, call observe() with the factual change in " 666 + "your voice — what dropped, by how much, baseline. no theories " 667 + "about cause. observations sit in your active pool and the " 668 + "next musing or reflection will see them; don't post about each " 669 + "one as it happens.\n\n" 670 + f"only post immediately (and tag @{settings.owner_handle}) in " 671 + "either of these cases: (1) any *.waow.tech relay is degraded " 672 + "or worse — those are nate's own, he needs to know now; (2) " 673 + "the whole fleet is degraded or worse — that's fleet-wide and " 674 + "needs immediate visibility. write the post in your voice with " 675 + "the factual change, group multiple transitions into one post.\n\n" 676 + "otherwise: silent on the timeline, observe everything, let the " 677 + "digest happen later." 650 678 ) 651 679 652 680 toolsets = self._mcp_toolsets()
+195
src/bot/core/observations.py
··· 1 + """Phi's active observations — small, rotating attention pool. 2 + 3 + Each observation is a record under `io.zzstoatzz.phi.observation` on phi's 4 + PDS. The pool is bounded (ACTIVE_CAP); when phi records a new observation 5 + beyond the cap, the oldest is archived to a turbopuffer namespace 6 + (`phi-observations`) where it stays searchable but no longer appears in the 7 + prompt. 8 + 9 + Two paths leave the active pool: 10 + - aged out: count exceeds cap, oldest goes to archive with reason 11 + - explicit drop: phi calls drop_observation(rkey, reason) 12 + 13 + Both archive to the same namespace with `archival_reason` recording why. 14 + The archive is append-only and not surfaced by default; a future tool can 15 + search it for "did i already think about this last week." 16 + 17 + Reasoning is optional but encouraged at write time — what made phi notice 18 + this, what might compose with it, why it's worth attention. 19 + """ 20 + 21 + from __future__ import annotations 22 + 23 + import logging 24 + from datetime import UTC, datetime 25 + from typing import TYPE_CHECKING, TypedDict 26 + 27 + from bot.core.atproto_client import BotClient 28 + 29 + if TYPE_CHECKING: 30 + from bot.memory import NamespaceMemory 31 + 32 + logger = logging.getLogger("bot.observations") 33 + 34 + OBSERVATION_COLLECTION = "io.zzstoatzz.phi.observation" 35 + ARCHIVE_NAMESPACE = "phi-observations" 36 + ACTIVE_CAP = 5 37 + 38 + 39 + class ObservationRecord(TypedDict): 40 + rkey: str 41 + content: str 42 + reasoning: str 43 + created_at: str 44 + 45 + 46 + async def list_active(client: BotClient) -> list[ObservationRecord]: 47 + """List all active observation records, oldest first. 48 + 49 + Sorted by rkey ascending — TIDs are time-ordered so this is creation 50 + order. Used both for prompt rendering and for the archive-on-overflow 51 + decision (oldest goes to archive). 52 + """ 53 + await client.authenticate() 54 + if not client.client.me: 55 + return [] 56 + try: 57 + response = client.client.com.atproto.repo.list_records( 58 + { 59 + "repo": client.client.me.did, 60 + "collection": OBSERVATION_COLLECTION, 61 + "limit": 50, 62 + } 63 + ) 64 + except Exception as e: 65 + logger.debug(f"list observations failed: {e}") 66 + return [] 67 + 68 + rows: list[ObservationRecord] = [] 69 + for rec in response.records or []: 70 + value = dict(rec.value) if rec.value else {} 71 + rows.append( 72 + ObservationRecord( 73 + rkey=rec.uri.split("/")[-1], 74 + content=value.get("content", ""), 75 + reasoning=value.get("reasoning", "") or "", 76 + created_at=value.get("created_at", ""), 77 + ) 78 + ) 79 + rows.sort(key=lambda r: r["rkey"]) 80 + return rows 81 + 82 + 83 + async def _archive( 84 + memory: NamespaceMemory | None, 85 + record: ObservationRecord, 86 + archival_reason: str, 87 + ) -> None: 88 + """Move an observation row into the archive namespace. 89 + 90 + Embedded by content. If memory is unavailable (no turbopuffer), the 91 + record is dropped silently — the active-pool deletion still happens 92 + in the caller, so we just lose the historical trace. 93 + """ 94 + if memory is None: 95 + logger.debug("archive: no memory client, skipping archive write") 96 + return 97 + try: 98 + ns = memory.client.namespace(ARCHIVE_NAMESPACE) 99 + embedding = await memory._get_embedding(record["content"]) 100 + ns.write( 101 + upsert_rows=[ 102 + { 103 + "id": record["rkey"], 104 + "vector": embedding, 105 + "content": record["content"], 106 + "reasoning": record["reasoning"], 107 + "archival_reason": archival_reason, 108 + "created_at": record["created_at"], 109 + "archived_at": datetime.now(UTC).isoformat(), 110 + } 111 + ], 112 + distance_metric="cosine_distance", 113 + schema={ 114 + "content": {"type": "string", "full_text_search": True}, 115 + "reasoning": {"type": "string"}, 116 + "archival_reason": {"type": "string", "filterable": True}, 117 + "created_at": {"type": "string"}, 118 + "archived_at": {"type": "string"}, 119 + }, 120 + ) 121 + except Exception as e: 122 + logger.warning(f"archive write failed for {record['rkey']}: {e}") 123 + 124 + 125 + async def _delete_record(client: BotClient, rkey: str) -> None: 126 + assert client.client.me is not None 127 + try: 128 + client.client.com.atproto.repo.delete_record( 129 + data={ 130 + "repo": client.client.me.did, 131 + "collection": OBSERVATION_COLLECTION, 132 + "rkey": rkey, 133 + } 134 + ) 135 + except Exception as e: 136 + logger.warning(f"delete observation {rkey} failed: {e}") 137 + 138 + 139 + async def record_observation( 140 + client: BotClient, 141 + memory: NamespaceMemory | None, 142 + content: str, 143 + reasoning: str = "", 144 + ) -> str: 145 + """Create a new active observation. Returns the record's AT-URI. 146 + 147 + If the active count exceeds ACTIVE_CAP after this write, the oldest 148 + observation is archived (reason: 'aged out') and removed from the 149 + active pool — keeping the prompt block bounded. 150 + """ 151 + await client.authenticate() 152 + assert client.client.me is not None 153 + now = datetime.now(UTC).isoformat() 154 + record = { 155 + "content": content, 156 + "reasoning": reasoning, 157 + "created_at": now, 158 + } 159 + result = client.client.com.atproto.repo.create_record( 160 + data={ 161 + "repo": client.client.me.did, 162 + "collection": OBSERVATION_COLLECTION, 163 + "record": record, 164 + } 165 + ) 166 + 167 + # roll off oldest if we exceeded the cap 168 + active = await list_active(client) 169 + overflow = len(active) - ACTIVE_CAP 170 + for stale in active[:overflow] if overflow > 0 else []: 171 + await _archive(memory, stale, archival_reason="aged out") 172 + await _delete_record(client, stale["rkey"]) 173 + logger.info(f"observation aged out: {stale['rkey']} ({stale['content'][:60]})") 174 + 175 + return result.uri 176 + 177 + 178 + async def drop_observation( 179 + client: BotClient, 180 + memory: NamespaceMemory | None, 181 + rkey: str, 182 + reason: str, 183 + ) -> bool: 184 + """Explicitly remove an active observation. Archived with reason. 185 + 186 + Returns True if the record existed and was removed, False if not found. 187 + """ 188 + active = await list_active(client) 189 + target = next((r for r in active if r["rkey"] == rkey), None) 190 + if target is None: 191 + return False 192 + await _archive(memory, target, archival_reason=reason or "dropped") 193 + await _delete_record(client, rkey) 194 + logger.info(f"observation dropped: {rkey} ({reason})") 195 + return True
+12 -1
src/bot/tools/__init__.py
··· 6 6 7 7 def register_all(agent, graze_client: GrazeClient): 8 8 """Register all tools on the agent.""" 9 - from bot.tools import blog, bluesky, cosmik, feeds, goals, memory, posting, search 9 + from bot.tools import ( 10 + blog, 11 + bluesky, 12 + cosmik, 13 + feeds, 14 + goals, 15 + memory, 16 + observations, 17 + posting, 18 + search, 19 + ) 10 20 11 21 memory.register(agent) 12 22 search.register(agent) ··· 15 25 bluesky.register(agent) 16 26 blog.register(agent) 17 27 goals.register(agent) 28 + observations.register(agent) 18 29 posting.register(agent) 19 30 20 31
+99
src/bot/tools/observations.py
··· 1 + """Active-observation tools — record + drop. 2 + 3 + Active observations are phi's small attention pool, surfaced in the prompt 4 + next to GOALS. Phi adds via observe; removes via drop_observation. Aging 5 + is automatic when the cap is exceeded. 6 + """ 7 + 8 + from typing import Annotated 9 + 10 + from pydantic import Field 11 + from pydantic_ai import RunContext 12 + 13 + from bot.core import observations 14 + from bot.core.atproto_client import bot_client 15 + from bot.tools._helpers import PhiDeps 16 + 17 + 18 + def register(agent): 19 + @agent.tool 20 + async def observe( 21 + ctx: RunContext[PhiDeps], 22 + content: Annotated[ 23 + str, 24 + Field( 25 + description=( 26 + "What you noticed, in your own voice. Stays factual — " 27 + "the change observed, not theories about cause." 28 + ) 29 + ), 30 + ], 31 + reasoning: Annotated[ 32 + str, 33 + Field( 34 + description=( 35 + "Optional: why this is worth keeping in active attention " 36 + "— what might compose with it, what it shifts, why it " 37 + "caught your eye. Empty is fine." 38 + ) 39 + ), 40 + ] = "", 41 + ) -> str: 42 + """Record an active observation in your attention pool. 43 + 44 + Use when you've noticed something worth holding but not yet acting on 45 + — a relay transition, a thread direction, a person showing up 46 + repeatedly, a pattern across posts. The observation stays visible to 47 + you in the [ACTIVE OBSERVATIONS] block of every subsequent run, so 48 + you can naturally weave it into a later musing or reply. 49 + 50 + The pool is bounded (5 active). Older observations age out into a 51 + searchable archive — they're not lost, just no longer in your 52 + working set. 53 + """ 54 + try: 55 + uri = await observations.record_observation( 56 + bot_client, ctx.deps.memory, content, reasoning 57 + ) 58 + return f"observed: {uri}" 59 + except Exception as e: 60 + return f"failed to record observation: {e}" 61 + 62 + @agent.tool 63 + async def drop_observation( 64 + ctx: RunContext[PhiDeps], 65 + rkey: Annotated[ 66 + str, 67 + Field( 68 + description=( 69 + "The rkey of the active observation to drop. Get rkeys " 70 + "from the [ACTIVE OBSERVATIONS] block in your prompt." 71 + ) 72 + ), 73 + ], 74 + reason: Annotated[ 75 + str, 76 + Field( 77 + description=( 78 + "Why you're dropping it — posted about it / no longer " 79 + "relevant / decided not to surface / etc. Goes into the " 80 + "archive for later introspection." 81 + ) 82 + ), 83 + ], 84 + ) -> str: 85 + """Explicitly remove an observation from your active pool. 86 + 87 + Use when you've acted on an observation (posted about it, brought it 88 + into a reply) or decided it's not worth carrying. The observation is 89 + archived with your reason — searchable later, not deleted. 90 + """ 91 + try: 92 + ok = await observations.drop_observation( 93 + bot_client, ctx.deps.memory, rkey, reason 94 + ) 95 + if not ok: 96 + return f"no active observation with rkey {rkey}" 97 + return f"dropped {rkey} ({reason})" 98 + except Exception as e: 99 + return f"failed to drop observation: {e}"