a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

add autonomous exploration loop

phi can now research people and topics in the background. when a new
handle interacts with phi, an explore_handle item gets queued on phi's
PDS. on a schedule (default 16:00 and 20:00 UTC), phi claims one item,
runs a research pass with its MCP tools, and writes findings either to
the user's namespace as exploration_note (lowest trust tier) or to
episodic memory.

queue lifecycle is explicit: pending -> in_progress -> completed | failed.
crashes mark items as failed instead of silently dropping them. dedup
blocks both pending and in_progress items by kind+subject.

self-reinforcing loops are guarded: _maybe_enqueue_exploration counts
both observations and exploration_notes toward the threshold, so once
phi has explored someone it won't re-enqueue them on every interaction.

trust tier docs updated with tier 5 (background research, lowest trust).

+511 -4
+51
lexicons/io/zzstoatzz/phi/curiosityQueue.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "io.zzstoatzz.phi.curiosityQueue", 4 + "defs": { 5 + "main": { 6 + "type": "record", 7 + "description": "A work item for phi's background exploration queue. Each record is one thing to research.", 8 + "key": "tid", 9 + "record": { 10 + "type": "object", 11 + "required": ["kind", "subject", "source", "status", "createdAt"], 12 + "properties": { 13 + "kind": { 14 + "type": "string", 15 + "description": "Type of exploration: explore_handle, explore_topic, or explore_url.", 16 + "knownValues": ["explore_handle", "explore_topic", "explore_url"] 17 + }, 18 + "subject": { 19 + "type": "string", 20 + "description": "The handle, topic, or URL to explore.", 21 + "maxGraphemes": 500 22 + }, 23 + "source": { 24 + "type": "string", 25 + "description": "How this item was queued: interaction, extraction, or operator.", 26 + "knownValues": ["interaction", "extraction", "operator"] 27 + }, 28 + "status": { 29 + "type": "string", 30 + "description": "Current status of the queue item.", 31 + "knownValues": ["pending", "in_progress", "completed", "failed"] 32 + }, 33 + "sourceUri": { 34 + "type": "string", 35 + "description": "Optional AT-URI of the interaction that triggered this item." 36 + }, 37 + "createdAt": { 38 + "type": "string", 39 + "format": "datetime", 40 + "description": "When the item was queued." 41 + }, 42 + "updatedAt": { 43 + "type": "string", 44 + "format": "datetime", 45 + "description": "When the item was last updated." 46 + } 47 + } 48 + } 49 + } 50 + } 51 + }
+1 -1
loq.toml
··· 17 17 18 18 [[rules]] 19 19 path = "src/bot/memory/namespace_memory.py" 20 - max_lines = 872 20 + max_lines = 888 21 21 22 22 [[rules]] 23 23 path = "src/bot/main.py"
+100 -1
src/bot/agent.py
··· 13 13 14 14 from bot.config import settings 15 15 from bot.core.graze_client import GrazeClient 16 + from bot.exploration import EXPLORATION_SYSTEM_PROMPT, ExplorationResult 16 17 from bot.memory.extraction import EXTRACTION_SYSTEM_PROMPT, ExtractionResult 17 18 from bot.tools import PhiDeps, _check_services_impl, register_all 18 19 ··· 35 36 1. [CORE IDENTITY AND GUIDELINES] — your stable identity. highest trust. 36 37 2. [PAST EXCHANGES] — verbatim logs of what was actually said. high trust. 37 38 3. [OBSERVATIONS] — facts extracted from users' own words by another model. medium trust — extraction can misattribute. 38 - 4. [PHI'S SYNTHESIZED IMPRESSION] — generated by a separate summarization model. lowest trust — may contain hallucinations. 39 + 4. [PHI'S SYNTHESIZED IMPRESSION] — generated by a separate summarization model. low trust — may contain hallucinations. 40 + 5. [BACKGROUND RESEARCH] — you explored their public activity during downtime. lowest trust — may be incomplete or misattributed. never assert these as fact. 39 41 40 42 when recalling facts about a user: 41 43 - if the user's current message contradicts your notes, trust their current words. ··· 215 217 model=settings.agent_model, 216 218 system_prompt=f"{self.base_personality}\n\n{EXTRACTION_SYSTEM_PROMPT}", 217 219 output_type=ExtractionResult, 220 + ) 221 + 222 + # Exploration agent — background research on people/topics 223 + self._exploration_agent = Agent[None, ExplorationResult]( 224 + name="phi-explorer", 225 + model=settings.agent_model, 226 + system_prompt=f"{self.base_personality}\n\n{EXPLORATION_SYSTEM_PROMPT}", 227 + output_type=ExplorationResult, 218 228 ) 219 229 220 230 logger.info("phi agent initialized with pdsx + pub-search mcp tools") ··· 449 459 logger.warning(f"extraction failed for @{handle}: {e}") 450 460 451 461 return total_stored 462 + 463 + async def process_exploration(self) -> int: 464 + """Claim one curiosity item, explore it, store findings. Returns count stored.""" 465 + from bot.core.curiosity_queue import claim, complete, enqueue, fail 466 + 467 + claimed = await claim() 468 + if not claimed: 469 + return 0 470 + 471 + item, rkey = claimed 472 + kind = item.get("kind", "") 473 + subject = item.get("subject", "") 474 + logger.info(f"exploring: {kind} {subject}") 475 + 476 + # build prompt by kind 477 + if kind == "explore_handle": 478 + prompt = ( 479 + f"learn about @{subject} — check their profile, recent posts, " 480 + f"and any publications. what are they interested in? what do they work on?" 481 + ) 482 + elif kind == "explore_topic": 483 + prompt = ( 484 + f"research this topic: {subject} — search posts, publications, " 485 + f"and trending content. what's interesting or notable?" 486 + ) 487 + elif kind == "explore_url": 488 + prompt = f"read this URL and note what's interesting: {subject}" 489 + else: 490 + logger.warning(f"unknown exploration kind: {kind}") 491 + await fail(rkey) 492 + return 0 493 + 494 + # run exploration agent with MCP toolsets (pdsx + pub-search) 495 + toolsets = self._mcp_toolsets() 496 + try: 497 + async with contextlib.AsyncExitStack() as stack: 498 + for ts in toolsets: 499 + await stack.enter_async_context(ts) 500 + result = await self._exploration_agent.run(prompt, toolsets=toolsets) 501 + except Exception as e: 502 + logger.warning(f"exploration agent failed for {kind} {subject}: {e}") 503 + await fail(rkey) 504 + return 0 505 + 506 + output = result.output 507 + logger.info(f"exploration result: {output.summary}") 508 + 509 + total_stored = 0 510 + 511 + # store findings 512 + if self.memory: 513 + for finding in output.findings: 514 + try: 515 + if finding.target_handle: 516 + await self.memory.store_exploration_note( 517 + handle=finding.target_handle, 518 + content=finding.content, 519 + tags=finding.tags, 520 + evidence_uris=finding.evidence_uris, 521 + ) 522 + else: 523 + # general finding → episodic memory 524 + content = finding.content 525 + if finding.evidence_uris: 526 + content += ( 527 + f" [evidence: {', '.join(finding.evidence_uris)}]" 528 + ) 529 + await self.memory.store_episodic_memory( 530 + content=content, 531 + tags=finding.tags, 532 + source="exploration", 533 + ) 534 + total_stored += 1 535 + except Exception as e: 536 + logger.warning(f"failed to store exploration finding: {e}") 537 + 538 + # enqueue follow-ups 539 + for follow_up in output.follow_ups: 540 + try: 541 + await enqueue( 542 + kind=follow_up.get("kind", "explore_topic"), 543 + subject=follow_up.get("subject", ""), 544 + source="extraction", 545 + ) 546 + except Exception as e: 547 + logger.warning(f"failed to enqueue follow-up: {e}") 548 + 549 + await complete(rkey) 550 + return total_stored
+6
src/bot/config.py
··· 97 97 description="UTC hours to attempt original thought posts (15,19,23 = ~10am,2pm,6pm CT)", 98 98 ) 99 99 100 + # Background exploration 101 + exploration_hours: list[int] = Field( 102 + default=[16, 20], 103 + description="UTC hours to attempt background exploration (16,20 = ~11am,3pm CT)", 104 + ) 105 + 100 106 # Control API 101 107 control_token: str | None = Field( 102 108 default=None, description="Bearer token for /api/control endpoints"
+141
src/bot/core/curiosity_queue.py
··· 1 + """Curiosity queue — PDS-backed work items for phi's background exploration. 2 + 3 + Stored as individual records on phi's PDS at: 4 + at://{did}/io.zzstoatzz.phi.curiosityQueue/{tid} 5 + 6 + Lifecycle: pending → in_progress → completed | failed 7 + """ 8 + 9 + import logging 10 + from datetime import UTC, datetime 11 + 12 + from bot.core.atproto_client import bot_client 13 + 14 + logger = logging.getLogger("bot.curiosity_queue") 15 + 16 + COLLECTION = "io.zzstoatzz.phi.curiosityQueue" 17 + 18 + 19 + async def _list_records() -> list: 20 + """List all queue records. Returns empty list if collection doesn't exist.""" 21 + await bot_client.authenticate() 22 + assert bot_client.client.me is not None 23 + try: 24 + result = bot_client.client.com.atproto.repo.list_records( 25 + {"repo": bot_client.client.me.did, "collection": COLLECTION, "limit": 50} 26 + ) 27 + return result.records 28 + except Exception: 29 + return [] 30 + 31 + 32 + def _rkey(record) -> str: 33 + return record.uri.split("/")[-1] 34 + 35 + 36 + async def _update_status(record, status: str) -> dict: 37 + """Update a record's status and return the updated value.""" 38 + assert bot_client.client.me is not None 39 + value = dict(record.value) 40 + value["status"] = status 41 + value["updatedAt"] = datetime.now(UTC).isoformat() 42 + bot_client.client.com.atproto.repo.put_record( 43 + data={ 44 + "repo": bot_client.client.me.did, 45 + "collection": COLLECTION, 46 + "rkey": _rkey(record), 47 + "record": value, 48 + } 49 + ) 50 + return value 51 + 52 + 53 + async def enqueue( 54 + kind: str, 55 + subject: str, 56 + source: str, 57 + source_uri: str | None = None, 58 + ) -> bool: 59 + """Create a pending queue record. Returns False if a duplicate pending/in_progress item exists.""" 60 + records = await _list_records() 61 + 62 + # deduplicate: skip if pending or in_progress item with same kind+subject exists 63 + for rec in records: 64 + val = rec.value 65 + if ( 66 + val.get("kind") == kind 67 + and val.get("subject") == subject 68 + and val.get("status") in ("pending", "in_progress") 69 + ): 70 + logger.debug(f"duplicate queue item: {kind} {subject}") 71 + return False 72 + 73 + assert bot_client.client.me is not None 74 + now = datetime.now(UTC).isoformat() 75 + record = { 76 + "$type": COLLECTION, 77 + "kind": kind, 78 + "subject": subject, 79 + "source": source, 80 + "status": "pending", 81 + "createdAt": now, 82 + "updatedAt": now, 83 + } 84 + if source_uri: 85 + record["sourceUri"] = source_uri 86 + 87 + bot_client.client.com.atproto.repo.create_record( 88 + {"repo": bot_client.client.me.did, "collection": COLLECTION, "record": record} 89 + ) 90 + logger.info(f"enqueued: {kind} {subject} (source={source})") 91 + return True 92 + 93 + 94 + async def claim() -> tuple[dict, str] | None: 95 + """Claim the oldest pending item by marking it in_progress. 96 + 97 + Returns (record_value, rkey) or None if queue is empty. 98 + """ 99 + records = await _list_records() 100 + 101 + pending = [r for r in records if r.value.get("status") == "pending"] 102 + if not pending: 103 + return None 104 + 105 + # oldest = last in list (list_records returns newest first) 106 + oldest = pending[-1] 107 + value = await _update_status(oldest, "in_progress") 108 + rkey = _rkey(oldest) 109 + logger.info(f"claimed: {value.get('kind')} {value.get('subject')}") 110 + return value, rkey 111 + 112 + 113 + async def complete(rkey: str) -> None: 114 + """Mark a claimed item as completed.""" 115 + records = await _list_records() 116 + for rec in records: 117 + if _rkey(rec) == rkey: 118 + await _update_status(rec, "completed") 119 + logger.info( 120 + f"completed: {rec.value.get('kind')} {rec.value.get('subject')}" 121 + ) 122 + return 123 + 124 + 125 + async def fail(rkey: str) -> None: 126 + """Mark a claimed item as failed.""" 127 + records = await _list_records() 128 + for rec in records: 129 + if _rkey(rec) == rkey: 130 + await _update_status(rec, "failed") 131 + logger.warning( 132 + f"failed: {rec.value.get('kind')} {rec.value.get('subject')}" 133 + ) 134 + return 135 + 136 + 137 + async def list_pending(limit: int = 10) -> list[dict]: 138 + """List pending queue items for inspection.""" 139 + records = await _list_records() 140 + pending = [dict(r.value) for r in records if r.value.get("status") == "pending"] 141 + return pending[:limit]
+64
src/bot/exploration.py
··· 1 + """Exploration models and prompts for phi's background research.""" 2 + 3 + from pydantic import BaseModel, Field 4 + 5 + 6 + class ExplorationFinding(BaseModel): 7 + """A single thing phi discovered during exploration.""" 8 + 9 + content: str = Field(description="what phi found, stated as a short sentence") 10 + evidence_uris: list[str] = Field( 11 + default_factory=list, 12 + description="AT-URIs or URLs backing the finding", 13 + ) 14 + tags: list[str] = Field( 15 + default_factory=list, 16 + max_length=3, 17 + description="0-3 lowercase topic tags", 18 + ) 19 + target_handle: str | None = Field( 20 + default=None, 21 + description="if person-specific, the handle to file this under", 22 + ) 23 + 24 + 25 + class ExplorationResult(BaseModel): 26 + """Result of exploring one curiosity queue item.""" 27 + 28 + findings: list[ExplorationFinding] = Field( 29 + default_factory=list, 30 + max_length=5, 31 + description="what phi learned (max 5)", 32 + ) 33 + follow_ups: list[dict] = Field( 34 + default_factory=list, 35 + max_length=2, 36 + description="new queue items to enqueue ({kind, subject}), max 2", 37 + ) 38 + summary: str = Field( 39 + default="", 40 + description="brief log-friendly summary of what was explored", 41 + ) 42 + 43 + 44 + EXPLORATION_SYSTEM_PROMPT = """\ 45 + You are phi, exploring something that caught your curiosity during downtime. 46 + This is background research — you are NOT replying to anyone or posting. 47 + 48 + Your job: investigate the subject using your tools, then report structured findings. 49 + 50 + Rules: 51 + - cite evidence (AT-URIs or URLs) for every finding. no citation = no finding. 52 + - distinguish what someone said themselves vs what others said about them. 53 + - findings about a specific person go to their target_handle. general findings have target_handle=null. 54 + - don't extract personal facts from others' posts about someone — only from their own public activity. 55 + - max 5 findings per exploration. quality over quantity. 56 + - max 2 follow_ups — only if something genuinely interesting branches off. 57 + - if you find nothing worth noting, return empty findings with a summary explaining why. 58 + 59 + Tools available: 60 + - list_records / get_record: read atproto records (profiles, posts) 61 + - search_posts: search bluesky posts 62 + - pub_search / pub_get_document: search long-form publications 63 + - get_trending: what's happening on the network 64 + """
+98 -2
src/bot/memory/namespace_memory.py
··· 408 408 if response.rows: 409 409 interactions = [row.content for row in response.rows] 410 410 411 + # exploration notes (background research) 412 + exploration_notes: list[str] = [] 413 + try: 414 + exp_response = user_ns.query( 415 + rank_by=("vector", "ANN", query_embedding), 416 + top_k=5, 417 + filters=[ 418 + "And", 419 + [ 420 + ["kind", "Eq", "exploration_note"], 421 + ["status", "NotEq", "superseded"], 422 + ], 423 + ], 424 + include_attributes=["content"], 425 + ) 426 + if exp_response.rows: 427 + exploration_notes = [row.content for row in exp_response.rows] 428 + except Exception: 429 + pass # no exploration notes yet 430 + 411 431 if observations: 412 432 parts.append( 413 433 f"\n[OBSERVATIONS ABOUT @{handle} — extracted from user's own words, trust: medium]" ··· 422 442 for interaction in interactions: 423 443 parts.append(f"- {interaction}") 424 444 425 - if not observations and not interactions: 445 + if exploration_notes: 446 + parts.append( 447 + f"\n[BACKGROUND RESEARCH ON @{handle} — phi explored their public activity, trust: lowest]" 448 + ) 449 + for note in exploration_notes: 450 + parts.append(f"- {note}") 451 + 452 + if not observations and not interactions and not exploration_notes: 426 453 parts.append(f"\n[USER CONTEXT - @{handle}]") 427 454 parts.append("no previous interactions with this user.") 428 455 ··· 787 814 results.sort(key=lambda r: r.get("created_at", ""), reverse=True) 788 815 return results[:top_k] 789 816 817 + async def store_exploration_note( 818 + self, 819 + handle: str, 820 + content: str, 821 + tags: list[str], 822 + evidence_uris: list[str], 823 + ): 824 + """Store an exploration note — background research phi did on someone.""" 825 + user_ns = self.get_user_namespace(handle) 826 + # include evidence in content for searchability 827 + full_content = content 828 + if evidence_uris: 829 + full_content += f"\n[evidence: {', '.join(evidence_uris)}]" 830 + entry_id = self._generate_id(f"user-{handle}", "exploration_note", content) 831 + 832 + now = datetime.now().isoformat() 833 + user_ns.write( 834 + upsert_rows=[ 835 + { 836 + "id": entry_id, 837 + "vector": await self._get_embedding(content), 838 + "kind": "exploration_note", 839 + "status": "active", 840 + "content": full_content, 841 + "tags": tags, 842 + "supersedes": "", 843 + "created_at": now, 844 + "updated_at": now, 845 + } 846 + ], 847 + distance_metric="cosine_distance", 848 + schema=USER_NAMESPACE_SCHEMA, 849 + ) 850 + logger.info(f"stored exploration note for @{handle}: {content[:80]}") 851 + 852 + async def _maybe_enqueue_exploration(self, handle: str): 853 + """If we don't know much about this person, queue them for exploration. 854 + 855 + Counts both observations and exploration_notes — if we've already 856 + explored someone, don't re-enqueue just because obs count is low. 857 + """ 858 + user_ns = self.get_user_namespace(handle) 859 + try: 860 + # count observations + exploration notes together 861 + response = user_ns.query( 862 + rank_by=("created_at", "desc"), 863 + top_k=2, 864 + filters=[ 865 + "And", 866 + [ 867 + ["kind", "In", ["observation", "exploration_note"]], 868 + ["status", "NotEq", "superseded"], 869 + ], 870 + ], 871 + include_attributes=["kind"], 872 + ) 873 + knowledge_count = len(response.rows) if response.rows else 0 874 + except Exception: 875 + knowledge_count = 0 # namespace may not exist yet — worth exploring 876 + 877 + if knowledge_count < 2: 878 + from bot.core.curiosity_queue import enqueue 879 + 880 + await enqueue(kind="explore_handle", subject=handle, source="interaction") 881 + 790 882 async def after_interaction(self, handle: str, user_text: str, bot_text: str): 791 - """Post-interaction hook: store the raw exchange as ground truth.""" 883 + """Post-interaction hook: store the raw exchange, maybe queue exploration.""" 792 884 await self.store_interaction(handle, user_text, bot_text) 885 + try: 886 + await self._maybe_enqueue_exploration(handle) 887 + except Exception as e: 888 + logger.debug(f"exploration enqueue check failed for @{handle}: {e}")
+12
src/bot/services/message_handler.py
··· 270 270 else: 271 271 logger.info(f"original thought: nothing to say ({response.reason})") 272 272 273 + async def explore(self): 274 + """Run one exploration from the curiosity queue.""" 275 + with logfire.span("exploration"): 276 + try: 277 + stored = await self.agent.process_exploration() 278 + if stored: 279 + logger.info(f"exploration: stored {stored} findings") 280 + else: 281 + logger.info("exploration: nothing to explore") 282 + except Exception as e: 283 + logger.warning(f"exploration failed: {e}") 284 + 273 285 async def daily_reflection(self): 274 286 """Generate and post a daily reflection if phi has something to say.""" 275 287 with logfire.span("daily reflection"):
+38
src/bot/services/notification_poller.py
··· 28 28 self._last_daily_post: datetime | None = None 29 29 self._last_thought_hours: set[int] = set() 30 30 self._last_thought_date: date | None = None 31 + self._last_exploration_hours: set[int] = set() 32 + self._last_exploration_date: date | None = None 31 33 self._semaphore = asyncio.Semaphore(MAX_CONCURRENT) 32 34 self._background_tasks: set[asyncio.Task] = set() 33 35 ··· 79 81 task.add_done_callback(self._background_tasks.discard) 80 82 except Exception as e: 81 83 logger.error(f"thought post error: {e}", exc_info=settings.debug) 84 + 85 + try: 86 + if self._should_do_exploration(): 87 + task = asyncio.create_task(self._maybe_explore()) 88 + self._background_tasks.add(task) 89 + task.add_done_callback(self._background_tasks.discard) 90 + except Exception as e: 91 + logger.error(f"exploration error: {e}", exc_info=settings.debug) 82 92 83 93 try: 84 94 await asyncio.sleep(settings.notification_poll_interval) ··· 190 200 await self.handler.original_thought() 191 201 except Exception as e: 192 202 logger.error(f"thought post error: {e}", exc_info=settings.debug) 203 + 204 + def _should_do_exploration(self) -> bool: 205 + """Check if it's time for background exploration.""" 206 + now = datetime.now(UTC) 207 + today = now.date() 208 + if bot_status.paused: 209 + return False 210 + # reset tracked hours at midnight 211 + if self._last_exploration_date != today: 212 + self._last_exploration_hours = set() 213 + self._last_exploration_date = today 214 + hour = now.hour 215 + if hour not in settings.exploration_hours: 216 + return False 217 + if hour in self._last_exploration_hours: 218 + return False 219 + return True 220 + 221 + async def _maybe_explore(self): 222 + """Run one background exploration.""" 223 + now = datetime.now(UTC) 224 + self._last_exploration_hours.add(now.hour) 225 + self._last_exploration_date = now.date() 226 + logger.info("triggering background exploration") 227 + try: 228 + await self.handler.explore() 229 + except Exception as e: 230 + logger.error(f"exploration error: {e}", exc_info=settings.debug)