a digital entity named phi that roams bsky phi.zzstoatzz.io
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove feed scanner, use saved_feeds for read_feed tool instead

feed scanning was a bespoke parallel system for something the agent
already has tools for. phi should read feeds like a user — via
read_feed("for-you") during musings, not through infrastructure.

- delete feed_scanner.py
- add saved_feeds config: friendly name → AT-URI mapping
- read_feed checks saved_feeds first, then phi's own generators
- remove feed_scan_interval, feed context injection, _poll_count

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+20 -180
+1 -10
src/bot/agent.py
··· 522 522 logger.info(f"reflection finished: {summary[:200]}") 523 523 return summary 524 524 525 - async def process_musing( 526 - self, recent_posts: list[str] | None = None, feed_context: str = "" 527 - ) -> str: 525 + async def process_musing(self, recent_posts: list[str] | None = None) -> str: 528 526 """Generate an original thought post from memory, reading, patterns noticed. 529 527 530 528 Side effects (posting) happen via the `post` tool inside the agent run. ··· 552 550 recent_activity += "[RECENT CONVERSATIONS]:\n" + "\n".join(lines) 553 551 except Exception as e: 554 552 logger.warning(f"failed to get recent interactions for musing: {e}") 555 - 556 - if feed_context: 557 - if recent_activity: 558 - recent_activity += "\n\n" 559 - recent_activity += ( 560 - "[FOR YOU FEED — posts from across the network]:\n" + feed_context 561 - ) 562 553 563 554 deps = PhiDeps( 564 555 author_handle="",
+5 -7
src/bot/config.py
··· 98 98 ) 99 99 100 100 # Event-driven exploration 101 - feed_scan_interval: int = Field( 102 - default=6, 103 - description="poll cycles between feed scans (6 * 10s = ~60s)", 104 - ) 105 - for_you_feed_uri: str = Field( 106 - default="at://did:plc:3guzzweuqraryl3rdkimjamk/app.bsky.feed.generator/for-you", 107 - description="AT-URI of the For You feed generator to scan", 101 + saved_feeds: dict[str, str] = Field( 102 + default={ 103 + "for-you": "at://did:plc:3guzzweuqraryl3rdkimjamk/app.bsky.feed.generator/for-you", 104 + }, 105 + description="friendly name → AT-URI for external feeds phi can read", 108 106 ) 109 107 max_idle_explorations_per_hour: int = Field( 110 108 default=3,
-132
src/bot/core/feed_scanner.py
··· 1 - """Feed scanner — event source for phi's curiosity queue. 2 - 3 - Periodically scans the For You feed for unfamiliar authors and enqueues 4 - them for exploration. Runs as a background task, never blocks notifications. 5 - """ 6 - 7 - import logging 8 - from datetime import UTC, datetime 9 - 10 - import logfire 11 - 12 - from bot.config import settings 13 - from bot.core.atproto_client import bot_client 14 - from bot.core.curiosity_queue import enqueue 15 - from bot.tools._helpers import _format_feed_posts 16 - 17 - logger = logging.getLogger("bot.feed_scanner") 18 - 19 - # how long before a subject can be re-enqueued (seconds) 20 - COOLDOWN_SECONDS = 86400 # 24h 21 - SEEN_CAP = 500 22 - 23 - 24 - class FeedScanner: 25 - """Scans the For You feed and enqueues strangers for exploration.""" 26 - 27 - def __init__(self): 28 - self._seen_uris: set[str] = set() 29 - self._explored_subjects: dict[str, datetime] = {} 30 - self._recent_posts: list = [] 31 - self._seeded = False 32 - 33 - async def _seed_cooldowns(self): 34 - """Seed explored_subjects from completed/failed queue items on PDS.""" 35 - if self._seeded: 36 - return 37 - self._seeded = True 38 - try: 39 - from bot.core.curiosity_queue import _list_records 40 - 41 - records = await _list_records() 42 - now = datetime.now(UTC) 43 - for rec in records: 44 - val = rec.value 45 - status = val["status"] 46 - if status in ("completed", "failed"): 47 - subject = val["subject"] 48 - updated = val.get("updatedAt") or val.get("createdAt", "") 49 - try: 50 - ts = datetime.fromisoformat(str(updated).replace("Z", "+00:00")) 51 - except (ValueError, TypeError): 52 - ts = now 53 - self._explored_subjects[subject] = ts 54 - if self._explored_subjects: 55 - logger.info( 56 - f"seeded {len(self._explored_subjects)} exploration cooldowns" 57 - ) 58 - except Exception as e: 59 - logger.warning(f"failed to seed exploration cooldowns: {e}") 60 - 61 - def _is_on_cooldown(self, subject: str) -> bool: 62 - """Check if a subject was explored recently enough to skip.""" 63 - last = self._explored_subjects.get(subject) 64 - if not last: 65 - return False 66 - elapsed = (datetime.now(UTC) - last).total_seconds() 67 - return elapsed < COOLDOWN_SECONDS 68 - 69 - async def scan(self, memory) -> int: 70 - """Scan For You feed, enqueue strangers with cooldown. Returns count enqueued.""" 71 - await self._seed_cooldowns() 72 - 73 - with logfire.span("feed scan"): 74 - try: 75 - await bot_client.authenticate() 76 - response = await bot_client.get_feed( 77 - settings.for_you_feed_uri, limit=20 78 - ) 79 - except Exception as e: 80 - logger.warning(f"feed scan failed: {e}") 81 - return 0 82 - 83 - if not response.feed: 84 - return 0 85 - 86 - # store for musing context 87 - self._recent_posts = response.feed 88 - 89 - new_posts = [] 90 - for item in response.feed: 91 - uri = item.post.uri 92 - if uri in self._seen_uris: 93 - continue 94 - self._seen_uris.add(uri) 95 - new_posts.append(item) 96 - 97 - # cap seen set 98 - if len(self._seen_uris) > SEEN_CAP: 99 - excess = len(self._seen_uris) - SEEN_CAP 100 - # remove oldest (arbitrary since set, but prevents unbounded growth) 101 - for _ in range(excess): 102 - self._seen_uris.pop() 103 - 104 - enqueued = 0 105 - for item in new_posts: 106 - handle = item.post.author.handle 107 - if handle == settings.bluesky_handle: 108 - continue 109 - if self._is_on_cooldown(handle): 110 - continue 111 - try: 112 - if memory and await memory.is_stranger(handle): 113 - ok = await enqueue( 114 - kind="explore_handle", 115 - subject=handle, 116 - source="feed", 117 - ) 118 - if ok: 119 - self._explored_subjects[handle] = datetime.now(UTC) 120 - enqueued += 1 121 - except Exception as e: 122 - logger.debug(f"feed stranger check failed for @{handle}: {e}") 123 - 124 - if enqueued: 125 - logger.info(f"feed scan: enqueued {enqueued} strangers") 126 - return enqueued 127 - 128 - def get_recent_posts_text(self) -> str: 129 - """Return formatted recent feed posts for musing context injection.""" 130 - if not self._recent_posts: 131 - return "" 132 - return _format_feed_posts(self._recent_posts, limit=10)
+1 -2
src/bot/services/message_handler.py
··· 268 268 logger.exception(f"batch handler error: {e}") 269 269 bot_status.record_error() 270 270 271 - async def original_thought(self, feed_context: str = ""): 271 + async def original_thought(self): 272 272 """Generate and post an original thought if phi has something to say. 273 273 274 274 The agent uses the `post` tool inside its run if it decides to post. ··· 289 289 try: 290 290 summary = await self.agent.process_musing( 291 291 recent_posts=recent_posts or None, 292 - feed_context=feed_context, 293 292 ) 294 293 logger.info(f"original thought: {summary[:200]}") 295 294 except Exception as e:
+1 -24
src/bot/services/notification_poller.py
··· 8 8 9 9 from bot.config import settings 10 10 from bot.core.atproto_client import BotClient 11 - from bot.core.feed_scanner import FeedScanner 12 11 from bot.services.message_handler import MessageHandler 13 12 from bot.status import bot_status 14 13 ··· 24 23 def __init__(self, client: BotClient): 25 24 self.client = client 26 25 self.handler = MessageHandler(client) 27 - self.feed_scanner = FeedScanner() 28 26 self._running = False 29 27 self._task: asyncio.Task | None = None 30 28 self._processed_uris: set[str] = set() ··· 35 33 self._semaphore = asyncio.Semaphore(MAX_CONCURRENT) 36 34 self._background_tasks: set[asyncio.Task] = set() 37 35 # event-driven exploration state 38 - self._poll_count: int = 0 39 36 self._explorations_this_hour: int = 0 40 37 self._exploration_hour: int = -1 41 38 self._polls_since_last_exploration: int = 0 ··· 128 125 await self._seed_schedule_from_history() 129 126 130 127 while self._running: 131 - self._poll_count += 1 132 128 self._polls_since_last_exploration += 1 133 129 134 130 try: ··· 154 150 except Exception as e: 155 151 logger.error(f"thought post error: {e}", exc_info=settings.debug) 156 152 157 - # feed scanning — fire-and-forget, does NOT count as background work 158 - # (so it doesn't block idle exploration) 159 - try: 160 - if self._poll_count % settings.feed_scan_interval == 0: 161 - asyncio.create_task(self._scan_feeds()) 162 - except Exception as e: 163 - logger.error(f"feed scan error: {e}", exc_info=settings.debug) 164 - 165 153 # event-driven exploration — drain queue when idle 166 154 try: 167 155 if self._can_explore(): ··· 302 290 self._last_thought_date = now.date() 303 291 logger.info("triggering original thought") 304 292 try: 305 - feed_context = self.feed_scanner.get_recent_posts_text() 306 - await self.handler.original_thought(feed_context=feed_context) 293 + await self.handler.original_thought() 307 294 except Exception as e: 308 295 logger.error(f"thought post error: {e}", exc_info=settings.debug) 309 296 ··· 338 325 await self.handler.explore() 339 326 except Exception as e: 340 327 logger.error(f"exploration error: {e}", exc_info=settings.debug) 341 - 342 - async def _scan_feeds(self): 343 - """Scan For You feed for strangers to explore.""" 344 - try: 345 - memory = self.handler.agent.memory 346 - enqueued = await self.feed_scanner.scan(memory) 347 - if enqueued: 348 - logger.info(f"feed scan enqueued {enqueued} strangers") 349 - except Exception as e: 350 - logger.warning(f"feed scan error: {e}")
+12 -5
src/bot/tools/feeds.py
··· 126 126 127 127 @agent.tool 128 128 async def read_feed(ctx: RunContext[PhiDeps], name: str, limit: int = 20) -> str: 129 - """Read posts from one of your graze-powered feeds. 129 + """Read posts from a feed by name. 130 130 131 - name: the feed slug (e.g. "mushroom-foraging"). use list_feeds to see available names. 131 + name: a saved feed name (e.g. "for-you") or one of your own feed slugs. 132 + use list_feeds to see available names. 132 133 """ 133 134 try: 134 - await bot_client.authenticate() 135 - assert bot_client.client.me is not None 136 - feed_uri = f"at://{bot_client.client.me.did}/app.bsky.feed.generator/{name}" 135 + # check saved feeds first (external feeds mapped by friendly name) 136 + feed_uri = settings.saved_feeds.get(name) 137 + if not feed_uri: 138 + # fall back to phi's own graze-powered feeds 139 + await bot_client.authenticate() 140 + assert bot_client.client.me is not None 141 + feed_uri = ( 142 + f"at://{bot_client.client.me.did}/app.bsky.feed.generator/{name}" 143 + ) 137 144 response = await bot_client.get_feed(feed_uri, limit=limit) 138 145 if not response.feed: 139 146 return "no posts in this feed yet"