a digital entity named phi that roams bsky (phi.zzstoatzz.io)
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: remove thread storage duplication, fetch from network

eliminates the sqlite thread_messages table in favor of fetching thread context directly from the atproto network on demand.

changes:
- add build_thread_context() utility to extract thread messages from network data
- update message_handler to fetch threads via get_thread() instead of sqlite
- remove the thread_messages table and its accessors: add_message(), get_thread_messages(), get_thread_context()
- keep approval_requests table for future self-modification features
- keep turbopuffer episodic memory (semantic search; it does not duplicate thread data)

rationale:
- thread data already exists on users' PDSs, aggregated by appview
- the network is the source of truth (no stale local copies after edits/deletions)
- simpler architecture, less maintenance
- aligns with atproto's "data on the web" philosophy

tested: bot runs successfully, polls notifications, no errors

see sandbox/THREAD_STORAGE_REFACTOR.md for full analysis

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

zzstoatzz 1da00f09 5ce04186

+45 -122
+2 -64
src/bot/database.py
··· 1 - """Simple SQLite database for storing thread history""" 1 + """Simple SQLite database for approval requests""" 2 2 3 3 import sqlite3 4 4 from contextlib import contextmanager ··· 7 7 8 8 9 9 class ThreadDatabase: 10 - """Simple database for storing Bluesky thread conversations""" 10 + """Database for storing approval requests (future self-modification features)""" 11 11 12 12 def __init__(self, db_path: Path = Path("threads.db")): 13 13 self.db_path = db_path ··· 16 16 def _init_db(self): 17 17 """Initialize database schema""" 18 18 with self._get_connection() as conn: 19 - conn.execute(""" 20 - CREATE TABLE IF NOT EXISTS thread_messages ( 21 - id INTEGER PRIMARY KEY AUTOINCREMENT, 22 - thread_uri TEXT NOT NULL, 23 - author_handle TEXT NOT NULL, 24 - author_did TEXT NOT NULL, 25 - message_text TEXT NOT NULL, 26 - post_uri TEXT NOT NULL UNIQUE, 27 - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 28 - ) 29 - """) 30 - conn.execute(""" 31 - CREATE INDEX IF NOT EXISTS idx_thread_uri 32 - ON thread_messages(thread_uri) 33 - """) 34 - 35 19 # Approval requests table 36 20 conn.execute(""" 37 21 CREATE TABLE IF NOT EXISTS approval_requests ( ··· 73 57 finally: 74 58 conn.close() 75 59 76 - def add_message( 77 - self, 78 - thread_uri: str, 79 - author_handle: str, 80 - author_did: str, 81 - message_text: str, 82 - post_uri: str, 83 - ): 84 - """Add a message to a thread""" 85 - with self._get_connection() as conn: 86 - conn.execute( 87 - """ 88 - INSERT OR IGNORE INTO thread_messages 89 - (thread_uri, author_handle, author_did, message_text, post_uri) 90 - VALUES (?, ?, ?, ?, ?) 91 - """, 92 - (thread_uri, author_handle, author_did, message_text, post_uri), 93 - ) 94 - 95 - def get_thread_messages(self, thread_uri: str) -> list[dict[str, Any]]: 96 - """Get all messages in a thread, ordered chronologically""" 97 - with self._get_connection() as conn: 98 - cursor = conn.execute( 99 - """ 100 - SELECT * FROM thread_messages 101 - WHERE thread_uri = ? 
102 - ORDER BY created_at ASC 103 - """, 104 - (thread_uri,), 105 - ) 106 - 107 - return [dict(row) for row in cursor.fetchall()] 108 - 109 - def get_thread_context(self, thread_uri: str) -> str: 110 - """Get thread messages formatted for AI context""" 111 - messages = self.get_thread_messages(thread_uri) 112 - 113 - if not messages: 114 - return "No previous messages in this thread." 115 - 116 - context_parts = ["Previous messages in this thread:"] 117 - for msg in messages: 118 - context_parts.append(f"@{msg['author_handle']}: {msg['message_text']}") 119 - 120 - return "\n".join(context_parts) 121 - 122 60 def create_approval_request( 123 61 self, request_type: str, request_data: str, thread_uri: str | None = None 124 62 ) -> int:
+10 -58
src/bot/services/message_handler.py
··· 7 7 from bot.agent import PhiAgent 8 8 from bot.config import settings 9 9 from bot.core.atproto_client import BotClient 10 - from bot.database import thread_db 11 10 from bot.status import bot_status 12 - from bot.utils.thread import traverse_thread 11 + from bot.utils.thread import build_thread_context 13 12 14 13 logger = logging.getLogger("bot.handler") 15 14 ··· 21 20 self.client = client 22 21 self.agent = PhiAgent() 23 22 24 - async def _store_thread_messages(self, thread_node, thread_uri: str): 25 - """Extract and store all messages from a thread.""" 26 - 27 - def store_post(node): 28 - """Store a single post from the thread.""" 29 - if not hasattr(node, "post"): 30 - return 31 - 32 - post = node.post 33 - thread_db.add_message( 34 - thread_uri=thread_uri, 35 - author_handle=post.author.handle, 36 - author_did=post.author.did, 37 - message_text=post.record.text, 38 - post_uri=post.uri, 39 - ) 40 - 41 - # Use utility to traverse and store all posts 42 - traverse_thread(thread_node, store_post) 43 - 44 23 async def handle_mention(self, notification): 45 24 """Process a mention or reply notification.""" 46 25 try: ··· 73 52 root_ref = parent_ref 74 53 thread_uri = post_uri 75 54 76 - # Discover thread context if we haven't participated yet 77 - existing_messages = thread_db.get_thread_messages(thread_uri) 78 - if not existing_messages: 79 - # Phi is being tagged into an existing thread - fetch full context 80 - logger.debug(f"🔍 Discovering thread context for {thread_uri}") 81 - try: 82 - thread_data = await self.client.get_thread(thread_uri, depth=100) 83 - # Extract and store all messages from the thread 84 - await self._store_thread_messages(thread_data.thread, thread_uri) 85 - except Exception as e: 86 - logger.warning(f"Failed to fetch thread context: {e}") 87 - 88 - # Store the current mention in thread history 89 - thread_db.add_message( 90 - thread_uri=thread_uri, 91 - author_handle=author_handle, 92 - author_did=author_did, 93 - 
message_text=mention_text, 94 - post_uri=post_uri, 95 - ) 96 - 97 - # Get thread context 98 - thread_context = thread_db.get_thread_context(thread_uri) 55 + # Fetch thread context directly from network 56 + thread_context = "No previous messages in this thread." 57 + try: 58 + logger.debug(f"🔍 Fetching thread context for {thread_uri}") 59 + thread_data = await self.client.get_thread(thread_uri, depth=100) 60 + thread_context = build_thread_context(thread_data.thread) 61 + except Exception as e: 62 + logger.warning(f"Failed to fetch thread context: {e}") 99 63 100 64 # Process with agent (has episodic memory + MCP tools) 101 65 response = await self.agent.process_mention( ··· 129 93 reply_ref = models.AppBskyFeedPost.ReplyRef( 130 94 parent=parent_ref, root=root_ref 131 95 ) 132 - reply_response = await self.client.create_post( 133 - response.text, reply_to=reply_ref 134 - ) 135 - 136 - # Store bot's response in thread history 137 - if reply_response and hasattr(reply_response, "uri"): 138 - thread_db.add_message( 139 - thread_uri=thread_uri, 140 - author_handle=settings.bluesky_handle, 141 - author_did=self.client.me.did if self.client.me else "bot", 142 - message_text=response.text, 143 - post_uri=reply_response.uri, 144 - ) 96 + await self.client.create_post(response.text, reply_to=reply_ref) 145 97 146 98 bot_status.record_response() 147 99 logger.info(f"✅ Replied to @{author_handle}: {response.text[:50]}...")
+33
src/bot/utils/thread.py
··· 58 58 # Sort by indexed timestamp 59 59 posts.sort(key=lambda p: p.indexed_at if hasattr(p, "indexed_at") else "") 60 60 return posts 61 + 62 + 63 + def build_thread_context(thread_node) -> str: 64 + """Build conversational context string from ATProto thread structure. 65 + 66 + Args: 67 + thread_node: ATProto thread node 68 + 69 + Returns: 70 + Formatted string of messages like: 71 + @alice: I love birds 72 + @phi: me too! what's your favorite? 73 + @alice: especially crows 74 + 75 + Example: 76 + thread_data = await client.get_thread(uri, depth=100) 77 + context = build_thread_context(thread_data.thread) 78 + """ 79 + if not thread_node: 80 + return "No previous messages in this thread." 81 + 82 + posts = extract_posts_chronological(thread_node) 83 + 84 + if not posts: 85 + return "No previous messages in this thread." 86 + 87 + messages = [] 88 + for post in posts: 89 + handle = post.author.handle 90 + text = post.record.text if hasattr(post.record, "text") else "[no text]" 91 + messages.append(f"@{handle}: {text}") 92 + 93 + return "\n".join(messages)