a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

event-driven exploration: replace cron with feed scanning + idle budget

exploration was clock-shaped ("explore because it's 4pm"). now it's
event-driven ("explore because something surfaced").

changes:
- remove exploration_hours cron schedule
- add FeedScanner: scans For You feed as background task, enqueues
strangers for exploration with 24h cooldown per subject
- replace cron draining with idle-budget: explore when system is truly
idle (no background tasks), max 3/hour, 5-min cooldown between
- inject For You feed context into musings so phi sees the broader
network when deciding what to post about
- normalize queue kinds: alias legacy kinds (product_explore, concept,
etc) to canonical set, validate on enqueue

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+229 -38
+19 -2
src/bot/agent.py
··· 522 522 logger.info(f"reflection finished: {summary[:200]}") 523 523 return summary 524 524 525 - async def process_musing(self, recent_posts: list[str] | None = None) -> str: 525 + async def process_musing( 526 + self, recent_posts: list[str] | None = None, feed_context: str = "" 527 + ) -> str: 526 528 """Generate an original thought post from memory, reading, patterns noticed. 527 529 528 530 Side effects (posting) happen via the `post` tool inside the agent run. ··· 550 552 recent_activity += "[RECENT CONVERSATIONS]:\n" + "\n".join(lines) 551 553 except Exception as e: 552 554 logger.warning(f"failed to get recent interactions for musing: {e}") 555 + 556 + if feed_context: 557 + if recent_activity: 558 + recent_activity += "\n\n" 559 + recent_activity += ( 560 + "[FOR YOU FEED — posts from across the network]:\n" + feed_context 561 + ) 553 562 554 563 deps = PhiDeps( 555 564 author_handle="", ··· 623 632 if not claimed: 624 633 return 0 625 634 635 + KIND_ALIASES = { 636 + "person_exploration": "explore_handle", 637 + "product_explore": "explore_topic", 638 + "topic_exploration": "explore_topic", 639 + "concept": "explore_topic", 640 + "read": "explore_url", 641 + } 642 + 626 643 item, rkey = claimed 627 - kind = item.get("kind", "") 644 + kind = KIND_ALIASES.get(item.get("kind", ""), item.get("kind", "")) 628 645 subject = item.get("subject", "") 629 646 logger.info(f"exploring: {kind} {subject}") 630 647
+16 -4
src/bot/config.py
··· 97 97 description="UTC hours to attempt original thought posts (~8am-10pm CT, every 2h)", 98 98 ) 99 99 100 - # Background exploration 101 - exploration_hours: list[int] = Field( 102 - default=[16, 20], 103 - description="UTC hours to attempt background exploration (16,20 = ~11am,3pm CT)", 100 + # Event-driven exploration 101 + feed_scan_interval: int = Field( 102 + default=6, 103 + description="poll cycles between feed scans (6 * 10s = ~60s)", 104 + ) 105 + for_you_feed_uri: str = Field( 106 + default="at://did:plc:3guzzweuqraryl3rdkimjamk/app.bsky.feed.generator/for-you", 107 + description="AT-URI of the For You feed generator to scan", 108 + ) 109 + max_idle_explorations_per_hour: int = Field( 110 + default=3, 111 + description="cap exploration drains per hour", 112 + ) 113 + exploration_cooldown_polls: int = Field( 114 + default=30, 115 + description="min polls (~5 min) between explorations", 104 116 ) 105 117 106 118 # Control API
+5
src/bot/core/curiosity_queue.py
··· 19 19 logger = logging.getLogger("bot.curiosity_queue") 20 20 21 21 COLLECTION = "io.zzstoatzz.phi.curiosityQueue" 22 + CANONICAL_KINDS = {"explore_handle", "explore_topic", "explore_url"} 22 23 23 24 24 25 async def _list_records() -> list: ··· 62 63 source_uri: str | None = None, 63 64 ) -> bool: 64 65 """Create a pending queue record. Returns False if a duplicate pending/in_progress item exists.""" 66 + if kind not in CANONICAL_KINDS: 67 + logger.warning(f"rejected non-canonical kind: {kind}") 68 + return False 69 + 65 70 records = await _list_records() 66 71 67 72 # deduplicate: skip if pending or in_progress item with same kind+subject exists
+132
src/bot/core/feed_scanner.py
··· 1 + """Feed scanner — event source for phi's curiosity queue. 2 + 3 + Periodically scans the For You feed for unfamiliar authors and enqueues 4 + them for exploration. Runs as a background task, never blocks notifications. 5 + """ 6 + 7 + import logging 8 + from datetime import UTC, datetime 9 + 10 + import logfire 11 + 12 + from bot.config import settings 13 + from bot.core.atproto_client import bot_client 14 + from bot.core.curiosity_queue import enqueue 15 + from bot.tools._helpers import _format_feed_posts 16 + 17 + logger = logging.getLogger("bot.feed_scanner") 18 + 19 + # how long before a subject can be re-enqueued (seconds) 20 + COOLDOWN_SECONDS = 86400 # 24h 21 + SEEN_CAP = 500 22 + 23 + 24 + class FeedScanner: 25 + """Scans the For You feed and enqueues strangers for exploration.""" 26 + 27 + def __init__(self): 28 + self._seen_uris: set[str] = set() 29 + self._explored_subjects: dict[str, datetime] = {} 30 + self._recent_posts: list = [] 31 + self._seeded = False 32 + 33 + async def _seed_cooldowns(self): 34 + """Seed explored_subjects from completed/failed queue items on PDS.""" 35 + if self._seeded: 36 + return 37 + self._seeded = True 38 + try: 39 + from bot.core.curiosity_queue import _list_records 40 + 41 + records = await _list_records() 42 + now = datetime.now(UTC) 43 + for rec in records: 44 + val = rec.value 45 + status = val["status"] 46 + if status in ("completed", "failed"): 47 + subject = val["subject"] 48 + updated = val.get("updatedAt") or val.get("createdAt", "") 49 + try: 50 + ts = datetime.fromisoformat(str(updated).replace("Z", "+00:00")) 51 + except (ValueError, TypeError): 52 + ts = now 53 + self._explored_subjects[subject] = ts 54 + if self._explored_subjects: 55 + logger.info( 56 + f"seeded {len(self._explored_subjects)} exploration cooldowns" 57 + ) 58 + except Exception as e: 59 + logger.warning(f"failed to seed exploration cooldowns: {e}") 60 + 61 + def _is_on_cooldown(self, subject: str) -> bool: 62 + """Check if a subject was explored recently enough to skip.""" 63 + last = self._explored_subjects.get(subject) 64 + if not last: 65 + return False 66 + elapsed = (datetime.now(UTC) - last).total_seconds() 67 + return elapsed < COOLDOWN_SECONDS 68 + 69 + async def scan(self, memory) -> int: 70 + """Scan For You feed, enqueue strangers with cooldown. Returns count enqueued.""" 71 + await self._seed_cooldowns() 72 + 73 + with logfire.span("feed scan"): 74 + try: 75 + await bot_client.authenticate() 76 + response = await bot_client.get_feed( 77 + settings.for_you_feed_uri, limit=20 78 + ) 79 + except Exception as e: 80 + logger.warning(f"feed scan failed: {e}") 81 + return 0 82 + 83 + if not response.feed: 84 + return 0 85 + 86 + # store for musing context 87 + self._recent_posts = response.feed 88 + 89 + new_posts = [] 90 + for item in response.feed: 91 + uri = item.post.uri 92 + if uri in self._seen_uris: 93 + continue 94 + self._seen_uris.add(uri) 95 + new_posts.append(item) 96 + 97 + # cap seen set 98 + if len(self._seen_uris) > SEEN_CAP: 99 + excess = len(self._seen_uris) - SEEN_CAP 100 + # remove oldest (arbitrary since set, but prevents unbounded growth) 101 + for _ in range(excess): 102 + self._seen_uris.pop() 103 + 104 + enqueued = 0 105 + for item in new_posts: 106 + handle = item.post.author.handle 107 + if handle == settings.bluesky_handle: 108 + continue 109 + if self._is_on_cooldown(handle): 110 + continue 111 + try: 112 + if memory and await memory.is_stranger(handle): 113 + ok = await enqueue( 114 + kind="explore_handle", 115 + subject=handle, 116 + source="feed", 117 + ) 118 + if ok: 119 + self._explored_subjects[handle] = datetime.now(UTC) 120 + enqueued += 1 121 + except Exception as e: 122 + logger.debug(f"feed stranger check failed for @{handle}: {e}") 123 + 124 + if enqueued: 125 + logger.info(f"feed scan: enqueued {enqueued} strangers") 126 + return enqueued 127 + 128 + def get_recent_posts_text(self) -> str: 129 + """Return formatted recent feed posts for musing context injection.""" 130 + if not self._recent_posts: 131 + return "" 132 + return _format_feed_posts(self._recent_posts, limit=10)
+3 -2
src/bot/services/message_handler.py
··· 268 268 logger.exception(f"batch handler error: {e}") 269 269 bot_status.record_error() 270 270 271 - async def original_thought(self): 271 + async def original_thought(self, feed_context: str = ""): 272 272 """Generate and post an original thought if phi has something to say. 273 273 274 274 The agent uses the `post` tool inside its run if it decides to post. ··· 288 288 289 289 try: 290 290 summary = await self.agent.process_musing( 291 - recent_posts=recent_posts or None 291 + recent_posts=recent_posts or None, 292 + feed_context=feed_context, 292 293 ) 293 294 logger.info(f"original thought: {summary[:200]}") 294 295 except Exception as e:
+54 -30
src/bot/services/notification_poller.py
··· 1 - """Simplified notification poller.""" 1 + """Notification poller with event-driven exploration.""" 2 2 3 3 import asyncio 4 4 import logging ··· 8 8 9 9 from bot.config import settings 10 10 from bot.core.atproto_client import BotClient 11 + from bot.core.feed_scanner import FeedScanner 11 12 from bot.services.message_handler import MessageHandler 12 13 from bot.status import bot_status 13 14 ··· 23 24 def __init__(self, client: BotClient): 24 25 self.client = client 25 26 self.handler = MessageHandler(client) 27 + self.feed_scanner = FeedScanner() 26 28 self._running = False 27 29 self._task: asyncio.Task | None = None 28 30 self._processed_uris: set[str] = set() ··· 30 32 self._last_daily_post: datetime | None = None 31 33 self._last_thought_hours: set[int] = set() 32 34 self._last_thought_date: date | None = None 33 - self._last_exploration_hours: set[int] = set() 34 - self._last_exploration_date: date | None = None 35 35 self._semaphore = asyncio.Semaphore(MAX_CONCURRENT) 36 36 self._background_tasks: set[asyncio.Task] = set() 37 + # event-driven exploration state 38 + self._poll_count: int = 0 39 + self._explorations_this_hour: int = 0 40 + self._exploration_hour: int = -1 41 + self._polls_since_last_exploration: int = 0 37 42 38 43 async def start(self) -> asyncio.Task: 39 44 """Start polling for notifications.""" ··· 52 57 await self._task 53 58 except asyncio.CancelledError: 54 59 pass 55 - # wait for any in-flight notification handlers 56 60 if self._background_tasks: 57 61 await asyncio.gather(*self._background_tasks, return_exceptions=True) 58 62 ··· 124 128 await self._seed_schedule_from_history() 125 129 126 130 while self._running: 131 + self._poll_count += 1 132 + self._polls_since_last_exploration += 1 133 + 127 134 try: 128 135 await self._check_notifications() 129 136 except Exception as e: ··· 147 154 except Exception as e: 148 155 logger.error(f"thought post error: {e}", exc_info=settings.debug) 149 156 157 + # feed scanning — background task, never blocks notifications 150 158 try: 151 - if self._should_do_exploration(): 159 + if self._poll_count % settings.feed_scan_interval == 0: 160 + task = asyncio.create_task(self._scan_feeds()) 161 + self._background_tasks.add(task) 162 + task.add_done_callback(self._background_tasks.discard) 163 + except Exception as e: 164 + logger.error(f"feed scan error: {e}", exc_info=settings.debug) 165 + 166 + # event-driven exploration — drain queue when idle 167 + try: 168 + if self._can_explore(): 152 169 task = asyncio.create_task(self._maybe_explore()) 153 170 self._background_tasks.add(task) 154 171 task.add_done_callback(self._background_tasks.discard) ··· 172 189 """ 173 190 check_time = self.client.client.get_current_time_iso() 174 191 175 - # Wrap the bsky list_notifications call in an observability span so we 176 - # can see the raw response — counts AND the actual unread items. 177 - # Without this we only see phi's downstream interpretation (post-filter 178 - # batch size), which makes "why did bsky return only N" unanswerable 179 - # from logs alone. 180 192 with logfire.span("fetch notifications", check_time=check_time) as fetch_span: 181 193 response = await self.client.get_notifications() 182 194 notifications = response.notifications ··· 186 198 fetch_span.set_attribute("total_count", len(notifications)) 187 199 fetch_span.set_attribute("unread_count", len(unread)) 188 200 if unread: 189 - # Capture each unread entry as a structured dict so we can 190 - # answer questions like "did bsky return all 3 mentions or 191 - # just 1" without re-running the test. 192 201 fetch_span.set_attribute( 193 202 "unread_items", 194 203 [ ··· 294 303 self._last_thought_date = now.date() 295 304 logger.info("triggering original thought") 296 305 try: 297 - await self.handler.original_thought() 306 + feed_context = self.feed_scanner.get_recent_posts_text() 307 + await self.handler.original_thought(feed_context=feed_context) 298 308 except Exception as e: 299 309 logger.error(f"thought post error: {e}", exc_info=settings.debug) 300 310 301 - def _should_do_exploration(self) -> bool: 302 - """Check if it's time for background exploration.""" 303 - now = datetime.now(UTC) 304 - today = now.date() 311 + # --- event-driven exploration --- 312 + 313 + def _can_explore(self) -> bool: 314 + """Check if exploration should run — idle budget, not a cron.""" 305 315 if bot_status.paused: 306 316 return False 307 - # reset tracked hours at midnight 308 - if self._last_exploration_date != today: 309 - self._last_exploration_hours = set() 310 - self._last_exploration_date = today 311 - hour = now.hour 312 - if hour not in settings.exploration_hours: 317 + # reset hourly counter 318 + now_hour = datetime.now(UTC).hour 319 + if now_hour != self._exploration_hour: 320 + self._explorations_this_hour = 0 321 + self._exploration_hour = now_hour 322 + # budget cap 323 + if self._explorations_this_hour >= settings.max_idle_explorations_per_hour: 313 324 return False 314 - if hour in self._last_exploration_hours: 325 + # cooldown between explorations 326 + if self._polls_since_last_exploration < settings.exploration_cooldown_polls: 327 + return False 328 + # don't explore while any background work is in-flight 329 + if len(self._background_tasks) > 0: 315 330 return False 316 331 return True 317 332 318 333 async def _maybe_explore(self): 319 - """Run one background exploration.""" 320 - now = datetime.now(UTC) 321 - self._last_exploration_hours.add(now.hour) 322 - self._last_exploration_date = now.date() 323 - logger.info("triggering background exploration") 334 + """Drain one item from the curiosity queue.""" 335 + self._explorations_this_hour += 1 336 + self._polls_since_last_exploration = 0 337 + logger.info("triggering idle exploration") 324 338 try: 325 339 await self.handler.explore() 326 340 except Exception as e: 327 341 logger.error(f"exploration error: {e}", exc_info=settings.debug) 342 + 343 + async def _scan_feeds(self): 344 + """Scan For You feed for strangers to explore.""" 345 + try: 346 + memory = self.handler.agent.memory 347 + enqueued = await self.feed_scanner.scan(memory) 348 + if enqueued: 349 + logger.info(f"feed scan enqueued {enqueued} strangers") 350 + except Exception as e: 351 + logger.warning(f"feed scan error: {e}")