this repo has no description
40
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e942e7c4e5eb64838b00eccd62079eda62490867 2316 lines 101 kB view raw
1import os 2import logging 3import requests 4import yaml 5import json 6import hashlib 7import random 8import time 9from typing import Optional, Dict, Any, List, Set 10from datetime import datetime 11from pathlib import Path 12from requests_oauthlib import OAuth1 13from rich import print as rprint 14from rich.panel import Panel 15from rich.text import Text 16 17import bsky_utils 18 19class XRateLimitError(Exception): 20 """Exception raised when X API rate limit is exceeded""" 21 pass 22 23 24# Configure logging 25logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27) 28logger = logging.getLogger("x_client") 29 30# X-specific file paths 31X_QUEUE_DIR = Path("x_queue") 32X_CACHE_DIR = Path("x_cache") 33X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 34X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 35X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt") 36 37class XClient: 38 """X (Twitter) API client for fetching mentions and managing interactions.""" 39 40 def __init__(self, api_key: str, user_id: str, access_token: str = None, 41 consumer_key: str = None, consumer_secret: str = None, 42 access_token_secret: str = None): 43 self.api_key = api_key 44 self.access_token = access_token 45 self.user_id = user_id 46 self.base_url = "https://api.x.com/2" 47 48 # Check if we have OAuth 1.0a credentials 49 if (consumer_key and consumer_secret and access_token and access_token_secret): 50 # Use OAuth 1.0a for User Context 51 self.oauth = OAuth1( 52 consumer_key, 53 client_secret=consumer_secret, 54 resource_owner_key=access_token, 55 resource_owner_secret=access_token_secret 56 ) 57 self.headers = {"Content-Type": "application/json"} 58 self.auth_method = "oauth1a" 59 logger.info("Using OAuth 1.0a User Context authentication for X API") 60 elif access_token: 61 # Use OAuth 2.0 Bearer token for User Context 62 self.oauth = None 63 self.headers = { 64 "Authorization": f"Bearer {access_token}", 65 "Content-Type": "application/json" 66 } 67 self.auth_method = "oauth2_user" 68 logger.info("Using OAuth 2.0 User Context access token for X API") 69 else: 70 # Use Application-Only Bearer token 71 self.oauth = None 72 self.headers = { 73 "Authorization": f"Bearer {api_key}", 74 "Content-Type": "application/json" 75 } 76 self.auth_method = "bearer" 77 logger.info("Using Application-Only Bearer token for X API") 78 79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]: 80 """Make a request to the X API with proper error handling and exponential backoff.""" 81 url = f"{self.base_url}{endpoint}" 82 83 for attempt in range(max_retries): 84 try: 85 if method.upper() == "GET": 86 if self.oauth: 87 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth) 88 else: 89 response = requests.get(url, headers=self.headers, params=params) 90 elif method.upper() == "POST": 91 if self.oauth: 92 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth) 93 else: 94 response = requests.post(url, headers=self.headers, json=data) 95 else: 96 raise ValueError(f"Unsupported HTTP method: {method}") 97 98 response.raise_for_status() 99 return response.json() 100 101 except requests.exceptions.HTTPError as e: 102 if response.status_code == 401: 103 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials") 104 logger.error(f"Response: {response.text}") 105 return None # Don't retry auth failures 106 elif response.status_code == 403: 107 logger.error(f"X API forbidden with {self.auth_method} - check app permissions") 108 logger.error(f"Response: {response.text}") 109 return None # Don't retry permission failures 110 elif response.status_code == 429: 111 if attempt < max_retries - 1: 112 # Exponential backoff: 60s, 120s, 240s 113 backoff_time = 60 * (2 ** attempt) 114 logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 115 logger.error(f"Response: {response.text}") 116 time.sleep(backoff_time) 117 continue 118 else: 119 logger.error("X API rate limit exceeded - max retries reached") 120 logger.error(f"Response: {response.text}") 121 raise XRateLimitError("X API rate limit exceeded") 122 else: 123 if attempt < max_retries - 1: 124 # Exponential backoff for other HTTP errors too 125 backoff_time = 30 * (2 ** attempt) 126 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 127 logger.error(f"Response: {response.text}") 128 time.sleep(backoff_time) 129 continue 130 else: 131 logger.error(f"X API request failed after {max_retries} attempts: {e}") 132 logger.error(f"Response: {response.text}") 133 return None 134 135 except Exception as e: 136 if attempt < max_retries - 1: 137 backoff_time = 15 * (2 ** attempt) 138 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 139 time.sleep(backoff_time) 140 continue 141 else: 142 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}") 143 return None 144 145 return None 146 147 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[Dict]: 148 """ 149 Fetch mentions for the configured user with user data. 150 151 Args: 152 since_id: Minimum Post ID to include (for getting newer mentions) 153 max_results: Number of results to return (5-100) 154 155 Returns: 156 Dict with 'mentions' and 'users' keys, or None if request failed 157 """ 158 endpoint = f"/users/{self.user_id}/mentions" 159 params = { 160 "max_results": min(max(max_results, 5), 100), # Ensure within API limits 161 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets", 162 "user.fields": "id,name,username", 163 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 164 } 165 166 if since_id: 167 params["since_id"] = since_id 168 169 logger.info(f"Fetching mentions for user {self.user_id}") 170 response = self._make_request(endpoint, params) 171 172 if response: 173 logger.debug(f"X API response: {response}") 174 175 if response and "data" in response: 176 mentions = response["data"] 177 users_data = {} 178 179 # Extract user data from includes 180 if "includes" in response and "users" in response["includes"]: 181 for user in response["includes"]["users"]: 182 users_data[user["id"]] = user 183 logger.info(f"Retrieved user data for {len(users_data)} users") 184 185 logger.info(f"Retrieved {len(mentions)} mentions") 186 return {"mentions": mentions, "users": users_data} 187 else: 188 if response: 189 logger.info(f"No mentions in response. Full response: {response}") 190 else: 191 logger.warning("Request failed - no response received") 192 return {"mentions": [], "users": {}} 193 194 def get_user_info(self, user_id: str) -> Optional[Dict]: 195 """Get information about a specific user.""" 196 endpoint = f"/users/{user_id}" 197 params = { 198 "user.fields": "id,name,username,description,public_metrics" 199 } 200 201 response = self._make_request(endpoint, params) 202 return response.get("data") if response else None 203 204 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]: 205 """ 206 Search for mentions using the search endpoint instead of mentions endpoint. 207 This might have better rate limits than the direct mentions endpoint. 208 209 Args: 210 username: Username to search for mentions of (without @) 211 max_results: Number of results to return (10-100) 212 since_id: Only return results newer than this tweet ID 213 214 Returns: 215 List of tweets mentioning the username 216 """ 217 endpoint = "/tweets/search/recent" 218 219 # Search for mentions of the username 220 query = f"@{username}" 221 222 params = { 223 "query": query, 224 "max_results": min(max(max_results, 10), 100), 225 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 226 "user.fields": "id,name,username", 227 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 228 } 229 230 if since_id: 231 params["since_id"] = since_id 232 233 logger.info(f"Searching for mentions of @{username}") 234 response = self._make_request(endpoint, params) 235 236 if response and "data" in response: 237 tweets = response["data"] 238 logger.info(f"Found {len(tweets)} mentions via search") 239 return tweets 240 else: 241 if response: 242 logger.info(f"No mentions found via search. Response: {response}") 243 else: 244 logger.warning("Search request failed") 245 return [] 246 247 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]: 248 """ 249 Get all tweets in a conversation thread up to a specific tweet ID. 250 251 Args: 252 conversation_id: The conversation ID to fetch (should be the original tweet ID) 253 use_cache: Whether to use cached data if available 254 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID) 255 256 Returns: 257 List of tweets in the conversation, ordered chronologically 258 """ 259 # Check cache first if enabled 260 if use_cache: 261 cached_data = get_cached_thread_context(conversation_id) 262 if cached_data: 263 return cached_data 264 265 # First, get the original tweet directly since it might not appear in conversation search 266 original_tweet = None 267 try: 268 endpoint = f"/tweets/{conversation_id}" 269 params = { 270 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 271 "user.fields": "id,name,username", 272 "expansions": "author_id" 273 } 274 response = self._make_request(endpoint, params) 275 if response and "data" in response: 276 original_tweet = response["data"] 277 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}") 278 except Exception as e: 279 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}") 280 281 # Then search for all tweets in this conversation 282 endpoint = "/tweets/search/recent" 283 params = { 284 "query": f"conversation_id:{conversation_id}", 285 "max_results": 100, # Get as many as possible 286 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 287 "user.fields": "id,name,username", 288 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id", 289 "sort_order": "recency" # Get newest first, we'll reverse later 290 } 291 292 # Add until_id parameter to exclude tweets after the mention being processed 293 if until_id: 294 params["until_id"] = until_id 295 logger.info(f"Using until_id={until_id} to exclude future tweets") 296 297 logger.info(f"Fetching thread context for conversation {conversation_id}") 298 response = self._make_request(endpoint, params) 299 300 tweets = [] 301 users_data = {} 302 303 # Collect tweets from search 304 if response and "data" in response: 305 tweets.extend(response["data"]) 306 # Store user data for reference 307 if "includes" in response and "users" in response["includes"]: 308 for user in response["includes"]["users"]: 309 users_data[user["id"]] = user 310 311 # Add original tweet if we got it and it's not already in the list 312 if original_tweet: 313 tweet_ids = [t.get('id') for t in tweets] 314 if original_tweet.get('id') not in tweet_ids: 315 tweets.append(original_tweet) 316 logger.info("Added original tweet to thread context") 317 318 # Attempt to fill gaps by fetching referenced tweets that are missing 319 # This helps with X API's incomplete conversation search results 320 tweet_ids = set(t.get('id') for t in tweets) 321 missing_tweet_ids = set() 322 critical_missing_ids = set() 323 324 # Collect referenced tweet IDs, prioritizing critical ones 325 for tweet in tweets: 326 referenced_tweets = tweet.get('referenced_tweets', []) 327 for ref in referenced_tweets: 328 ref_id = ref.get('id') 329 ref_type = ref.get('type') 330 if ref_id and ref_id not in tweet_ids: 331 missing_tweet_ids.add(ref_id) 332 # Prioritize direct replies and quoted tweets over retweets 333 if ref_type in ['replied_to', 'quoted']: 334 critical_missing_ids.add(ref_id) 335 336 # For rate limit efficiency, only fetch critical missing tweets if we have many 337 if len(missing_tweet_ids) > 10: 338 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones") 339 missing_tweet_ids = critical_missing_ids 340 341 # Context sufficiency check - skip backfill if we already have enough context 342 if has_sufficient_context(tweets, missing_tweet_ids): 343 logger.info("Thread has sufficient context, skipping missing tweet backfill") 344 missing_tweet_ids = set() 345 346 # Fetch missing referenced tweets in batches (more rate-limit friendly) 347 if missing_tweet_ids: 348 missing_list = list(missing_tweet_ids) 349 350 # First, check cache for missing tweets 351 cached_tweets = get_cached_tweets(missing_list) 352 for tweet_id, cached_tweet in cached_tweets.items(): 353 if cached_tweet.get('conversation_id') == conversation_id: 354 tweets.append(cached_tweet) 355 tweet_ids.add(tweet_id) 356 logger.info(f"Retrieved missing tweet from cache: {tweet_id}") 357 358 # Add user data if available in cache 359 if cached_tweet.get('author_info'): 360 author_id = cached_tweet.get('author_id') 361 if author_id: 362 users_data[author_id] = cached_tweet['author_info'] 363 364 # Only fetch tweets that weren't found in cache 365 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets] 366 367 if uncached_ids: 368 batch_size = 100 # X API limit for bulk tweet lookup 369 370 for i in range(0, len(uncached_ids), batch_size): 371 batch_ids = uncached_ids[i:i + batch_size] 372 try: 373 endpoint = "/tweets" 374 params = { 375 "ids": ",".join(batch_ids), 376 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 377 "user.fields": "id,name,username", 378 "expansions": "author_id" 379 } 380 response = self._make_request(endpoint, params) 381 382 if response and "data" in response: 383 fetched_tweets = [] 384 batch_users_data = {} 385 386 for missing_tweet in response["data"]: 387 # Only add if it's actually part of this conversation 388 if missing_tweet.get('conversation_id') == conversation_id: 389 tweets.append(missing_tweet) 390 tweet_ids.add(missing_tweet.get('id')) 391 fetched_tweets.append(missing_tweet) 392 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}") 393 394 # Add user data if available 395 if "includes" in response and "users" in response["includes"]: 396 for user in response["includes"]["users"]: 397 users_data[user["id"]] = user 398 batch_users_data[user["id"]] = user 399 400 # Cache the newly fetched tweets 401 if fetched_tweets: 402 save_cached_tweets(fetched_tweets, batch_users_data) 403 404 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested") 405 406 # Handle partial success - log any missing tweets that weren't found 407 if response and "errors" in response: 408 for error in response["errors"]: 409 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}") 410 411 except Exception as e: 412 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}") 413 else: 414 logger.info(f"All {len(missing_list)} missing tweets found in cache") 415 416 if tweets: 417 # Filter out tweets that occur after until_id (if specified) 418 if until_id: 419 original_count = len(tweets) 420 # Convert until_id to int for comparison (Twitter IDs are sequential) 421 until_id_int = int(until_id) 422 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int] 423 filtered_count = len(tweets) 424 if original_count != filtered_count: 425 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}") 426 427 # Sort chronologically (oldest first) 428 tweets.sort(key=lambda x: x.get('created_at', '')) 429 logger.info(f"Retrieved {len(tweets)} tweets in thread") 430 431 thread_data = {"tweets": tweets, "users": users_data} 432 433 # Cache individual tweets from the thread for future backfill 434 save_cached_tweets(tweets, users_data) 435 436 # Cache the result 437 if use_cache: 438 save_cached_thread_context(conversation_id, thread_data) 439 440 return thread_data 441 else: 442 logger.warning("No tweets found for thread context") 443 return None 444 445 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 446 """ 447 Post a reply to a specific tweet. 448 449 Args: 450 reply_text: The text content of the reply 451 in_reply_to_tweet_id: The ID of the tweet to reply to 452 453 Returns: 454 Response data if successful, None if failed 455 """ 456 endpoint = "/tweets" 457 458 payload = { 459 "text": reply_text, 460 "reply": { 461 "in_reply_to_tweet_id": in_reply_to_tweet_id 462 } 463 } 464 465 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 466 result = self._make_request(endpoint, method="POST", data=payload) 467 468 if result: 469 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}") 470 return result 471 else: 472 logger.error("Failed to post reply") 473 return None 474 475 def post_tweet(self, tweet_text: str) -> Optional[Dict]: 476 """ 477 Post a standalone tweet (not a reply). 478 479 Args: 480 tweet_text: The text content of the tweet (max 280 characters) 481 482 Returns: 483 Response data if successful, None if failed 484 """ 485 endpoint = "/tweets" 486 487 # Validate tweet length 488 if len(tweet_text) > 280: 489 logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)") 490 return None 491 492 payload = { 493 "text": tweet_text 494 } 495 496 logger.info(f"Attempting to post tweet with {self.auth_method} authentication") 497 result = self._make_request(endpoint, method="POST", data=payload) 498 499 if result: 500 logger.info(f"Successfully posted tweet") 501 return result 502 else: 503 logger.error("Failed to post tweet") 504 return None 505 506def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]: 507 """Load X configuration from config file.""" 508 try: 509 with open(config_path, 'r') as f: 510 config = yaml.safe_load(f) 511 512 x_config = config.get('x', {}) 513 if not x_config.get('api_key') or not x_config.get('user_id'): 514 raise ValueError("X API key and user_id must be configured in config.yaml") 515 516 return x_config 517 except Exception as e: 518 logger.error(f"Failed to load X configuration: {e}") 519 raise 520 521def create_x_client(config_path: str = "config.yaml") -> XClient: 522 """Create and return an X client with configuration loaded from file.""" 523 config = load_x_config(config_path) 524 return XClient( 525 api_key=config['api_key'], 526 user_id=config['user_id'], 527 access_token=config.get('access_token'), 528 consumer_key=config.get('consumer_key'), 529 consumer_secret=config.get('consumer_secret'), 530 access_token_secret=config.get('access_token_secret') 531 ) 532 533def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str: 534 """ 535 Convert a mention object to a YAML string for better AI comprehension. 536 Similar to thread_to_yaml_string in bsky_utils.py 537 """ 538 # Extract relevant fields 539 simplified_mention = { 540 'id': mention.get('id'), 541 'text': mention.get('text'), 542 'author_id': mention.get('author_id'), 543 'created_at': mention.get('created_at'), 544 'in_reply_to_user_id': mention.get('in_reply_to_user_id') 545 } 546 547 # Add user information if available 548 if users_data and mention.get('author_id') in users_data: 549 user = users_data[mention.get('author_id')] 550 simplified_mention['author'] = { 551 'username': user.get('username'), 552 'name': user.get('name') 553 } 554 555 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 556 557def thread_to_yaml_string(thread_data: Dict) -> str: 558 """ 559 Convert X thread context to YAML string for AI comprehension. 560 Similar to Bluesky's thread_to_yaml_string function. 561 562 Args: 563 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 564 565 Returns: 566 YAML string representation of the thread 567 """ 568 if not thread_data or "tweets" not in thread_data: 569 return "conversation: []\n" 570 571 tweets = thread_data["tweets"] 572 users_data = thread_data.get("users", {}) 573 574 simplified_thread = { 575 "conversation": [] 576 } 577 578 for tweet in tweets: 579 # Get user info 580 author_id = tweet.get('author_id') 581 author_info = {} 582 if author_id and author_id in users_data: 583 user = users_data[author_id] 584 author_info = { 585 'username': user.get('username'), 586 'name': user.get('name') 587 } 588 589 # Build tweet object (simplified for AI consumption) 590 tweet_obj = { 591 'text': tweet.get('text'), 592 'created_at': tweet.get('created_at'), 593 'author': author_info, 594 'author_id': author_id # Include user ID for block management 595 } 596 597 simplified_thread["conversation"].append(tweet_obj) 598 599 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False) 600 601 602def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None: 603 """ 604 Ensure all users in the thread have their X user blocks attached. 605 Creates blocks with initial content including their handle if they don't exist. 606 607 Args: 608 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 609 agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id) 610 """ 611 if not thread_data or "users" not in thread_data: 612 return 613 614 try: 615 from tools.blocks import attach_x_user_blocks, x_user_note_set 616 from config_loader import get_letta_config 617 from letta_client import Letta 618 619 # Get Letta client and agent_id from config 620 config = get_letta_config() 621 client = Letta(token=config['api_key'], timeout=config['timeout']) 622 623 # Use provided agent_id or get from config 624 if agent_id is None: 625 agent_id = config['agent_id'] 626 627 # Get agent info to create a mock agent_state for the functions 628 class MockAgentState: 629 def __init__(self, agent_id): 630 self.id = agent_id 631 632 agent_state = MockAgentState(agent_id) 633 634 users_data = thread_data["users"] 635 user_ids = list(users_data.keys()) 636 637 if not user_ids: 638 return 639 640 logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}") 641 642 # Get current blocks to check which users already have blocks with content 643 current_blocks = client.agents.blocks.list(agent_id=agent_id) 644 existing_user_blocks = {} 645 646 for block in current_blocks: 647 if block.label.startswith("x_user_"): 648 user_id = block.label.replace("x_user_", "") 649 existing_user_blocks[user_id] = block 650 651 # Attach all user blocks (this will create missing ones with basic content) 652 attach_result = attach_x_user_blocks(user_ids, agent_state) 653 logger.info(f"X user block attachment result: {attach_result}") 654 655 # For newly created blocks, update with user handle information 656 for user_id in user_ids: 657 if user_id not in existing_user_blocks: 658 user_info = users_data[user_id] 659 username = user_info.get('username', 'unknown') 660 name = user_info.get('name', 'Unknown') 661 662 # Set initial content with handle information 663 initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet." 664 665 try: 666 x_user_note_set(user_id, initial_content, agent_state) 667 logger.info(f"Set initial content for X user {user_id} (@{username})") 668 except Exception as e: 669 logger.error(f"Failed to set initial content for X user {user_id}: {e}") 670 671 except Exception as e: 672 logger.error(f"Error ensuring X user blocks: {e}") 673 674 675# X Caching and Queue System Functions 676 677def load_last_seen_id() -> Optional[str]: 678 """Load the last seen mention ID for incremental fetching.""" 679 if X_LAST_SEEN_FILE.exists(): 680 try: 681 with open(X_LAST_SEEN_FILE, 'r') as f: 682 data = json.load(f) 683 return data.get('last_seen_id') 684 except Exception as e: 685 logger.error(f"Error loading last seen ID: {e}") 686 return None 687 688def save_last_seen_id(mention_id: str): 689 """Save the last seen mention ID.""" 690 try: 691 X_QUEUE_DIR.mkdir(exist_ok=True) 692 with open(X_LAST_SEEN_FILE, 'w') as f: 693 json.dump({ 694 'last_seen_id': mention_id, 695 'updated_at': datetime.now().isoformat() 696 }, f) 697 logger.debug(f"Saved last seen ID: {mention_id}") 698 except Exception as e: 699 logger.error(f"Error saving last seen ID: {e}") 700 701def load_processed_mentions() -> set: 702 """Load the set of processed mention IDs.""" 703 if X_PROCESSED_MENTIONS_FILE.exists(): 704 try: 705 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f: 706 data = json.load(f) 707 # Keep only recent entries (last 10000) 708 if len(data) > 10000: 709 data = data[-10000:] 710 save_processed_mentions(set(data)) 711 return set(data) 712 except Exception as e: 713 logger.error(f"Error loading processed mentions: {e}") 714 return set() 715 716def save_processed_mentions(processed_set: set): 717 """Save the set of processed mention IDs.""" 718 try: 719 X_QUEUE_DIR.mkdir(exist_ok=True) 720 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f: 721 json.dump(list(processed_set), f) 722 except Exception as e: 723 logger.error(f"Error saving processed mentions: {e}") 724 725def load_downrank_users() -> Set[str]: 726 """Load the set of user IDs that should be downranked (responded to 10% of the time).""" 727 try: 728 if not X_DOWNRANK_USERS_FILE.exists(): 729 return set() 730 731 downrank_users = set() 732 with open(X_DOWNRANK_USERS_FILE, 'r') as f: 733 for line in f: 734 line = line.strip() 735 # Skip empty lines and comments 736 if line and not line.startswith('#'): 737 downrank_users.add(line) 738 739 logger.info(f"Loaded {len(downrank_users)} downrank users") 740 return downrank_users 741 except Exception as e: 742 logger.error(f"Error loading downrank users: {e}") 743 return set() 744 745def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool: 746 """ 747 Check if we should respond to a downranked user. 748 Returns True 10% of the time for downranked users, True 100% of the time for others. 749 """ 750 if user_id not in downrank_users: 751 return True 752 753 # 10% chance for downranked users 754 should_respond = random.random() < 0.1 755 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)") 756 return should_respond 757 758def save_mention_to_queue(mention: Dict, users_data: Optional[Dict] = None): 759 """Save a mention to the queue directory for async processing with author info. 760 761 Args: 762 mention: The mention data from X API 763 users_data: Optional dict mapping user IDs to user data (including usernames) 764 """ 765 try: 766 mention_id = mention.get('id') 767 if not mention_id: 768 logger.error("Mention missing ID, cannot queue") 769 return 770 771 # Check if already processed 772 processed_mentions = load_processed_mentions() 773 if mention_id in processed_mentions: 774 logger.debug(f"Mention {mention_id} already processed, skipping") 775 return 776 777 # Create queue directory 778 X_QUEUE_DIR.mkdir(exist_ok=True) 779 780 # Create filename using hash (similar to Bluesky system) 781 mention_str = json.dumps(mention, sort_keys=True) 782 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 783 filename = f"x_mention_{mention_hash}.json" 784 785 queue_file = X_QUEUE_DIR / filename 786 787 # Extract author info if users_data provided 788 author_id = mention.get('author_id') 789 author_username = None 790 author_name = None 791 792 if users_data and author_id and author_id in users_data: 793 user_info = users_data[author_id] 794 author_username = user_info.get('username') 795 author_name = user_info.get('name') 796 logger.info(f"Caching author info for @{author_username} ({author_name})") 797 798 # Save mention data with enhanced debugging information 799 mention_data = { 800 'mention': mention, 801 'queued_at': datetime.now().isoformat(), 802 'type': 'x_mention', 803 # Cache author info for later use 804 'author_info': { 805 'username': author_username, 806 'name': author_name, 807 'id': author_id 808 }, 809 # Debug info for conversation tracking 810 'debug_info': { 811 'mention_id': mention.get('id'), 812 'author_id': mention.get('author_id'), 813 'conversation_id': mention.get('conversation_id'), 814 'in_reply_to_user_id': mention.get('in_reply_to_user_id'), 815 'referenced_tweets': mention.get('referenced_tweets', []), 816 'text_preview': mention.get('text', '')[:200], 817 'created_at': mention.get('created_at'), 818 'public_metrics': mention.get('public_metrics', {}), 819 'context_annotations': mention.get('context_annotations', []) 820 } 821 } 822 823 with open(queue_file, 'w') as f: 824 json.dump(mention_data, f, indent=2) 825 826 logger.info(f"Queued X mention {mention_id} -> {filename}") 827 828 except Exception as e: 829 logger.error(f"Error saving mention to queue: {e}") 830 831# X Cache Functions 832def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 833 """Load cached thread context if available.""" 834 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 835 if cache_file.exists(): 836 try: 837 with open(cache_file, 'r') as f: 838 cached_data = json.load(f) 839 # Check if cache is recent (within 1 hour) 840 from datetime import datetime, timedelta 841 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 842 if datetime.now() - cached_time < timedelta(hours=1): 843 logger.info(f"Using cached thread context for {conversation_id}") 844 return cached_data.get('thread_data') 845 except Exception as e: 846 logger.warning(f"Error loading cached thread context: {e}") 847 return None 848 849def save_cached_thread_context(conversation_id: str, thread_data: Dict): 850 """Save thread context to cache.""" 851 try: 852 X_CACHE_DIR.mkdir(exist_ok=True) 853 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 854 855 cache_data = { 856 'conversation_id': conversation_id, 857 'thread_data': thread_data, 858 'cached_at': datetime.now().isoformat() 859 } 860 861 with open(cache_file, 'w') as f: 862 json.dump(cache_data, f, indent=2) 863 864 logger.debug(f"Cached thread context for {conversation_id}") 865 except Exception as e: 866 logger.error(f"Error caching thread context: {e}") 867 868def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]: 869 """ 870 Load cached individual tweets if available. 871 Returns dict mapping tweet_id -> tweet_data for found tweets. 872 """ 873 cached_tweets = {} 874 875 for tweet_id in tweet_ids: 876 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 877 if cache_file.exists(): 878 try: 879 with open(cache_file, 'r') as f: 880 cached_data = json.load(f) 881 882 # Use longer cache times for older tweets (24 hours vs 1 hour) 883 from datetime import datetime, timedelta 884 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 885 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '') 886 887 # Parse tweet creation time to determine age 888 try: 889 from dateutil.parser import parse 890 tweet_age = datetime.now() - parse(tweet_created) 891 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1) 892 except: 893 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails 894 895 if datetime.now() - cached_time < cache_duration: 896 cached_tweets[tweet_id] = cached_data.get('tweet_data') 897 logger.debug(f"Using cached tweet {tweet_id}") 898 899 except Exception as e: 900 logger.warning(f"Error loading cached tweet {tweet_id}: {e}") 901 902 return cached_tweets 903 904def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None): 905 """Save individual tweets to cache for future reuse.""" 906 try: 907 X_CACHE_DIR.mkdir(exist_ok=True) 908 909 for tweet in tweets_data: 910 tweet_id = tweet.get('id') 911 if not tweet_id: 912 continue 913 914 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 915 916 # Include user data if available 917 tweet_with_user = tweet.copy() 918 if users_data and tweet.get('author_id') in users_data: 919 tweet_with_user['author_info'] = users_data[tweet.get('author_id')] 920 921 cache_data = { 922 'tweet_id': tweet_id, 923 'tweet_data': tweet_with_user, 924 'cached_at': datetime.now().isoformat() 925 } 926 927 with open(cache_file, 'w') as f: 928 json.dump(cache_data, f, indent=2) 929 930 logger.debug(f"Cached individual tweet {tweet_id}") 931 932 except Exception as e: 933 logger.error(f"Error caching individual tweets: {e}") 934 935def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool: 936 """ 937 Determine if we have sufficient context to skip backfilling missing tweets. 938 939 Args: 940 tweets: List of tweets already in the thread 941 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch 942 943 Returns: 944 True if context is sufficient, False if backfill is needed 945 """ 946 # If no missing tweets, context is sufficient 947 if not missing_tweet_ids: 948 return True 949 950 # If we have a substantial conversation (5+ tweets), likely sufficient 951 if len(tweets) >= 5: 952 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient") 953 return True 954 955 # If only a few missing tweets and we have some context, might be enough 956 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3: 957 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient") 958 return True 959 960 # Check if we have conversational flow (mentions between users) 961 has_conversation_flow = False 962 for tweet in tweets: 963 text = tweet.get('text', '').lower() 964 # Look for mentions, replies, or conversational indicators 965 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1: 966 has_conversation_flow = True 967 break 968 969 # If we have clear conversational flow and reasonable length, sufficient 970 if has_conversation_flow and len(tweets) >= 2: 971 logger.debug("Thread has conversational flow, considering sufficient") 972 return True 973 974 # Otherwise, we need to backfill 975 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow") 976 return False 977 978def fetch_and_queue_mentions(username: str) -> int: 979 """ 980 Single-pass function to fetch new mentions and queue them. 981 Returns number of new mentions found. 982 """ 983 try: 984 client = create_x_client() 985 986 # Load last seen ID for incremental fetching 987 last_seen_id = load_last_seen_id() 988 989 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 990 991 # Search for mentions 992 # Get mentions with user data 993 result = client.get_mentions( 994 since_id=last_seen_id, 995 max_results=100 # Get as many as possible 996 ) 997 998 if not result or not result["mentions"]: 999 logger.info("No new mentions found") 1000 return 0 1001 1002 mentions = result["mentions"] 1003 users_data = result["users"] 1004 1005 # Process mentions (newest first, so reverse to process oldest first) 1006 mentions.reverse() 1007 new_count = 0 1008 1009 for mention in mentions: 1010 save_mention_to_queue(mention, users_data) 1011 new_count += 1 1012 1013 # Update last seen ID to the most recent mention 1014 if mentions: 1015 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent 1016 save_last_seen_id(most_recent_id) 1017 1018 logger.info(f"Queued {new_count} new X mentions") 1019 return new_count 1020 1021 except Exception as e: 1022 logger.error(f"Error fetching and queuing mentions: {e}") 1023 return 0 1024 1025# Simple test function 1026def get_my_user_info(): 1027 """Get the authenticated user's information to find correct user ID.""" 1028 try: 1029 client = create_x_client() 1030 1031 # Use the /2/users/me endpoint to get authenticated user info 1032 endpoint = "/users/me" 1033 params = { 1034 "user.fields": "id,name,username,description" 1035 } 1036 1037 print("Fetching authenticated user information...") 1038 response = client._make_request(endpoint, params=params) 1039 1040 if response and "data" in response: 1041 user_data = response["data"] 1042 print(f"✅ Found authenticated user:") 1043 print(f" ID: {user_data.get('id')}") 1044 print(f" Username: @{user_data.get('username')}") 1045 print(f" Name: {user_data.get('name')}") 1046 print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 1047 print(f"\n🔧 Update your config.yaml with:") 1048 print(f" user_id: \"{user_data.get('id')}\"") 1049 return user_data 1050 else: 1051 print("❌ Failed to get user information") 1052 print(f"Response: {response}") 1053 return None 1054 1055 except Exception as e: 1056 print(f"Error getting user info: {e}") 1057 return None 1058 1059def test_search_mentions(): 1060 """Test the search-based mention detection.""" 1061 try: 1062 client = create_x_client() 1063 1064 # First get our username 1065 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1066 if not user_info or "data" not in user_info: 1067 print("❌ Could not get username") 1068 return 1069 1070 username = user_info["data"]["username"] 1071 print(f"🔍 Searching for mentions of @{username}") 1072 1073 mentions = client.search_mentions(username, max_results=5) 1074 1075 if mentions: 1076 print(f"✅ Found {len(mentions)} mentions via search:") 1077 for mention in mentions: 1078 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 1079 else: 1080 print("No mentions found via search") 1081 1082 except Exception as e: 1083 print(f"Search test failed: {e}") 1084 1085def test_fetch_and_queue(): 1086 """Test the single-pass fetch and queue function.""" 1087 try: 1088 client = create_x_client() 1089 1090 # Get our username 1091 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1092 if not user_info or "data" not in user_info: 1093 print("❌ Could not get username") 1094 return 1095 1096 username = user_info["data"]["username"] 1097 print(f"🔄 Fetching and queueing mentions for @{username}") 1098 1099 # Show current state 1100 last_seen = load_last_seen_id() 1101 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 1102 1103 # Fetch and queue 1104 new_count = fetch_and_queue_mentions(username) 1105 1106 if new_count > 0: 1107 print(f"✅ Queued {new_count} new mentions") 1108 print(f"📁 Check ./x_queue/ directory for queued mentions") 1109 1110 # Show updated state 1111 new_last_seen = load_last_seen_id() 1112 print(f"📍 Updated last seen ID: {new_last_seen}") 1113 else: 1114 print("ℹ️ No new mentions to queue") 1115 1116 except Exception as e: 1117 print(f"Fetch and queue test failed: {e}") 1118 1119def test_thread_context(): 1120 """Test thread context retrieval from a queued mention.""" 1121 try: 1122 import json 1123 1124 # Find a queued mention file 1125 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1126 if not queue_files: 1127 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1128 return 1129 1130 # Read the first mention 1131 mention_file = queue_files[0] 1132 with open(mention_file, 'r') as f: 1133 mention_data = json.load(f) 1134 1135 mention = mention_data['mention'] 1136 print(f"📄 Using mention: {mention.get('id')}") 1137 print(f"📝 Text: {mention.get('text')}") 1138 1139 # Check if it has a conversation_id 1140 conversation_id = mention.get('conversation_id') 1141 if not conversation_id: 1142 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.") 1143 return 1144 1145 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1146 1147 # Get thread context 1148 client = create_x_client() 1149 thread_data = client.get_thread_context(conversation_id) 1150 1151 if thread_data: 1152 tweets = thread_data.get('tweets', []) 1153 print(f"✅ Retrieved thread with {len(tweets)} tweets") 1154 1155 # Convert to YAML 1156 yaml_thread = thread_to_yaml_string(thread_data) 1157 1158 # Save thread context for inspection 1159 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml" 1160 with open(thread_file, 'w') as f: 1161 f.write(yaml_thread) 1162 1163 print(f"💾 Saved thread context to: {thread_file}") 1164 print("\n📋 Thread preview:") 1165 print(yaml_thread) 1166 else: 1167 print("❌ Failed to retrieve thread context") 1168 1169 except Exception as e: 1170 print(f"Thread context test failed: {e}") 1171 1172def test_letta_integration(agent_id: str = None): 1173 """Test sending X thread context to Letta agent.""" 1174 try: 1175 from letta_client import Letta 1176 import json 1177 import yaml 1178 1179 # Load full config to access letta section 1180 try: 1181 with open("config.yaml", 'r') as f: 1182 full_config = yaml.safe_load(f) 1183 1184 letta_config = full_config.get('letta', {}) 1185 api_key = letta_config.get('api_key') 1186 config_agent_id = letta_config.get('agent_id') 1187 1188 # Use agent_id from config if not provided as parameter 1189 if not agent_id: 1190 if config_agent_id: 1191 agent_id = config_agent_id 1192 print(f"ℹ️ Using agent_id from config: {agent_id}") 1193 else: 1194 print("❌ No agent_id found in config.yaml") 1195 print("Expected config structure:") 1196 print(" letta:") 1197 print(" agent_id: your-agent-id") 1198 return 1199 else: 1200 print(f"ℹ️ Using provided agent_id: {agent_id}") 1201 1202 if not api_key: 1203 # Try loading from environment as fallback 1204 import os 1205 api_key = os.getenv('LETTA_API_KEY') 1206 if not api_key: 1207 print("❌ LETTA_API_KEY not found in config.yaml or environment") 1208 print("Expected config structure:") 1209 print(" letta:") 1210 print(" api_key: your-letta-api-key") 1211 return 1212 else: 1213 print("ℹ️ Using LETTA_API_KEY from environment") 1214 else: 1215 print("ℹ️ Using LETTA_API_KEY from config.yaml") 1216 1217 except Exception as e: 1218 print(f"❌ Error loading config: {e}") 1219 return 1220 1221 letta_client = Letta(token=api_key, timeout=600) 1222 print(f"🤖 Connected to Letta, using agent: {agent_id}") 1223 1224 # Find a queued mention file 1225 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1226 if not queue_files: 1227 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1228 return 1229 1230 # Read the first mention 1231 mention_file = queue_files[0] 1232 with open(mention_file, 'r') as f: 1233 mention_data = json.load(f) 1234 1235 mention = mention_data['mention'] 1236 conversation_id = mention.get('conversation_id') 1237 1238 if not conversation_id: 1239 print("❌ No conversation_id found in mention.") 1240 return 1241 1242 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1243 1244 # Get thread context 1245 x_client = create_x_client() 1246 thread_data = x_client.get_thread_context(conversation_id) 1247 1248 if not thread_data: 1249 print("❌ Failed to retrieve thread context") 1250 return 1251 1252 # Convert to YAML 1253 yaml_thread = thread_to_yaml_string(thread_data) 1254 1255 # Create prompt for the agent 1256 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately. 1257 1258Here is the thread context: 1259 1260{yaml_thread} 1261 1262Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona.""" 1263 1264 prompt_char_count = len(prompt) 1265 print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars") 1266 1267 # Print the prompt in a rich panel 1268 rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue")) 1269 1270 # Send to Letta agent using streaming 1271 message_stream = letta_client.agents.messages.create_stream( 1272 agent_id=agent_id, 1273 messages=[{"role": "user", "content": prompt}], 1274 stream_tokens=False, 1275 max_steps=10 1276 ) 1277 1278 print("🔄 Streaming response from agent...") 1279 response_text = "" 1280 1281 for chunk in message_stream: 1282 print(chunk) 1283 if hasattr(chunk, 'message_type'): 1284 if chunk.message_type == 'assistant_message': 1285 print(f"🤖 Agent response: {chunk.content}") 1286 response_text = chunk.content 1287 elif chunk.message_type == 'reasoning_message': 1288 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...") 1289 elif chunk.message_type == 'tool_call_message': 1290 print(f"🔧 Agent tool call: {chunk.tool_call.name}") 1291 1292 if response_text: 1293 print(f"\n✅ Agent generated response:") 1294 print(f"📝 Response: {response_text}") 1295 else: 1296 print("❌ No response generated by agent") 1297 1298 except Exception as e: 1299 print(f"Letta integration test failed: {e}") 1300 import traceback 1301 traceback.print_exc() 1302 1303def test_x_client(): 1304 """Test the X client by fetching mentions.""" 1305 try: 1306 client = create_x_client() 1307 result = client.get_mentions(max_results=5) 1308 1309 if result and result["mentions"]: 1310 mentions = result["mentions"] 1311 users_data = result["users"] 1312 print(f"Successfully retrieved {len(mentions)} mentions:") 1313 print(f"User data available for {len(users_data)} users") 1314 for mention in mentions: 1315 author_id = mention.get('author_id') 1316 author_info = users_data.get(author_id, {}) 1317 username = author_info.get('username', 'unknown') 1318 print(f"- {mention.get('id')} from @{username}: {mention.get('text')[:50]}...") 1319 else: 1320 print("No mentions retrieved") 1321 1322 except Exception as e: 1323 print(f"Test failed: {e}") 1324 1325def reply_to_cameron_post(): 1326 """ 1327 Reply to Cameron's specific X post. 1328 1329 NOTE: This requires OAuth User Context authentication, not Bearer token. 1330 Current Bearer token is Application-Only which can't post. 1331 """ 1332 try: 1333 client = create_x_client() 1334 1335 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618 1336 cameron_post_id = "1950690566909710618" 1337 1338 # Simple reply message 1339 reply_text = "Hello from void! 🤖 Testing X integration." 1340 1341 print(f"Attempting to reply to post {cameron_post_id}") 1342 print(f"Reply text: {reply_text}") 1343 print("\nNOTE: This will fail with current Bearer token (Application-Only)") 1344 print("Posting requires OAuth User Context authentication") 1345 1346 result = client.post_reply(reply_text, cameron_post_id) 1347 1348 if result: 1349 print(f"✅ Successfully posted reply!") 1350 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}") 1351 else: 1352 print("❌ Failed to post reply (expected with current auth)") 1353 1354 except Exception as e: 1355 print(f"Reply failed: {e}") 1356 1357def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 1358 """ 1359 Process an X mention and generate a reply using the Letta agent. 1360 Similar to bsky.py process_mention but for X/Twitter. 1361 1362 Args: 1363 void_agent: The Letta agent instance 1364 x_client: The X API client 1365 mention_data: The mention data dictionary 1366 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 1367 testing_mode: If True, don't actually post to X 1368 1369 Returns: 1370 True: Successfully processed, remove from queue 1371 False: Failed but retryable, keep in queue 1372 None: Failed with non-retryable error, move to errors directory 1373 "no_reply": No reply was generated, move to no_reply directory 1374 """ 1375 try: 1376 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 1377 1378 # Extract mention details 1379 if isinstance(mention_data, dict): 1380 # Handle both raw mention and queued mention formats 1381 if 'mention' in mention_data: 1382 mention = mention_data['mention'] 1383 else: 1384 mention = mention_data 1385 else: 1386 mention = mention_data 1387 1388 mention_id = mention.get('id') 1389 mention_text = mention.get('text', '') 1390 author_id = mention.get('author_id') 1391 conversation_id = mention.get('conversation_id') 1392 in_reply_to_user_id = mention.get('in_reply_to_user_id') 1393 referenced_tweets = mention.get('referenced_tweets', []) 1394 1395 # Check downrank list - only respond to downranked users 10% of the time 1396 downrank_users = load_downrank_users() 1397 if not should_respond_to_downranked_user(str(author_id), downrank_users): 1398 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection") 1399 return "no_reply" 1400 1401 # Enhanced conversation tracking for debug - especially important for Grok handling 1402 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}") 1403 logger.info(f" Author ID: {author_id}") 1404 logger.info(f" Conversation ID: {conversation_id}") 1405 logger.info(f" In Reply To User ID: {in_reply_to_user_id}") 1406 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items") 1407 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets 1408 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}") 1409 logger.info(f" Text preview: {mention_text[:100]}...") 1410 1411 # If no conversation_id, try to use referenced tweet as conversation root 1412 if not conversation_id and referenced_tweets: 1413 # For replies, use the tweet being replied to as conversation root 1414 for ref in referenced_tweets: 1415 if ref.get('type') == 'replied_to': 1416 conversation_id = ref.get('id') 1417 logger.info(f"📎 No conversation_id, using replied_to tweet {conversation_id} as conversation root") 1418 break 1419 1420 if not conversation_id: 1421 # If still no conversation ID, use the mention itself as a standalone 1422 conversation_id = mention_id 1423 logger.warning(f"⚠️ No conversation_id found for mention {mention_id} - treating as standalone tweet") 1424 1425 # Get thread context with caching enabled for efficiency 1426 # Use mention_id as until_id to exclude tweets that occurred after this mention 1427 try: 1428 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id) 1429 if not thread_data: 1430 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}") 1431 return False 1432 1433 # If this mention references a specific tweet, ensure we have that tweet in context 1434 if referenced_tweets: 1435 for ref in referenced_tweets: 1436 if ref.get('type') == 'replied_to': 1437 ref_id = ref.get('id') 1438 # Check if the referenced tweet is in our thread data 1439 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])] 1440 if ref_id and ref_id not in thread_tweet_ids: 1441 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch") 1442 try: 1443 # Fetch the missing referenced tweet directly 1444 endpoint = f"/tweets/{ref_id}" 1445 params = { 1446 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 1447 "user.fields": "id,name,username", 1448 "expansions": "author_id" 1449 } 1450 response = x_client._make_request(endpoint, params) 1451 if response and "data" in response: 1452 missing_tweet = response["data"] 1453 if missing_tweet.get('conversation_id') == conversation_id: 1454 # Add to thread data 1455 if 'tweets' not in thread_data: 1456 thread_data['tweets'] = [] 1457 thread_data['tweets'].append(missing_tweet) 1458 1459 # Add user data if available 1460 if "includes" in response and "users" in response["includes"]: 1461 if 'users' not in thread_data: 1462 thread_data['users'] = {} 1463 for user in response["includes"]["users"]: 1464 thread_data['users'][user["id"]] = user 1465 1466 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context") 1467 else: 1468 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}") 1469 except Exception as e: 1470 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}") 1471 1472 # Enhanced thread context debugging 1473 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}") 1474 thread_posts = thread_data.get('tweets', []) 1475 thread_users = thread_data.get('users', {}) 1476 logger.info(f" Posts in thread: {len(thread_posts)}") 1477 logger.info(f" Users in thread: {len(thread_users)}") 1478 1479 # Log thread participants for Grok detection 1480 for user_id, user_info in thread_users.items(): 1481 username = user_info.get('username', 'unknown') 1482 name = user_info.get('name', 'Unknown') 1483 is_verified = user_info.get('verified', False) 1484 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}") 1485 1486 # Special logging for Grok or AI-related users 1487 if 'grok' in username.lower() or 'grok' in name.lower(): 1488 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})") 1489 1490 # Log conversation structure 1491 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts 1492 post_id = post.get('id') 1493 post_author = post.get('author_id') 1494 post_text = post.get('text', '')[:50] 1495 is_reply = 'in_reply_to_user_id' in post 1496 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...") 1497 1498 except Exception as e: 1499 logger.error(f"❌ Error getting thread context: {e}") 1500 return False 1501 1502 # Convert to YAML string 1503 thread_context = thread_to_yaml_string(thread_data) 1504 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters") 1505 1506 # Save comprehensive conversation data for debugging 1507 try: 1508 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1509 debug_dir.mkdir(parents=True, exist_ok=True) 1510 1511 # Save raw thread data (JSON) 1512 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f: 1513 json.dump(thread_data, f, indent=2) 1514 1515 # Save YAML thread context 1516 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1517 f.write(thread_context) 1518 1519 # Save mention processing debug info 1520 debug_info = { 1521 'processed_at': datetime.now().isoformat(), 1522 'mention_id': mention_id, 1523 'conversation_id': conversation_id, 1524 'author_id': author_id, 1525 'in_reply_to_user_id': in_reply_to_user_id, 1526 'referenced_tweets': referenced_tweets, 1527 'thread_stats': { 1528 'total_posts': len(thread_posts), 1529 'total_users': len(thread_users), 1530 'yaml_length': len(thread_context) 1531 }, 1532 'users_in_conversation': { 1533 user_id: { 1534 'username': user_info.get('username'), 1535 'name': user_info.get('name'), 1536 'verified': user_info.get('verified', False), 1537 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1538 } 1539 for user_id, user_info in thread_users.items() 1540 } 1541 } 1542 1543 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1544 json.dump(debug_info, f, indent=2) 1545 1546 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1547 1548 except Exception as debug_error: 1549 logger.warning(f"Failed to save debug data: {debug_error}") 1550 # Continue processing even if debug save fails 1551 1552 # Check for #voidstop 1553 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1554 logger.info("Found #voidstop, skipping this mention") 1555 return True 1556 1557 # Ensure X user blocks are attached 1558 try: 1559 ensure_x_user_blocks_attached(thread_data, void_agent.id) 1560 except Exception as e: 1561 logger.warning(f"Failed to ensure X user blocks: {e}") 1562 # Continue without user blocks rather than failing completely 1563 1564 # Create prompt for Letta agent 1565 # First try to use cached author info from queued mention 1566 author_username = 'unknown' 1567 author_name = 'unknown' 1568 1569 if 'author_info' in mention_data: 1570 # Use cached author info from when mention was queued 1571 cached_info = mention_data['author_info'] 1572 if cached_info.get('username'): 1573 author_username = cached_info['username'] 1574 author_name = cached_info.get('name', author_username) 1575 logger.info(f"Using cached author info: @{author_username} ({author_name})") 1576 1577 # If not cached, try thread data 1578 if author_username == 'unknown': 1579 author_info = thread_data.get('users', {}).get(author_id, {}) 1580 author_username = author_info.get('username', 'unknown') 1581 author_name = author_info.get('name', author_username) 1582 1583 # Final fallback: if username is still unknown, try to find it in the thread tweets 1584 if author_username == 'unknown' and 'tweets' in thread_data: 1585 for tweet in thread_data['tweets']: 1586 if tweet.get('author_id') == author_id and 'author' in tweet: 1587 tweet_author = tweet['author'] 1588 if tweet_author.get('username'): 1589 author_username = tweet_author['username'] 1590 author_name = tweet_author.get('name', author_username) 1591 logger.info(f"Resolved unknown author via thread fallback: @{author_username} ({author_name})") 1592 break 1593 1594 # Build user ID mapping from thread data 1595 user_id_mapping = {} 1596 if 'tweets' in thread_data: 1597 for tweet in thread_data['tweets']: 1598 author_id_tweet = tweet.get('author_id') 1599 if author_id_tweet and 'author' in tweet: 1600 tweet_author = tweet['author'] 1601 username = tweet_author.get('username') 1602 if username and author_id_tweet not in user_id_mapping: 1603 user_id_mapping[author_id_tweet] = f"@{username}" 1604 1605 # Also add users from the users dict if available 1606 if 'users' in thread_data: 1607 for user_id, user_data in thread_data['users'].items(): 1608 username = user_data.get('username') 1609 if username and user_id not in user_id_mapping: 1610 user_id_mapping[user_id] = f"@{username}" 1611 1612 # Format user ID mapping for prompt 1613 id_mapping_text = "" 1614 if user_id_mapping: 1615 id_mapping_lines = [f" {user_id}: {handle}" for user_id, handle in user_id_mapping.items()] 1616 id_mapping_text = f"\n\nUSER_ID_KEY:\n" + "\n".join(id_mapping_lines) 1617 1618 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1619 1620MOST RECENT POST (the mention you're responding to): 1621"{mention_text}" 1622 1623FULL THREAD CONTEXT: 1624```yaml 1625{thread_context} 1626``` 1627 1628The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.{id_mapping_text} 1629 1630If you need to update user information, use the x_user_* tools. 1631 1632To reply, use the add_post_to_x_thread tool: 1633- Each call creates one post (max 280 characters) 1634- For most responses, a single call is sufficient 1635- Only use multiple calls for threaded replies when: 1636 * The topic requires extended explanation that cannot fit in 280 characters 1637 * You're explicitly asked for a detailed/long response 1638 * The conversation naturally benefits from a structured multi-part answer 1639- Avoid unnecessary threads - be concise when possible""" 1640 1641 # Log mention processing 1642 title = f"X MENTION FROM @{author_username}" 1643 print(f"\n{title}") 1644 print(f" {'' * len(title)}") 1645 for line in mention_text.split('\n'): 1646 print(f" {line}") 1647 1648 # Send to Letta agent 1649 from config_loader import get_letta_config 1650 from letta_client import Letta 1651 1652 config = get_letta_config() 1653 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1654 1655 prompt_char_count = len(prompt) 1656 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars") 1657 1658 try: 1659 # Use streaming to avoid timeout errors 1660 message_stream = letta_client.agents.messages.create_stream( 1661 agent_id=void_agent.id, 1662 messages=[{"role": "user", "content": prompt}], 1663 stream_tokens=False, 1664 max_steps=100 1665 ) 1666 1667 # Collect streaming response (simplified version of bsky.py logic) 1668 all_messages = [] 1669 for chunk in message_stream: 1670 if hasattr(chunk, 'message_type'): 1671 if chunk.message_type == 'reasoning_message': 1672 print("\n◆ Reasoning") 1673 print(" ─────────") 1674 for line in chunk.reasoning.split('\n'): 1675 print(f" {line}") 1676 elif chunk.message_type == 'tool_call_message': 1677 tool_name = chunk.tool_call.name 1678 if tool_name == 'add_post_to_x_thread': 1679 try: 1680 args = json.loads(chunk.tool_call.arguments) 1681 text = args.get('text', '') 1682 if text: 1683 print("\n✎ X Post") 1684 print(" ────────") 1685 for line in text.split('\n'): 1686 print(f" {line}") 1687 except: 1688 pass 1689 elif tool_name == 'halt_activity': 1690 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1691 if queue_filepath and queue_filepath.exists(): 1692 queue_filepath.unlink() 1693 logger.info(f"Deleted queue file: {queue_filepath.name}") 1694 logger.info("=== X BOT TERMINATED BY AGENT ===") 1695 exit(0) 1696 elif chunk.message_type == 'tool_return_message': 1697 tool_name = chunk.name 1698 status = chunk.status 1699 if status == 'success' and tool_name == 'add_post_to_x_thread': 1700 print("\n✓ X Post Queued") 1701 print(" ──────────────") 1702 print(" Post queued successfully") 1703 elif chunk.message_type == 'assistant_message': 1704 print("\n▶ Assistant Response") 1705 print(" ──────────────────") 1706 for line in chunk.content.split('\n'): 1707 print(f" {line}") 1708 1709 all_messages.append(chunk) 1710 if str(chunk) == 'done': 1711 break 1712 1713 # Convert streaming response for compatibility 1714 message_response = type('StreamingResponse', (), { 1715 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1716 })() 1717 1718 except Exception as api_error: 1719 logger.error(f"Letta API error: {api_error}") 1720 raise 1721 1722 # Extract successful add_post_to_x_thread tool calls 1723 reply_candidates = [] 1724 tool_call_results = {} 1725 ignored_notification = False 1726 ack_note = None # Track any note from annotate_ack tool 1727 1728 # First pass: collect tool return statuses 1729 for message in message_response.messages: 1730 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 1731 if message.name == 'add_post_to_x_thread': 1732 tool_call_results[message.tool_call_id] = message.status 1733 elif message.name == 'ignore_notification': 1734 if message.status == 'success': 1735 ignored_notification = True 1736 logger.info("🚫 X notification ignored") 1737 1738 # Second pass: collect successful tool calls 1739 for message in message_response.messages: 1740 if hasattr(message, 'tool_call') and message.tool_call: 1741 # Collect annotate_ack tool calls 1742 if message.tool_call.name == 'annotate_ack': 1743 try: 1744 args = json.loads(message.tool_call.arguments) 1745 note = args.get('note', '') 1746 if note: 1747 ack_note = note 1748 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 1749 except json.JSONDecodeError as e: 1750 logger.error(f"Failed to parse annotate_ack arguments: {e}") 1751 1752 # Collect add_post_to_x_thread tool calls - only if they were successful 1753 elif message.tool_call.name == 'add_post_to_x_thread': 1754 tool_call_id = message.tool_call.tool_call_id 1755 tool_status = tool_call_results.get(tool_call_id, 'unknown') 1756 1757 if tool_status == 'success': 1758 try: 1759 args = json.loads(message.tool_call.arguments) 1760 reply_text = args.get('text', '') 1761 if reply_text: 1762 reply_candidates.append(reply_text) 1763 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...") 1764 except json.JSONDecodeError as e: 1765 logger.error(f"Failed to parse tool call arguments: {e}") 1766 1767 # Save agent response data to debug folder 1768 try: 1769 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1770 1771 # Save complete agent interaction 1772 agent_response_data = { 1773 'processed_at': datetime.now().isoformat(), 1774 'mention_id': mention_id, 1775 'conversation_id': conversation_id, 1776 'prompt_sent': prompt, 1777 'reply_candidates': reply_candidates, 1778 'ignored_notification': ignored_notification, 1779 'ack_note': ack_note, 1780 'tool_call_results': tool_call_results, 1781 'all_messages': [] 1782 } 1783 1784 # Convert messages to serializable format 1785 for message in message_response.messages: 1786 msg_data = { 1787 'message_type': getattr(message, 'message_type', 'unknown'), 1788 'content': getattr(message, 'content', ''), 1789 'reasoning': getattr(message, 'reasoning', ''), 1790 'status': getattr(message, 'status', ''), 1791 'name': getattr(message, 'name', ''), 1792 } 1793 1794 if hasattr(message, 'tool_call') and message.tool_call: 1795 msg_data['tool_call'] = { 1796 'name': message.tool_call.name, 1797 'arguments': message.tool_call.arguments, 1798 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '') 1799 } 1800 1801 agent_response_data['all_messages'].append(msg_data) 1802 1803 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f: 1804 json.dump(agent_response_data, f, indent=2) 1805 1806 logger.info(f"💾 Saved agent response debug data") 1807 1808 except Exception as debug_error: 1809 logger.warning(f"Failed to save agent response debug data: {debug_error}") 1810 1811 # Handle conflicts 1812 if reply_candidates and ignored_notification: 1813 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!") 1814 return False 1815 1816 if reply_candidates: 1817 # Post replies to X 1818 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X") 1819 1820 if len(reply_candidates) == 1: 1821 content = reply_candidates[0] 1822 title = f"Reply to @{author_username}" 1823 else: 1824 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1825 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1826 1827 print(f"\n{title}") 1828 print(f" {'' * len(title)}") 1829 for line in content.split('\n'): 1830 print(f" {line}") 1831 1832 if testing_mode: 1833 logger.info("TESTING MODE: Skipping actual X post") 1834 return True 1835 else: 1836 # Post to X using thread approach 1837 success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1838 if success: 1839 logger.info(f"Successfully replied to @{author_username} on X") 1840 1841 # Acknowledge the post we're replying to 1842 try: 1843 ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1844 if ack_result: 1845 if ack_note: 1846 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1847 else: 1848 logger.info(f"Successfully acknowledged X post from @{author_username}") 1849 else: 1850 logger.warning(f"Failed to acknowledge X post from @{author_username}") 1851 except Exception as e: 1852 logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1853 # Don't fail the entire operation if acknowledgment fails 1854 1855 return True 1856 else: 1857 logger.error(f"Failed to send reply to @{author_username} on X") 1858 return False 1859 else: 1860 if ignored_notification: 1861 logger.info(f"X mention from @{author_username} was explicitly ignored") 1862 return "ignored" 1863 else: 1864 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass") 1865 return False # Keep in queue for retry instead of removing 1866 1867 except Exception as e: 1868 logger.error(f"Error processing X mention: {e}") 1869 return False 1870 1871def acknowledge_x_post(x_client, post_id, note=None): 1872 """ 1873 Acknowledge an X post that we replied to. 1874 Uses the same Bluesky client and uploads to the void data repository on atproto, 1875 just like Bluesky acknowledgments. 1876 1877 Args: 1878 x_client: XClient instance (not used, kept for compatibility) 1879 post_id: The X post ID we're acknowledging 1880 note: Optional note to include with the acknowledgment 1881 1882 Returns: 1883 True if successful, False otherwise 1884 """ 1885 try: 1886 # Use Bluesky client to upload acks to the void data repository on atproto 1887 bsky_client = bsky_utils.default_login() 1888 1889 # Create a synthetic URI and CID for the X post 1890 # X posts don't have atproto URIs/CIDs, so we create identifiers 1891 post_uri = f"x://twitter.com/post/{post_id}" 1892 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts 1893 1894 # Use the same acknowledge_post function as Bluesky 1895 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note) 1896 1897 if ack_result: 1898 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else "")) 1899 return True 1900 else: 1901 logger.error(f"Failed to acknowledge X post {post_id}") 1902 return False 1903 1904 except Exception as e: 1905 logger.error(f"Error acknowledging X post {post_id}: {e}") 1906 return False 1907 1908def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1909 """ 1910 Post a series of replies to X, threading them properly. 1911 1912 Args: 1913 x_client: XClient instance 1914 in_reply_to_tweet_id: The original tweet ID to reply to 1915 reply_messages: List of reply text strings 1916 1917 Returns: 1918 True if successful, False otherwise 1919 """ 1920 try: 1921 current_reply_id = in_reply_to_tweet_id 1922 1923 for i, reply_text in enumerate(reply_messages): 1924 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...") 1925 1926 result = x_client.post_reply(reply_text, current_reply_id) 1927 1928 if result and 'data' in result: 1929 new_tweet_id = result['data']['id'] 1930 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}") 1931 # For threading, the next reply should reply to this one 1932 current_reply_id = new_tweet_id 1933 else: 1934 logger.error(f"Failed to post X reply {i+1}") 1935 return False 1936 1937 return True 1938 1939 except Exception as e: 1940 logger.error(f"Error posting X thread replies: {e}") 1941 return False 1942 1943def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False): 1944 """ 1945 Load and process all X mentions from the queue. 1946 Similar to bsky.py load_and_process_queued_notifications but for X. 1947 """ 1948 try: 1949 # Get all X mention files in queue directory 1950 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1951 1952 if not queue_files: 1953 return 1954 1955 # Load file metadata and sort by creation time (chronological order) 1956 file_metadata = [] 1957 for filepath in queue_files: 1958 try: 1959 with open(filepath, 'r') as f: 1960 queue_data = json.load(f) 1961 mention_data = queue_data.get('mention', queue_data) 1962 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing 1963 file_metadata.append((created_at, filepath)) 1964 except Exception as e: 1965 logger.warning(f"Error reading queue file {filepath.name}: {e}") 1966 # Add with default timestamp so it still gets processed 1967 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath)) 1968 1969 # Sort by creation time (oldest first) 1970 file_metadata.sort(key=lambda x: x[0]) 1971 1972 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order") 1973 1974 for i, (created_at, filepath) in enumerate(file_metadata, 1): 1975 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})") 1976 1977 try: 1978 # Load mention data 1979 with open(filepath, 'r') as f: 1980 queue_data = json.load(f) 1981 1982 # Process the mention (pass full queue_data to have access to author_info) 1983 success = process_x_mention(void_agent, x_client, queue_data, 1984 queue_filepath=filepath, testing_mode=testing_mode) 1985 1986 except XRateLimitError: 1987 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning") 1988 break 1989 1990 except Exception as e: 1991 logger.error(f"Error processing X queue file {filepath.name}: {e}") 1992 continue 1993 1994 # Handle file based on processing result 1995 if success: 1996 if testing_mode: 1997 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1998 else: 1999 filepath.unlink() 2000 logger.info(f"Successfully processed and removed X file: {filepath.name}") 2001 2002 # Mark as processed 2003 processed_mentions = load_processed_mentions() 2004 processed_mentions.add(mention_data.get('id')) 2005 save_processed_mentions(processed_mentions) 2006 2007 elif success is None: # Move to error directory 2008 error_dir = X_QUEUE_DIR / "errors" 2009 error_dir.mkdir(exist_ok=True) 2010 error_path = error_dir / filepath.name 2011 filepath.rename(error_path) 2012 logger.warning(f"Moved X file {filepath.name} to errors directory") 2013 2014 elif success == "no_reply": # Move to no_reply directory 2015 no_reply_dir = X_QUEUE_DIR / "no_reply" 2016 no_reply_dir.mkdir(exist_ok=True) 2017 no_reply_path = no_reply_dir / filepath.name 2018 filepath.rename(no_reply_path) 2019 logger.info(f"Moved X file {filepath.name} to no_reply directory") 2020 2021 elif success == "ignored": # Delete ignored notifications 2022 filepath.unlink() 2023 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 2024 2025 else: 2026 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry") 2027 2028 except Exception as e: 2029 logger.error(f"Error loading queued X mentions: {e}") 2030 2031def process_x_notifications(void_agent, x_client, testing_mode=False): 2032 """ 2033 Fetch new X mentions, queue them, and process the queue. 2034 Similar to bsky.py process_notifications but for X. 2035 """ 2036 try: 2037 # Get username for fetching mentions 2038 user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 2039 if not user_info or "data" not in user_info: 2040 logger.error("Could not get username for X mentions") 2041 return 2042 2043 username = user_info["data"]["username"] 2044 2045 # Fetch and queue new mentions 2046 new_count = fetch_and_queue_mentions(username) 2047 2048 if new_count > 0: 2049 logger.info(f"Found {new_count} new X mentions to process") 2050 2051 # Process the entire queue 2052 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 2053 2054 except Exception as e: 2055 logger.error(f"Error processing X notifications: {e}") 2056 2057def periodic_user_block_cleanup(client, agent_id: str) -> None: 2058 """ 2059 Detach all user blocks from the agent to prevent memory bloat. 2060 This should be called periodically to ensure clean state. 2061 """ 2062 try: 2063 # Get all blocks attached to the agent 2064 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 2065 2066 user_blocks_to_detach = [] 2067 for block in attached_blocks: 2068 if hasattr(block, 'label') and block.label.startswith('user_'): 2069 user_blocks_to_detach.append({ 2070 'label': block.label, 2071 'id': block.id 2072 }) 2073 2074 if not user_blocks_to_detach: 2075 logger.debug("No user blocks found to detach during periodic cleanup") 2076 return 2077 2078 logger.info(f"Found {len(user_blocks_to_detach)} user blocks to detach") 2079 2080 # Detach each user block 2081 for block in user_blocks_to_detach: 2082 try: 2083 client.agents.blocks.detach( 2084 agent_id=agent_id, 2085 block_id=block['id'] 2086 ) 2087 logger.debug(f"Detached user block: {block['label']}") 2088 except Exception as e: 2089 logger.error(f"Failed to detach user block {block['label']}: {e}") 2090 2091 logger.info(f"Periodic cleanup complete: detached {len(user_blocks_to_detach)} user blocks") 2092 2093 except Exception as e: 2094 logger.error(f"Error during periodic user block cleanup: {e}") 2095 2096def initialize_x_void(): 2097 """Initialize the void agent for X operations.""" 2098 logger.info("Starting void agent initialization for X...") 2099 2100 from config_loader import get_letta_config 2101 from letta_client import Letta 2102 2103 # Get config 2104 config = get_letta_config() 2105 client = Letta(token=config['api_key'], timeout=config['timeout']) 2106 agent_id = config['agent_id'] 2107 2108 try: 2109 void_agent = client.agents.retrieve(agent_id=agent_id) 2110 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 2111 except Exception as e: 2112 logger.error(f"Failed to load void agent {agent_id}: {e}") 2113 raise e 2114 2115 # Clean up all user blocks at startup 2116 logger.info("🧹 Cleaning up user blocks at X startup...") 2117 periodic_user_block_cleanup(client, agent_id) 2118 2119 # Ensure correct tools are attached for X 2120 logger.info("Configuring tools for X platform...") 2121 try: 2122 from tool_manager import ensure_platform_tools 2123 ensure_platform_tools('x', void_agent.id) 2124 except Exception as e: 2125 logger.error(f"Failed to configure platform tools: {e}") 2126 logger.warning("Continuing with existing tool configuration") 2127 2128 # Log agent details 2129 logger.info(f"X Void agent details - ID: {void_agent.id}") 2130 logger.info(f"Agent name: {void_agent.name}") 2131 2132 return void_agent 2133 2134def x_main_loop(testing_mode=False, cleanup_interval=10): 2135 """ 2136 Main X bot loop that continuously monitors for mentions and processes them. 2137 Similar to bsky.py main() but for X/Twitter. 2138 2139 Args: 2140 testing_mode: If True, don't actually post to X 2141 cleanup_interval: Run user block cleanup every N cycles (0 to disable) 2142 """ 2143 import time 2144 from time import sleep 2145 from config_loader import get_letta_config 2146 from letta_client import Letta 2147 2148 logger.info("=== STARTING X VOID BOT ===") 2149 2150 # Initialize void agent 2151 void_agent = initialize_x_void() 2152 logger.info(f"X void agent initialized: {void_agent.id}") 2153 2154 # Initialize X client 2155 x_client = create_x_client() 2156 logger.info("Connected to X API") 2157 2158 # Get Letta client for periodic cleanup 2159 config = get_letta_config() 2160 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 2161 2162 # Main loop 2163 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls) 2164 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 2165 2166 if testing_mode: 2167 logger.info("=== RUNNING IN X TESTING MODE ===") 2168 logger.info(" - No messages will be sent to X") 2169 logger.info(" - Queue files will not be deleted") 2170 2171 if cleanup_interval > 0: 2172 logger.info(f"User block cleanup enabled every {cleanup_interval} cycles") 2173 else: 2174 logger.info("User block cleanup disabled") 2175 2176 cycle_count = 0 2177 start_time = time.time() 2178 2179 while True: 2180 try: 2181 cycle_count += 1 2182 logger.info(f"=== X CYCLE {cycle_count} ===") 2183 2184 # Process X notifications (fetch, queue, and process) 2185 process_x_notifications(void_agent, x_client, testing_mode) 2186 2187 # Run periodic cleanup every N cycles 2188 if cleanup_interval > 0 and cycle_count % cleanup_interval == 0: 2189 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 2190 periodic_user_block_cleanup(letta_client, void_agent.id) 2191 2192 # Log cycle completion 2193 elapsed_time = time.time() - start_time 2194 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes") 2195 2196 sleep(FETCH_DELAY_SEC) 2197 2198 except KeyboardInterrupt: 2199 elapsed_time = time.time() - start_time 2200 logger.info("=== X BOT STOPPED BY USER ===") 2201 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 2202 break 2203 except Exception as e: 2204 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 2205 logger.error(f"Error details: {e}") 2206 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 2207 sleep(FETCH_DELAY_SEC * 2) 2208 2209def process_queue_only(testing_mode=False): 2210 """ 2211 Process all queued X mentions without fetching new ones. 2212 Useful for rate limit management - queue first, then process separately. 2213 2214 Args: 2215 testing_mode: If True, don't actually post to X and keep queue files 2216 """ 2217 logger.info("=== PROCESSING X QUEUE ONLY ===") 2218 2219 if testing_mode: 2220 logger.info("=== RUNNING IN X TESTING MODE ===") 2221 logger.info(" - No messages will be sent to X") 2222 logger.info(" - Queue files will not be deleted") 2223 2224 try: 2225 # Initialize void agent 2226 void_agent = initialize_x_void() 2227 logger.info(f"X void agent initialized: {void_agent.id}") 2228 2229 # Initialize X client 2230 x_client = create_x_client() 2231 logger.info("Connected to X API") 2232 2233 # Process the queue without fetching new mentions 2234 logger.info("Processing existing X queue...") 2235 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 2236 2237 logger.info("=== X QUEUE PROCESSING COMPLETE ===") 2238 2239 except Exception as e: 2240 logger.error(f"Error processing X queue: {e}") 2241 raise 2242 2243def x_notification_loop(): 2244 """ 2245 DEPRECATED: Old X notification loop using search-based mention detection. 2246 Use x_main_loop() instead for the full bot experience. 2247 """ 2248 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.") 2249 x_main_loop() 2250 2251if __name__ == "__main__": 2252 import sys 2253 import argparse 2254 2255 if len(sys.argv) > 1: 2256 if sys.argv[1] == "bot": 2257 # Main bot with optional --test flag and cleanup interval 2258 parser = argparse.ArgumentParser(description='X Void Bot') 2259 parser.add_argument('command', choices=['bot']) 2260 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 2261 parser.add_argument('--cleanup-interval', type=int, default=10, 2262 help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 2263 args = parser.parse_args() 2264 x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval) 2265 elif sys.argv[1] == "loop": 2266 x_notification_loop() 2267 elif sys.argv[1] == "reply": 2268 reply_to_cameron_post() 2269 elif sys.argv[1] == "me": 2270 get_my_user_info() 2271 elif sys.argv[1] == "search": 2272 test_search_mentions() 2273 elif sys.argv[1] == "queue": 2274 test_fetch_and_queue() 2275 elif sys.argv[1] == "thread": 2276 test_thread_context() 2277 elif sys.argv[1] == "process": 2278 # Process all queued mentions with optional --test flag 2279 testing_mode = "--test" in sys.argv 2280 process_queue_only(testing_mode=testing_mode) 2281 elif sys.argv[1] == "letta": 2282 # Use specific agent ID if provided, otherwise use from config 2283 agent_id = sys.argv[2] if len(sys.argv) > 2 else None 2284 test_letta_integration(agent_id) 2285 elif sys.argv[1] == "downrank": 2286 # View or manage downrank list 2287 if len(sys.argv) > 2 and sys.argv[2] == "list": 2288 downrank_users = load_downrank_users() 2289 if downrank_users: 2290 print(f"📋 Downrank users ({len(downrank_users)} total):") 2291 for user_id in sorted(downrank_users): 2292 print(f" - {user_id}") 2293 else: 2294 print("📋 No downrank users configured") 2295 else: 2296 print("Usage: python x.py downrank list") 2297 print(" list - Show all downranked user IDs") 2298 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list") 2299 else: 2300 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]") 2301 print(" bot - Run the main X bot (use --test for testing mode)") 2302 print(" Example: python x.py bot --test") 2303 print(" queue - Fetch and queue mentions only (no processing)") 2304 print(" process - Process all queued mentions only (no fetching)") 2305 print(" Example: python x.py process --test") 2306 print(" downrank - Manage downrank users (10% response rate)") 2307 print(" Example: python x.py downrank list") 2308 print(" loop - Run the old notification monitoring loop (deprecated)") 2309 print(" reply - Reply to Cameron's specific post") 2310 print(" me - Get authenticated user info and correct user ID") 2311 print(" search - Test search-based mention detection") 2312 print(" thread - Test thread context retrieval from queued mention") 2313 print(" letta - Test sending thread context to Letta agent") 2314 print(" Optional: python x.py letta <agent-id>") 2315 else: 2316 test_x_client()