this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 482 lines 20 kB view raw
from rich import print  # pretty printing tools
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
from pathlib import Path
from datetime import datetime

from utils import (
    upsert_block,
    upsert_agent
)

import bsky_utils

# Configure logging: root at DEBUG so library logs are visible,
# but this bot's own logger stays at INFO.
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")
logger.setLevel(logging.INFO)


# Create a client with extended timeout for LLM operations
CLIENT = Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=300  # 5 minutes timeout for API calls
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay (seconds between polls of the notification feed)
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory for pending notifications; errors/ holds notifications
# that can never succeed (e.g. payload too large) so they are not retried.
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)
QUEUE_ERROR_DIR = Path("queue/errors")
QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)


def initialize_void():
    """Upsert the shared memory blocks and the "void" agent.

    Returns:
        The upserted Letta agent object.
    """
    # Ensure that a shared zeitgeist block exists
    zeitgeist_block = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment."
    )

    # Ensure that a shared void personality block exists
    persona_block = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void."
    )

    # Ensure that a shared void human block exists
    human_block = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network."
    )

    # Create the agent if it doesn't exist
    void_agent = upsert_agent(
        CLIENT,
        name="void",
        block_ids=[
            persona_block.id,
            human_block.id,
            zeitgeist_block.id,
        ],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID
    )

    # Log agent details
    logger.info(f"Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")
    if hasattr(void_agent, 'llm_config'):
        logger.info(f"Agent model: {void_agent.llm_config.model}")
    logger.info(f"Agent project_id: {void_agent.project_id}")
    if hasattr(void_agent, 'tools'):
        logger.info(f"Agent has {len(void_agent.tools)} tools")
        for tool in void_agent.tools[:3]:  # Show first 3 tools
            logger.info(f"  - Tool: {tool.name} (type: {tool.tool_type})")

    return void_agent


def process_mention(void_agent, atproto_client, notification_data):
    """Process a mention and generate a reply using the Letta agent.

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
    """
    try:
        logger.info(f"Starting process_mention with notification_data type: {type(notification_data)}")

        # Handle both dict and object inputs for backwards compatibility
        if isinstance(notification_data, dict):
            uri = notification_data['uri']
            mention_text = notification_data.get('record', {}).get('text', '')
            author_handle = notification_data['author']['handle']
            author_name = notification_data['author'].get('display_name') or author_handle
        else:
            # Legacy object access
            uri = notification_data.uri
            mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
            author_handle = notification_data.author.handle
            author_name = notification_data.author.display_name or author_handle

        logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 80,
                'depth': 10
            })
        except Exception as e:
            error_str = str(e)
            # A deleted/missing post can never be replied to -- drop it.
            if 'NotFound' in error_str or 'Post not found' in error_str:
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True  # Return True to remove from queue
            else:
                # Re-raise other errors
                logger.error(f"Error fetching thread: {e}")
                raise

        # Get thread context as YAML string
        logger.info("Converting thread to YAML string")
        try:
            thread_context = thread_to_yaml_string(thread)
            logger.info(f"Thread context generated, length: {len(thread_context)} characters")
            logger.debug(f"Thread context preview: {thread_context[:500]}...")
        except Exception as yaml_error:
            import traceback
            logger.error(f"Error converting thread to YAML: {yaml_error}")
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            logger.error(f"Thread type: {type(thread)}")
            if hasattr(thread, '__dict__'):
                logger.error(f"Thread attributes: {thread.__dict__}")
            # Try to continue with a simple context
            thread_context = f"Error processing thread context: {str(yaml_error)}"

        # Create a prompt for the Letta agent with thread context
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the bluesky_reply tool to send a response less than 300 characters."""

        # Get response from Letta agent
        logger.info(f"Mention from @{author_handle}: {mention_text}")
        logger.debug(f"Prompt being sent: {prompt}")

        # Log the exact parameters being sent to Letta
        logger.debug(f"Calling Letta API with agent_id: {void_agent.id}")
        logger.debug(f"Message content length: {len(prompt)} characters")

        try:
            message_response = CLIENT.agents.messages.create(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}]
            )
        except Exception as api_error:
            import traceback
            error_str = str(api_error)
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Error type: {type(api_error).__name__}")
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            logger.error(f"Mention text was: {mention_text}")
            logger.error(f"Author: @{author_handle}")
            logger.error(f"URI: {uri}")

            # Try to extract more info from different error types
            if hasattr(api_error, 'response'):
                logger.error(f"Error response object exists")
                if hasattr(api_error.response, 'text'):
                    logger.error(f"Response text: {api_error.response.text}")
                if hasattr(api_error.response, 'json') and callable(api_error.response.json):
                    try:
                        logger.error(f"Response JSON: {api_error.response.json()}")
                    except Exception:
                        pass

            # Check for specific error types. Use getattr so an error object
            # without a status_code attribute does not raise AttributeError
            # here -- that would mask the original failure and skip the
            # string-based fallback checks below.
            status_code = getattr(api_error, 'status_code', None)
            if status_code is not None:
                logger.error(f"API Status code: {status_code}")
            if hasattr(api_error, 'body'):
                logger.error(f"API Response body: {api_error.body}")
            if hasattr(api_error, 'headers'):
                logger.error(f"API Response headers: {api_error.headers}")

            if status_code == 413:
                logger.error("413 Payload Too Large - moving to errors directory")
                return None  # Move to errors directory - payload is too large to ever succeed
            elif status_code == 524:
                logger.error("524 error - timeout from Cloudflare, will retry later")
                return False  # Keep in queue for retry

            # Fallback: some errors only expose the status in their message text
            if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
                logger.warning("Payload too large error, moving to errors directory")
                return None  # Move to errors directory - cannot be fixed by retry
            elif 'status_code: 524' in error_str:
                logger.warning("524 timeout error, keeping in queue for retry")
                return False  # Keep in queue for retry

            raise

        # Log successful response
        logger.debug("Successfully received response from Letta API")
        logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")

        # Extract the reply text from the agent's response
        reply_text = ""
        for message in message_response.messages:
            print(message)

            # Check if this is a ToolCallMessage with bluesky_reply tool
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'bluesky_reply':
                    # Parse the JSON arguments to get the message
                    try:
                        args = json.loads(message.tool_call.arguments)
                        reply_text = args.get('message', '')
                        logger.info(f"Extracted reply from tool call: {reply_text[:50]}...")
                        break
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse tool call arguments: {e}")

            # Fallback to text message if available
            elif hasattr(message, 'text') and message.text:
                reply_text = message.text
                break

        if reply_text:
            # Print the generated reply for testing
            print(f"\n=== GENERATED REPLY ===")
            print(f"To: @{author_handle}")
            print(f"Reply: {reply_text}")
            print(f"======================\n")

            # Send the reply
            logger.info(f"Sending reply: {reply_text[:50]}...")
            response = bsky_utils.reply_to_notification(
                client=atproto_client,
                notification=notification_data,
                reply_text=reply_text
            )

            if response:
                logger.info(f"Successfully replied to @{author_handle}")
                return True
            else:
                logger.error(f"Failed to send reply to @{author_handle}")
                return False
        else:
            logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue")
            return True

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        return False


def notification_to_dict(notification):
    """Convert a notification object to a dictionary for JSON serialization."""
    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': notification.author.handle,
            'display_name': notification.author.display_name,
            'did': notification.author.did
        },
        'record': {
            # Follow notifications have no record/text
            'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
        }
    }


def save_notification_to_queue(notification):
    """Save a notification to the queue directory with hash-based filename.

    Returns True if a new file was written, False on duplicate or error.
    """
    try:
        # Convert notification to dict
        notif_dict = notification_to_dict(notification)

        # Create JSON string (sorted keys so identical content hashes identically)
        notif_json = json.dumps(notif_dict, sort_keys=True)

        # Generate hash for filename (to avoid duplicates)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Create filename with timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Skip if already exists (duplicate)
        if filepath.exists():
            logger.debug(f"Notification already queued: {filepath.name}")
            return False

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filepath.name}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False


def load_and_process_queued_notifications(void_agent, atproto_client):
    """Load and process all notifications from the queue.

    Per-file outcome handling mirrors process_mention's tri-state result:
    True -> delete, False -> keep for retry, None -> move to errors/.
    """
    try:
        # Get all JSON files in queue directory (sorted -> oldest first,
        # since filenames start with a timestamp)
        queue_files = sorted(QUEUE_DIR.glob("*.json"))

        if not queue_files:
            logger.debug("No queued notifications to process")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        for filepath in queue_files:
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                success = False
                reason = notif_data['reason']
                if reason in ("mention", "reply"):
                    success = process_mention(void_agent, atproto_client, notif_data)
                elif reason == "follow":
                    # No reply needed; just inform the agent about the new follower
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                elif reason == "repost":
                    logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
                    success = True  # Skip reposts but mark as successful to remove from queue
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result. Check None first:
                # it is falsy, so testing truthiness alone would conflate it
                # with the "retry" case.
                if success is None:
                    # Non-retryable failure: park in errors directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"Moved {filepath.name} to errors directory")
                elif success:
                    filepath.unlink()
                    logger.info(f"Processed and removed: {filepath.name}")
                else:
                    logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")


def process_notifications(void_agent, atproto_client):
    """Fetch new notifications, queue them, and process the queue."""
    try:
        # First, process any existing queued notifications
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Get current time for marking notifications as seen
        last_seen_at = atproto_client.get_current_time_iso()

        # Fetch notifications
        notifications_response = atproto_client.app.bsky.notification.list_notifications()

        # Queue all unread notifications (except likes)
        new_count = 0
        for notification in notifications_response.notifications:
            if not notification.is_read and notification.reason != "like":
                if save_notification_to_queue(notification):
                    new_count += 1

        # Mark all notifications as seen immediately after queuing
        if new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")

        # Process the queue (including any newly added notifications)
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")


def main():
    """Main bot loop that continuously monitors for notifications."""
    logger.info("Initializing Void bot...")

    # Initialize the Letta agent
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        logger.info(f"Agent has tools: {tool_names}")

        # Check for bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if bluesky_tools:
            logger.info(f"Found Bluesky-related tools: {bluesky_tools}")
        else:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    while True:
        try:
            process_notifications(void_agent, atproto_client)
            logger.debug("Sleeping, no notifications were detected")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            logger.info("Bot stopped by user")
            break
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            # Wait a bit longer on errors
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


if __name__ == "__main__":
    main()