this repo has no description
1import os
2import logging
3import requests
4import yaml
5import json
6import hashlib
7import random
8import time
9from typing import Optional, Dict, Any, List, Set
10from datetime import datetime
11from pathlib import Path
12from requests_oauthlib import OAuth1
13from rich import print as rprint
14from rich.panel import Panel
15from rich.text import Text
16
17import bsky_utils
18
class XRateLimitError(Exception):
    """Raised when the X API reports an exhausted rate limit (HTTP 429)."""
22
23
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("x_client")

# X-specific file paths
X_QUEUE_DIR = Path("x_queue")    # queued mentions awaiting async processing
X_CACHE_DIR = Path("x_cache")    # cached thread contexts and individual tweets
X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")  # mention IDs already handled
X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")  # high-water mark for incremental fetching
X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")  # users answered only ~10% of the time
36
class XClient:
    """X (Twitter) API client for fetching mentions and managing interactions.

    Authentication is selected in __init__ from the supplied credentials, in
    order of preference: OAuth 1.0a user context, OAuth 2.0 user-context
    token, then an application-only bearer token.
    """

    # Seconds allowed for any single HTTP call. Without an explicit timeout,
    # `requests` can block indefinitely on a dead connection, and the retry
    # logic in _make_request would never get a chance to run.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key: str, user_id: str, access_token: str = None,
                 consumer_key: str = None, consumer_secret: str = None,
                 access_token_secret: str = None):
        """Store credentials and pick the strongest available auth method.

        Args:
            api_key: Application-only bearer token (fallback auth).
            user_id: Numeric X user ID whose mentions will be fetched.
            access_token: OAuth 2.0 user token, or the OAuth 1.0a token when
                the three OAuth 1.0a secrets below are also provided.
            consumer_key: OAuth 1.0a consumer key.
            consumer_secret: OAuth 1.0a consumer secret.
            access_token_secret: OAuth 1.0a access token secret.
        """
        self.api_key = api_key
        self.access_token = access_token
        self.user_id = user_id
        self.base_url = "https://api.x.com/2"

        # Check if we have OAuth 1.0a credentials
        if (consumer_key and consumer_secret and access_token and access_token_secret):
            # Use OAuth 1.0a for User Context
            self.oauth = OAuth1(
                consumer_key,
                client_secret=consumer_secret,
                resource_owner_key=access_token,
                resource_owner_secret=access_token_secret
            )
            self.headers = {"Content-Type": "application/json"}
            self.auth_method = "oauth1a"
            logger.info("Using OAuth 1.0a User Context authentication for X API")
        elif access_token:
            # Use OAuth 2.0 Bearer token for User Context
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json"
            }
            self.auth_method = "oauth2_user"
            logger.info("Using OAuth 2.0 User Context access token for X API")
        else:
            # Use Application-Only Bearer token
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
            self.auth_method = "bearer"
            logger.info("Using Application-Only Bearer token for X API")

    def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
        """Make a request to the X API with proper error handling and exponential backoff.

        Args:
            endpoint: API path appended to base_url (e.g. "/tweets").
            params: Query parameters (GET requests).
            method: "GET" or "POST".
            data: JSON body (POST requests).
            max_retries: Total attempts before giving up.

        Returns:
            Parsed JSON dict on success, None on unrecoverable failure
            (including 401/403, which are never retried).

        Raises:
            XRateLimitError: If HTTP 429 persists through all retries.
            ValueError: If `method` is not GET or POST.
        """
        url = f"{self.base_url}{endpoint}"

        # Validate the method up front: previously this ValueError was raised
        # inside the try block, where the generic handler swallowed it, slept
        # through the whole backoff schedule, and returned None instead of
        # surfacing the programming error.
        http_method = method.upper()
        if http_method not in ("GET", "POST"):
            raise ValueError(f"Unsupported HTTP method: {method}")

        for attempt in range(max_retries):
            try:
                # timeout= ensures a hung connection fails fast enough for the
                # retry/backoff loop to take over.
                if http_method == "GET":
                    if self.oauth:
                        response = requests.get(url, headers=self.headers, params=params, auth=self.oauth, timeout=self.REQUEST_TIMEOUT)
                    else:
                        response = requests.get(url, headers=self.headers, params=params, timeout=self.REQUEST_TIMEOUT)
                else:
                    if self.oauth:
                        response = requests.post(url, headers=self.headers, json=data, auth=self.oauth, timeout=self.REQUEST_TIMEOUT)
                    else:
                        response = requests.post(url, headers=self.headers, json=data, timeout=self.REQUEST_TIMEOUT)

                response.raise_for_status()
                return response.json()

            except requests.exceptions.HTTPError as e:
                if response.status_code == 401:
                    logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
                    logger.error(f"Response: {response.text}")
                    return None  # Don't retry auth failures
                elif response.status_code == 403:
                    logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
                    logger.error(f"Response: {response.text}")
                    return None  # Don't retry permission failures
                elif response.status_code == 429:
                    if attempt < max_retries - 1:
                        # Exponential backoff: 60s, 120s, 240s
                        backoff_time = 60 * (2 ** attempt)
                        logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry")
                        logger.error(f"Response: {response.text}")
                        time.sleep(backoff_time)
                        continue
                    else:
                        logger.error("X API rate limit exceeded - max retries reached")
                        logger.error(f"Response: {response.text}")
                        raise XRateLimitError("X API rate limit exceeded")
                else:
                    if attempt < max_retries - 1:
                        # Exponential backoff for other HTTP errors too
                        backoff_time = 30 * (2 ** attempt)
                        logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
                        logger.error(f"Response: {response.text}")
                        time.sleep(backoff_time)
                        continue
                    else:
                        logger.error(f"X API request failed after {max_retries} attempts: {e}")
                        logger.error(f"Response: {response.text}")
                        return None

            except Exception as e:
                # Network errors, timeouts, JSON decode failures, etc.
                if attempt < max_retries - 1:
                    backoff_time = 15 * (2 ** attempt)
                    logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
                    time.sleep(backoff_time)
                    continue
                else:
                    logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}")
                    return None

        return None

    def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[Dict]:
        """
        Fetch mentions for the configured user with user data.

        Args:
            since_id: Minimum Post ID to include (for getting newer mentions)
            max_results: Number of results to return (5-100)

        Returns:
            Dict with 'mentions' and 'users' keys. Both are empty when the
            request fails or yields no data (never None).
        """
        endpoint = f"/users/{self.user_id}/mentions"
        params = {
            "max_results": min(max(max_results, 5), 100),  # Ensure within API limits
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Fetching mentions for user {self.user_id}")
        response = self._make_request(endpoint, params)

        if response:
            logger.debug(f"X API response: {response}")

        if response and "data" in response:
            mentions = response["data"]
            users_data = {}

            # Extract user data from includes
            if "includes" in response and "users" in response["includes"]:
                for user in response["includes"]["users"]:
                    users_data[user["id"]] = user
                logger.info(f"Retrieved user data for {len(users_data)} users")

            logger.info(f"Retrieved {len(mentions)} mentions")
            return {"mentions": mentions, "users": users_data}
        else:
            if response:
                logger.info(f"No mentions in response. Full response: {response}")
            else:
                logger.warning("Request failed - no response received")
            return {"mentions": [], "users": {}}

    def get_user_info(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user.

        Returns:
            The user object from the API, or None if the lookup failed.
        """
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None

    def search_mentions(self, username: str, max_results: int = 10, since_id: Optional[str] = None) -> Optional[List[Dict]]:
        """
        Search for mentions using the search endpoint instead of mentions endpoint.
        This might have better rate limits than the direct mentions endpoint.

        Args:
            username: Username to search for mentions of (without @)
            max_results: Number of results to return (10-100)
            since_id: Only return results newer than this tweet ID

        Returns:
            List of tweets mentioning the username (empty on failure)
        """
        endpoint = "/tweets/search/recent"

        # Search for mentions of the username
        query = f"@{username}"

        params = {
            "query": query,
            "max_results": min(max(max_results, 10), 100),
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Searching for mentions of @{username}")
        response = self._make_request(endpoint, params)

        if response and "data" in response:
            tweets = response["data"]
            logger.info(f"Found {len(tweets)} mentions via search")
            return tweets
        else:
            if response:
                logger.info(f"No mentions found via search. Response: {response}")
            else:
                logger.warning("Search request failed")
            return []

    def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get all tweets in a conversation thread up to a specific tweet ID.

        Args:
            conversation_id: The conversation ID to fetch (should be the original tweet ID)
            use_cache: Whether to use cached data if available
            until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)

        Returns:
            Dict with 'tweets' (chronologically ordered) and 'users' keys,
            or None if no tweets could be found.
        """
        # Check cache first if enabled
        if use_cache:
            cached_data = get_cached_thread_context(conversation_id)
            if cached_data:
                return cached_data

        # First, get the original tweet directly since it might not appear in conversation search
        original_tweet = None
        try:
            endpoint = f"/tweets/{conversation_id}"
            params = {
                "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                "user.fields": "id,name,username",
                "expansions": "author_id"
            }
            response = self._make_request(endpoint, params)
            if response and "data" in response:
                original_tweet = response["data"]
                logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
        except Exception as e:
            logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")

        # Then search for all tweets in this conversation
        endpoint = "/tweets/search/recent"
        params = {
            "query": f"conversation_id:{conversation_id}",
            "max_results": 100,  # Get as many as possible
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
            "sort_order": "recency"  # Get newest first, we'll reverse later
        }

        # Add until_id parameter to exclude tweets after the mention being processed
        if until_id:
            params["until_id"] = until_id
            logger.info(f"Using until_id={until_id} to exclude future tweets")

        logger.info(f"Fetching thread context for conversation {conversation_id}")
        response = self._make_request(endpoint, params)

        tweets = []
        users_data = {}

        # Collect tweets from search
        if response and "data" in response:
            tweets.extend(response["data"])
            # Store user data for reference
            if "includes" in response and "users" in response["includes"]:
                for user in response["includes"]["users"]:
                    users_data[user["id"]] = user

        # Add original tweet if we got it and it's not already in the list
        if original_tweet:
            tweet_ids = [t.get('id') for t in tweets]
            if original_tweet.get('id') not in tweet_ids:
                tweets.append(original_tweet)
                logger.info("Added original tweet to thread context")

        # Attempt to fill gaps by fetching referenced tweets that are missing
        # This helps with X API's incomplete conversation search results
        tweet_ids = set(t.get('id') for t in tweets)
        missing_tweet_ids = set()
        critical_missing_ids = set()

        # Collect referenced tweet IDs, prioritizing critical ones
        for tweet in tweets:
            referenced_tweets = tweet.get('referenced_tweets', [])
            for ref in referenced_tweets:
                ref_id = ref.get('id')
                ref_type = ref.get('type')
                if ref_id and ref_id not in tweet_ids:
                    missing_tweet_ids.add(ref_id)
                    # Prioritize direct replies and quoted tweets over retweets
                    if ref_type in ['replied_to', 'quoted']:
                        critical_missing_ids.add(ref_id)

        # For rate limit efficiency, only fetch critical missing tweets if we have many
        if len(missing_tweet_ids) > 10:
            logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
            missing_tweet_ids = critical_missing_ids

        # Context sufficiency check - skip backfill if we already have enough context
        if has_sufficient_context(tweets, missing_tweet_ids):
            logger.info("Thread has sufficient context, skipping missing tweet backfill")
            missing_tweet_ids = set()

        # Fetch missing referenced tweets in batches (more rate-limit friendly)
        if missing_tweet_ids:
            missing_list = list(missing_tweet_ids)

            # First, check cache for missing tweets
            cached_tweets = get_cached_tweets(missing_list)
            for tweet_id, cached_tweet in cached_tweets.items():
                if cached_tweet.get('conversation_id') == conversation_id:
                    tweets.append(cached_tweet)
                    tweet_ids.add(tweet_id)
                    logger.info(f"Retrieved missing tweet from cache: {tweet_id}")

                    # Add user data if available in cache
                    if cached_tweet.get('author_info'):
                        author_id = cached_tweet.get('author_id')
                        if author_id:
                            users_data[author_id] = cached_tweet['author_info']

            # Only fetch tweets that weren't found in cache
            uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]

            if uncached_ids:
                batch_size = 100  # X API limit for bulk tweet lookup

                for i in range(0, len(uncached_ids), batch_size):
                    batch_ids = uncached_ids[i:i + batch_size]
                    try:
                        endpoint = "/tweets"
                        params = {
                            "ids": ",".join(batch_ids),
                            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                            "user.fields": "id,name,username",
                            "expansions": "author_id"
                        }
                        response = self._make_request(endpoint, params)

                        if response and "data" in response:
                            fetched_tweets = []
                            batch_users_data = {}

                            for missing_tweet in response["data"]:
                                # Only add if it's actually part of this conversation
                                if missing_tweet.get('conversation_id') == conversation_id:
                                    tweets.append(missing_tweet)
                                    tweet_ids.add(missing_tweet.get('id'))
                                    fetched_tweets.append(missing_tweet)
                                    logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")

                            # Add user data if available
                            if "includes" in response and "users" in response["includes"]:
                                for user in response["includes"]["users"]:
                                    users_data[user["id"]] = user
                                    batch_users_data[user["id"]] = user

                            # Cache the newly fetched tweets
                            if fetched_tweets:
                                save_cached_tweets(fetched_tweets, batch_users_data)

                            logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")

                        # Handle partial success - log any missing tweets that weren't found
                        if response and "errors" in response:
                            for error in response["errors"]:
                                logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")

                    except Exception as e:
                        logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
            else:
                logger.info(f"All {len(missing_list)} missing tweets found in cache")

        if tweets:
            # Filter out tweets that occur after until_id (if specified)
            if until_id:
                original_count = len(tweets)
                # Convert until_id to int for comparison (Twitter IDs are sequential)
                until_id_int = int(until_id)
                tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
                filtered_count = len(tweets)
                if original_count != filtered_count:
                    logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")

            # Sort chronologically (oldest first)
            tweets.sort(key=lambda x: x.get('created_at', ''))
            logger.info(f"Retrieved {len(tweets)} tweets in thread")

            thread_data = {"tweets": tweets, "users": users_data}

            # Cache individual tweets from the thread for future backfill
            save_cached_tweets(tweets, users_data)

            # Cache the result
            if use_cache:
                save_cached_thread_context(conversation_id, thread_data)

            return thread_data
        else:
            logger.warning("No tweets found for thread context")
            return None

    def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
        """
        Post a reply to a specific tweet.

        Args:
            reply_text: The text content of the reply
            in_reply_to_tweet_id: The ID of the tweet to reply to

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        payload = {
            "text": reply_text,
            "reply": {
                "in_reply_to_tweet_id": in_reply_to_tweet_id
            }
        }

        logger.info(f"Attempting to post reply with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
            return result
        else:
            logger.error("Failed to post reply")
            return None

    def post_tweet(self, tweet_text: str) -> Optional[Dict]:
        """
        Post a standalone tweet (not a reply).

        Args:
            tweet_text: The text content of the tweet (max 280 characters)

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        # Validate tweet length
        if len(tweet_text) > 280:
            logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)")
            return None

        payload = {
            "text": tweet_text
        }

        logger.info(f"Attempting to post tweet with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info("Successfully posted tweet")
            return result
        else:
            logger.error("Failed to post tweet")
            return None
505
def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
    """Read the ``x`` section from the YAML config file and validate it."""
    try:
        with open(config_path, 'r') as fh:
            full_config = yaml.safe_load(fh)

        section = full_config.get('x', {})
        # Both credentials are mandatory for the client to function.
        if not section.get('api_key') or not section.get('user_id'):
            raise ValueError("X API key and user_id must be configured in config.yaml")

        return section
    except Exception as e:
        logger.error(f"Failed to load X configuration: {e}")
        raise
520
def create_x_client(config_path: str = "config.yaml") -> XClient:
    """Build an XClient from credentials stored in the config file."""
    cfg = load_x_config(config_path)
    # Optional OAuth credentials default to None when absent from the config.
    optional_keys = ('access_token', 'consumer_key', 'consumer_secret', 'access_token_secret')
    optional = {key: cfg.get(key) for key in optional_keys}
    return XClient(api_key=cfg['api_key'], user_id=cfg['user_id'], **optional)
532
def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
    """
    Render a single mention as a YAML string for better AI comprehension.
    Mirrors thread_to_yaml_string in bsky_utils.py.
    """
    author_id = mention.get('author_id')

    # Keep only the fields the AI needs to see.
    condensed = {
        'id': mention.get('id'),
        'text': mention.get('text'),
        'author_id': author_id,
        'created_at': mention.get('created_at'),
        'in_reply_to_user_id': mention.get('in_reply_to_user_id')
    }

    # Attach the author's handle and display name when we know them.
    if users_data and author_id in users_data:
        profile = users_data[author_id]
        condensed['author'] = {
            'username': profile.get('username'),
            'name': profile.get('name')
        }

    return yaml.dump(condensed, default_flow_style=False, sort_keys=False)
556
def thread_to_yaml_string(thread_data: Dict) -> str:
    """
    Convert X thread context to a YAML string for AI comprehension.
    Similar to Bluesky's thread_to_yaml_string function.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()

    Returns:
        YAML string representation of the thread
    """
    if not thread_data or "tweets" not in thread_data:
        return "conversation: []\n"

    users_data = thread_data.get("users", {})
    conversation = []

    for tweet in thread_data["tweets"]:
        uid = tweet.get('author_id')

        # Resolve the author's handle/name when we have their profile.
        author_block = {}
        if uid and uid in users_data:
            profile = users_data[uid]
            author_block = {
                'username': profile.get('username'),
                'name': profile.get('name')
            }

        # Simplified tweet shape for AI consumption; the raw author_id is
        # kept so downstream block management can key on it.
        conversation.append({
            'text': tweet.get('text'),
            'created_at': tweet.get('created_at'),
            'author': author_block,
            'author_id': uid
        })

    return yaml.dump({"conversation": conversation}, default_flow_style=False, sort_keys=False)
600
601
def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
    """
    Make sure every user appearing in the thread has an X user block attached.
    Blocks that do not exist yet are created and seeded with the user's handle.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
        agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id)
    """
    if not thread_data or "users" not in thread_data:
        return

    try:
        from tools.blocks import attach_x_user_blocks, x_user_note_set
        from config_loader import get_letta_config
        from letta_client import Letta

        # Get Letta client and agent_id from config
        config = get_letta_config()
        client = Letta(token=config['api_key'], timeout=config['timeout'])

        if agent_id is None:
            agent_id = config['agent_id']

        # The block helpers expect an object exposing ``.id`` like an agent state.
        class _StubAgentState:
            def __init__(self, identifier):
                self.id = identifier

        agent_state = _StubAgentState(agent_id)

        users_data = thread_data["users"]
        user_ids = list(users_data.keys())
        if not user_ids:
            return

        logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")

        # Snapshot which users already have blocks so we don't clobber notes.
        existing_user_blocks = {
            block.label.replace("x_user_", ""): block
            for block in client.agents.blocks.list(agent_id=agent_id)
            if block.label.startswith("x_user_")
        }

        # Attach all user blocks (creates any that are missing).
        attach_result = attach_x_user_blocks(user_ids, agent_state)
        logger.info(f"X user block attachment result: {attach_result}")

        # Seed brand-new blocks with the user's handle and display name.
        for user_id in user_ids:
            if user_id in existing_user_blocks:
                continue

            user_info = users_data[user_id]
            username = user_info.get('username', 'unknown')
            name = user_info.get('name', 'Unknown')
            initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."

            try:
                x_user_note_set(user_id, initial_content, agent_state)
                logger.info(f"Set initial content for X user {user_id} (@{username})")
            except Exception as e:
                logger.error(f"Failed to set initial content for X user {user_id}: {e}")

    except Exception as e:
        logger.error(f"Error ensuring X user blocks: {e}")
673
674
675# X Caching and Queue System Functions
676
def load_last_seen_id() -> Optional[str]:
    """Return the most recently seen mention ID, or None if unknown/unreadable."""
    if not X_LAST_SEEN_FILE.exists():
        return None
    try:
        with open(X_LAST_SEEN_FILE, 'r') as fh:
            return json.load(fh).get('last_seen_id')
    except Exception as e:
        logger.error(f"Error loading last seen ID: {e}")
        return None
687
def save_last_seen_id(mention_id: str):
    """Persist the newest mention ID seen, enabling incremental fetching."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        payload = {
            'last_seen_id': mention_id,
            'updated_at': datetime.now().isoformat()
        }
        with open(X_LAST_SEEN_FILE, 'w') as fh:
            json.dump(payload, fh)
        logger.debug(f"Saved last seen ID: {mention_id}")
    except Exception as e:
        logger.error(f"Error saving last seen ID: {e}")
700
def load_processed_mentions() -> set:
    """Load the set of processed mention IDs from disk."""
    if not X_PROCESSED_MENTIONS_FILE.exists():
        return set()
    try:
        with open(X_PROCESSED_MENTIONS_FILE, 'r') as fh:
            ids = json.load(fh)
        # Trim the on-disk history so the file cannot grow without bound.
        if len(ids) > 10000:
            ids = ids[-10000:]
            save_processed_mentions(set(ids))
        return set(ids)
    except Exception as e:
        logger.error(f"Error loading processed mentions: {e}")
        return set()
715
def save_processed_mentions(processed_set: set):
    """Write the processed-mention ID set to disk as a JSON list."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        serialized = list(processed_set)
        with open(X_PROCESSED_MENTIONS_FILE, 'w') as fh:
            json.dump(serialized, fh)
    except Exception as e:
        logger.error(f"Error saving processed mentions: {e}")
724
def load_downrank_users() -> Set[str]:
    """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
    try:
        if not X_DOWNRANK_USERS_FILE.exists():
            return set()

        with open(X_DOWNRANK_USERS_FILE, 'r') as fh:
            stripped = (line.strip() for line in fh)
            # Blank lines and '#' comments are ignored.
            downrank_users = {entry for entry in stripped if entry and not entry.startswith('#')}

        logger.info(f"Loaded {len(downrank_users)} downrank users")
        return downrank_users
    except Exception as e:
        logger.error(f"Error loading downrank users: {e}")
        return set()
744
def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
    """
    Decide whether to reply to a user, honoring downranking.
    Regular users always get a reply; downranked users only ~10% of the time.
    """
    if user_id not in downrank_users:
        return True

    # Roll the 10% dice for downranked users.
    should_respond = random.random() < 0.1
    verdict = 'responding' if should_respond else 'skipping'
    logger.info(f"Downranked user {user_id}: {verdict} (10% chance)")
    return should_respond
757
def save_mention_to_queue(mention: Dict, users_data: Optional[Dict] = None):
    """Save a mention to the queue directory for async processing with author info.

    Args:
        mention: The mention data from X API
        users_data: Optional dict mapping user IDs to user data (including usernames)
    """
    try:
        mention_id = mention.get('id')
        if not mention_id:
            logger.error("Mention missing ID, cannot queue")
            return

        # Check if already processed
        processed_mentions = load_processed_mentions()
        if mention_id in processed_mentions:
            logger.debug(f"Mention {mention_id} already processed, skipping")
            return

        # Create queue directory
        X_QUEUE_DIR.mkdir(exist_ok=True)

        # Deterministic filename derived from the mention content (mirrors the
        # Bluesky queue naming scheme), so re-queuing the same mention is
        # idempotent on disk.
        mention_str = json.dumps(mention, sort_keys=True)
        mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
        filename = f"x_mention_{mention_hash}.json"

        queue_file = X_QUEUE_DIR / filename

        # Extract author info if users_data provided
        author_id = mention.get('author_id')
        author_username = None
        author_name = None

        if users_data and author_id and author_id in users_data:
            user_info = users_data[author_id]
            author_username = user_info.get('username')
            author_name = user_info.get('name')
            logger.info(f"Caching author info for @{author_username} ({author_name})")

        # Save mention data with enhanced debugging information
        mention_data = {
            'mention': mention,
            'queued_at': datetime.now().isoformat(),
            'type': 'x_mention',
            # Cache author info for later use
            'author_info': {
                'username': author_username,
                'name': author_name,
                'id': author_id
            },
            # Debug info for conversation tracking
            'debug_info': {
                'mention_id': mention.get('id'),
                'author_id': mention.get('author_id'),
                'conversation_id': mention.get('conversation_id'),
                'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
                'referenced_tweets': mention.get('referenced_tweets', []),
                'text_preview': mention.get('text', '')[:200],
                'created_at': mention.get('created_at'),
                'public_metrics': mention.get('public_metrics', {}),
                'context_annotations': mention.get('context_annotations', [])
            }
        }

        with open(queue_file, 'w') as f:
            json.dump(mention_data, f, indent=2)

        # Log the actual queue file path (previously this printed the literal
        # "(unknown)", making queued entries impossible to locate on disk).
        logger.info(f"Queued X mention {mention_id} -> {queue_file}")

    except Exception as e:
        logger.error(f"Error saving mention to queue: {e}")
830
831# X Cache Functions
def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
    """Return the cached thread context for a conversation, if present and fresh."""
    cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
    if not cache_file.exists():
        return None
    try:
        with open(cache_file, 'r') as fh:
            cached_data = json.load(fh)
        from datetime import datetime, timedelta
        cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
        # Entries older than one hour are treated as stale.
        if datetime.now() - cached_time < timedelta(hours=1):
            logger.info(f"Using cached thread context for {conversation_id}")
            return cached_data.get('thread_data')
    except Exception as e:
        logger.warning(f"Error loading cached thread context: {e}")
    return None
848
def save_cached_thread_context(conversation_id: str, thread_data: Dict):
    """Persist a conversation's thread context to the cache directory."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)
        payload = {
            'conversation_id': conversation_id,
            'thread_data': thread_data,
            'cached_at': datetime.now().isoformat()
        }
        target = X_CACHE_DIR / f"thread_{conversation_id}.json"
        with open(target, 'w') as fh:
            json.dump(payload, fh, indent=2)
        logger.debug(f"Cached thread context for {conversation_id}")
    except Exception as e:
        logger.error(f"Error caching thread context: {e}")
867
def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
    """
    Load cached individual tweets if available.
    Returns dict mapping tweet_id -> tweet_data for found tweets.

    Tweets older than 24h get a 24-hour cache lifetime (they rarely change);
    fresher tweets are only trusted for 1 hour.
    """
    cached_tweets = {}

    for tweet_id in tweet_ids:
        cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
        if not cache_file.exists():
            continue
        try:
            with open(cache_file, 'r') as f:
                cached_data = json.load(f)

            # Use longer cache times for older tweets (24 hours vs 1 hour)
            from datetime import datetime, timedelta
            cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
            tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')

            # Parse tweet creation time to determine age
            try:
                from dateutil.parser import parse
                tweet_age = datetime.now() - parse(tweet_created)
                cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
            except Exception:
                # Was a bare `except:`, which also traps KeyboardInterrupt and
                # SystemExit; Exception covers parse/import failures here.
                cache_duration = timedelta(hours=1)  # Default to 1 hour if parsing fails

            if datetime.now() - cached_time < cache_duration:
                cached_tweets[tweet_id] = cached_data.get('tweet_data')
                logger.debug(f"Using cached tweet {tweet_id}")

        except Exception as e:
            logger.warning(f"Error loading cached tweet {tweet_id}: {e}")

    return cached_tweets
903
def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
    """Write each tweet to its own cache file (``tweet_<id>.json``) for reuse.

    When author info is available in ``users_data``, it is embedded under the
    ``author_info`` key of the cached copy. Best-effort: errors are logged,
    never raised.
    """
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)

        for tweet in tweets_data:
            tweet_id = tweet.get('id')
            if not tweet_id:
                continue

            # Work on a copy so the caller's dict is never mutated.
            enriched = tweet.copy()
            author_id = tweet.get('author_id')
            if users_data and author_id in users_data:
                enriched['author_info'] = users_data[author_id]

            record = {
                'tweet_id': tweet_id,
                'tweet_data': enriched,
                'cached_at': datetime.now().isoformat(),
            }

            with open(X_CACHE_DIR / f"tweet_{tweet_id}.json", 'w') as fh:
                json.dump(record, fh, indent=2)

            logger.debug(f"Cached individual tweet {tweet_id}")

    except Exception as e:
        logger.error(f"Error caching individual tweets: {e}")
934
def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
    """
    Determine if we have sufficient context to skip backfilling missing tweets.

    Heuristics, checked in order:
      1. Nothing missing -> sufficient.
      2. 5+ tweets already -> sufficient.
      3. At most 2 missing with 3+ present -> sufficient.
      4. Visible conversational flow with 2+ tweets -> sufficient.
      Otherwise backfill is needed.

    Args:
        tweets: List of tweets already in the thread
        missing_tweet_ids: Set of missing tweet IDs we'd like to fetch

    Returns:
        True if context is sufficient, False if backfill is needed
    """
    if not missing_tweet_ids:
        return True

    if len(tweets) >= 5:
        logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient")
        return True

    if len(missing_tweet_ids) <= 2 and len(tweets) >= 3:
        logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient")
        return True

    def _looks_conversational(tweet: Dict) -> bool:
        # Mentions/"reply" wording, or more than one tweet from other authors,
        # counts as conversational flow.
        text = tweet.get('text', '').lower()
        if '@' in text or 'reply' in text:
            return True
        others = sum(1 for t in tweets if t.get('author_id') != tweet.get('author_id'))
        return others > 1

    has_conversation_flow = any(_looks_conversational(t) for t in tweets)

    if has_conversation_flow and len(tweets) >= 2:
        logger.debug("Thread has conversational flow, considering sufficient")
        return True

    logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow")
    return False
977
def fetch_and_queue_mentions(username: str) -> int:
    """
    Single-pass function to fetch new mentions and queue them.

    Fetches mentions newer than the persisted last-seen ID, queues each one
    (oldest first), then advances the last-seen marker.

    Returns number of new mentions found (0 on error or when none found).
    """
    try:
        client = create_x_client()

        # Incremental fetch: only mentions after the last one we processed.
        last_seen_id = load_last_seen_id()
        logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")

        result = client.get_mentions(since_id=last_seen_id, max_results=100)

        if not result or not result["mentions"]:
            logger.info("No new mentions found")
            return 0

        mentions = result["mentions"]
        users_data = result["users"]

        # API returns newest first; reverse so we queue oldest first.
        mentions.reverse()
        for mention in mentions:
            save_mention_to_queue(mention, users_data)
        new_count = len(mentions)

        # After the reverse, the last element is the most recent mention.
        save_last_seen_id(mentions[-1]['id'])

        logger.info(f"Queued {new_count} new X mentions")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queuing mentions: {e}")
        return 0
1024
1025# Simple test function
def get_my_user_info():
    """Fetch and display the authenticated account's profile via /2/users/me.

    Prints the ID/username/name/description and a config.yaml snippet; returns
    the user-data dict, or None on failure.
    """
    try:
        client = create_x_client()

        print("Fetching authenticated user information...")
        response = client._make_request(
            "/users/me",
            params={"user.fields": "id,name,username,description"},
        )

        # Guard clause: bail out early on a bad/empty response.
        if not response or "data" not in response:
            print("❌ Failed to get user information")
            print(f"Response: {response}")
            return None

        user_data = response["data"]
        print(f"✅ Found authenticated user:")
        print(f"   ID: {user_data.get('id')}")
        print(f"   Username: @{user_data.get('username')}")
        print(f"   Name: {user_data.get('name')}")
        print(f"   Description: {user_data.get('description', 'N/A')[:100]}...")
        print(f"\n🔧 Update your config.yaml with:")
        print(f"  user_id: \"{user_data.get('id')}\"")
        return user_data

    except Exception as e:
        print(f"Error getting user info: {e}")
        return None
1058
def test_search_mentions():
    """Exercise search-based mention detection and print a short summary."""
    try:
        client = create_x_client()

        # Resolve our own handle first; search needs it.
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        handle = user_info["data"]["username"]
        print(f"🔍 Searching for mentions of @{handle}")

        found = client.search_mentions(handle, max_results=5)
        if not found:
            print("No mentions found via search")
            return

        print(f"✅ Found {len(found)} mentions via search:")
        for m in found:
            print(f"- {m.get('id')}: {m.get('text', '')[:100]}...")

    except Exception as e:
        print(f"Search test failed: {e}")
1084
def test_fetch_and_queue():
    """Run the single-pass fetch-and-queue flow and report what changed."""
    try:
        client = create_x_client()

        # Resolve our own handle first.
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        handle = user_info["data"]["username"]
        print(f"🔄 Fetching and queueing mentions for @{handle}")

        # Show the marker before and (if anything was queued) after.
        print(f"📍 Last seen ID: {load_last_seen_id() or 'None (first run)'}")

        queued = fetch_and_queue_mentions(handle)

        if queued > 0:
            print(f"✅ Queued {queued} new mentions")
            print(f"📁 Check ./x_queue/ directory for queued mentions")
            print(f"📍 Updated last seen ID: {load_last_seen_id()}")
        else:
            print("ℹ️ No new mentions to queue")

    except Exception as e:
        print(f"Fetch and queue test failed: {e}")
1118
def test_thread_context():
    """Pull thread context for the first queued mention and dump it as YAML."""
    try:
        import json

        # Need at least one queued mention to work with.
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        with open(queue_files[0], 'r') as fh:
            mention_data = json.load(fh)
        mention = mention_data['mention']

        print(f"📄 Using mention: {mention.get('id')}")
        print(f"📝 Text: {mention.get('text')}")

        conversation_id = mention.get('conversation_id')
        if not conversation_id:
            print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        thread_data = create_x_client().get_thread_context(conversation_id)
        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        print(f"✅ Retrieved thread with {len(thread_data.get('tweets', []))} tweets")

        # Render and persist the YAML view for manual inspection.
        yaml_thread = thread_to_yaml_string(thread_data)
        out_path = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
        with open(out_path, 'w') as fh:
            fh.write(yaml_thread)

        print(f"💾 Saved thread context to: {out_path}")
        print("\n📋 Thread preview:")
        print(yaml_thread)

    except Exception as e:
        print(f"Thread context test failed: {e}")
1171
def test_letta_integration(agent_id: str = None):
    """Test sending X thread context to Letta agent.

    End-to-end smoke test: resolves Letta credentials from config.yaml (or the
    LETTA_API_KEY env var), reads the first queued mention, fetches its thread
    context, and streams the prompt through the agent, printing each chunk.

    Args:
        agent_id: Letta agent to use; falls back to config.yaml's letta.agent_id.
    """
    try:
        from letta_client import Letta
        import json
        import yaml

        # Load full config to access letta section
        try:
            with open("config.yaml", 'r') as f:
                full_config = yaml.safe_load(f)

            letta_config = full_config.get('letta', {})
            api_key = letta_config.get('api_key')
            config_agent_id = letta_config.get('agent_id')

            # Use agent_id from config if not provided as parameter
            if not agent_id:
                if config_agent_id:
                    agent_id = config_agent_id
                    print(f"ℹ️ Using agent_id from config: {agent_id}")
                else:
                    print("❌ No agent_id found in config.yaml")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    agent_id: your-agent-id")
                    return
            else:
                print(f"ℹ️ Using provided agent_id: {agent_id}")

            if not api_key:
                # Try loading from environment as fallback
                import os
                api_key = os.getenv('LETTA_API_KEY')
                if not api_key:
                    print("❌ LETTA_API_KEY not found in config.yaml or environment")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    api_key: your-letta-api-key")
                    return
                else:
                    print("ℹ️ Using LETTA_API_KEY from environment")
            else:
                print("ℹ️ Using LETTA_API_KEY from config.yaml")

        except Exception as e:
            print(f"❌ Error loading config: {e}")
            return

        # 600s timeout: agent runs can be slow when many tool steps are taken.
        letta_client = Letta(token=api_key, timeout=600)
        print(f"🤖 Connected to Letta, using agent: {agent_id}")

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        conversation_id = mention.get('conversation_id')

        if not conversation_id:
            print("❌ No conversation_id found in mention.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        x_client = create_x_client()
        thread_data = x_client.get_thread_context(conversation_id)

        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        # Convert to YAML
        yaml_thread = thread_to_yaml_string(thread_data)

        # Create prompt for the agent
        prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.

Here is the thread context:

{yaml_thread}

Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""

        prompt_char_count = len(prompt)
        print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars")

        # Print the prompt in a rich panel
        rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue"))

        # Send to Letta agent using streaming
        message_stream = letta_client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": prompt}],
            stream_tokens=False,
            max_steps=10
        )

        print("🔄 Streaming response from agent...")
        response_text = ""

        # NOTE(review): only the last assistant_message chunk is kept as the
        # final response; earlier assistant chunks are overwritten.
        for chunk in message_stream:
            print(chunk)
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'assistant_message':
                    print(f"🤖 Agent response: {chunk.content}")
                    response_text = chunk.content
                elif chunk.message_type == 'reasoning_message':
                    print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
                elif chunk.message_type == 'tool_call_message':
                    print(f"🔧 Agent tool call: {chunk.tool_call.name}")

        if response_text:
            print(f"\n✅ Agent generated response:")
            print(f"📝 Response: {response_text}")
        else:
            print("❌ No response generated by agent")

    except Exception as e:
        print(f"Letta integration test failed: {e}")
        import traceback
        traceback.print_exc()
1302
def test_x_client():
    """Smoke-test the X client by pulling a handful of mentions."""
    try:
        client = create_x_client()
        result = client.get_mentions(max_results=5)

        if not result or not result["mentions"]:
            print("No mentions retrieved")
            return

        mentions = result["mentions"]
        users_data = result["users"]
        print(f"Successfully retrieved {len(mentions)} mentions:")
        print(f"User data available for {len(users_data)} users")

        for mention in mentions:
            # Resolve the author's handle from the expanded user data.
            author = users_data.get(mention.get('author_id'), {})
            handle = author.get('username', 'unknown')
            print(f"- {mention.get('id')} from @{handle}: {mention.get('text')[:50]}...")

    except Exception as e:
        print(f"Test failed: {e}")
1324
def reply_to_cameron_post():
    """
    Reply to Cameron's specific X post.

    NOTE: This requires OAuth User Context authentication, not Bearer token.
    Current Bearer token is Application-Only which can't post.
    """
    try:
        client = create_x_client()

        # Post ID from https://x.com/cameron_pfiffer/status/1950690566909710618
        cameron_post_id = "1950690566909710618"
        reply_text = "Hello from void! 🤖 Testing X integration."

        print(f"Attempting to reply to post {cameron_post_id}")
        print(f"Reply text: {reply_text}")
        print("\nNOTE: This will fail with current Bearer token (Application-Only)")
        print("Posting requires OAuth User Context authentication")

        result = client.post_reply(reply_text, cameron_post_id)

        if not result:
            print("❌ Failed to post reply (expected with current auth)")
            return

        print(f"✅ Successfully posted reply!")
        print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")

    except Exception as e:
        print(f"Reply failed: {e}")
1356
1357def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
1358 """
1359 Process an X mention and generate a reply using the Letta agent.
1360 Similar to bsky.py process_mention but for X/Twitter.
1361
1362 Args:
1363 void_agent: The Letta agent instance
1364 x_client: The X API client
1365 mention_data: The mention data dictionary
1366 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
1367 testing_mode: If True, don't actually post to X
1368
1369 Returns:
1370 True: Successfully processed, remove from queue
1371 False: Failed but retryable, keep in queue
1372 None: Failed with non-retryable error, move to errors directory
1373 "no_reply": No reply was generated, move to no_reply directory
1374 """
1375 try:
1376 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")
1377
1378 # Extract mention details
1379 if isinstance(mention_data, dict):
1380 # Handle both raw mention and queued mention formats
1381 if 'mention' in mention_data:
1382 mention = mention_data['mention']
1383 else:
1384 mention = mention_data
1385 else:
1386 mention = mention_data
1387
1388 mention_id = mention.get('id')
1389 mention_text = mention.get('text', '')
1390 author_id = mention.get('author_id')
1391 conversation_id = mention.get('conversation_id')
1392 in_reply_to_user_id = mention.get('in_reply_to_user_id')
1393 referenced_tweets = mention.get('referenced_tweets', [])
1394
1395 # Check downrank list - only respond to downranked users 10% of the time
1396 downrank_users = load_downrank_users()
1397 if not should_respond_to_downranked_user(str(author_id), downrank_users):
1398 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
1399 return "no_reply"
1400
1401 # Enhanced conversation tracking for debug - especially important for Grok handling
1402 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
1403 logger.info(f" Author ID: {author_id}")
1404 logger.info(f" Conversation ID: {conversation_id}")
1405 logger.info(f" In Reply To User ID: {in_reply_to_user_id}")
1406 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items")
1407 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets
1408 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
1409 logger.info(f" Text preview: {mention_text[:100]}...")
1410
1411 # If no conversation_id, try to use referenced tweet as conversation root
1412 if not conversation_id and referenced_tweets:
1413 # For replies, use the tweet being replied to as conversation root
1414 for ref in referenced_tweets:
1415 if ref.get('type') == 'replied_to':
1416 conversation_id = ref.get('id')
1417 logger.info(f"📎 No conversation_id, using replied_to tweet {conversation_id} as conversation root")
1418 break
1419
1420 if not conversation_id:
1421 # If still no conversation ID, use the mention itself as a standalone
1422 conversation_id = mention_id
1423 logger.warning(f"⚠️ No conversation_id found for mention {mention_id} - treating as standalone tweet")
1424
1425 # Get thread context with caching enabled for efficiency
1426 # Use mention_id as until_id to exclude tweets that occurred after this mention
1427 try:
1428 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
1429 if not thread_data:
1430 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
1431 return False
1432
1433 # If this mention references a specific tweet, ensure we have that tweet in context
1434 if referenced_tweets:
1435 for ref in referenced_tweets:
1436 if ref.get('type') == 'replied_to':
1437 ref_id = ref.get('id')
1438 # Check if the referenced tweet is in our thread data
1439 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
1440 if ref_id and ref_id not in thread_tweet_ids:
1441 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
1442 try:
1443 # Fetch the missing referenced tweet directly
1444 endpoint = f"/tweets/{ref_id}"
1445 params = {
1446 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
1447 "user.fields": "id,name,username",
1448 "expansions": "author_id"
1449 }
1450 response = x_client._make_request(endpoint, params)
1451 if response and "data" in response:
1452 missing_tweet = response["data"]
1453 if missing_tweet.get('conversation_id') == conversation_id:
1454 # Add to thread data
1455 if 'tweets' not in thread_data:
1456 thread_data['tweets'] = []
1457 thread_data['tweets'].append(missing_tweet)
1458
1459 # Add user data if available
1460 if "includes" in response and "users" in response["includes"]:
1461 if 'users' not in thread_data:
1462 thread_data['users'] = {}
1463 for user in response["includes"]["users"]:
1464 thread_data['users'][user["id"]] = user
1465
1466 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
1467 else:
1468 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
1469 except Exception as e:
1470 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")
1471
1472 # Enhanced thread context debugging
1473 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
1474 thread_posts = thread_data.get('tweets', [])
1475 thread_users = thread_data.get('users', {})
1476 logger.info(f" Posts in thread: {len(thread_posts)}")
1477 logger.info(f" Users in thread: {len(thread_users)}")
1478
1479 # Log thread participants for Grok detection
1480 for user_id, user_info in thread_users.items():
1481 username = user_info.get('username', 'unknown')
1482 name = user_info.get('name', 'Unknown')
1483 is_verified = user_info.get('verified', False)
1484 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}")
1485
1486 # Special logging for Grok or AI-related users
1487 if 'grok' in username.lower() or 'grok' in name.lower():
1488 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})")
1489
1490 # Log conversation structure
1491 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts
1492 post_id = post.get('id')
1493 post_author = post.get('author_id')
1494 post_text = post.get('text', '')[:50]
1495 is_reply = 'in_reply_to_user_id' in post
1496 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")
1497
1498 except Exception as e:
1499 logger.error(f"❌ Error getting thread context: {e}")
1500 return False
1501
1502 # Convert to YAML string
1503 thread_context = thread_to_yaml_string(thread_data)
1504 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")
1505
1506 # Save comprehensive conversation data for debugging
1507 try:
1508 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1509 debug_dir.mkdir(parents=True, exist_ok=True)
1510
1511 # Save raw thread data (JSON)
1512 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
1513 json.dump(thread_data, f, indent=2)
1514
1515 # Save YAML thread context
1516 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
1517 f.write(thread_context)
1518
1519 # Save mention processing debug info
1520 debug_info = {
1521 'processed_at': datetime.now().isoformat(),
1522 'mention_id': mention_id,
1523 'conversation_id': conversation_id,
1524 'author_id': author_id,
1525 'in_reply_to_user_id': in_reply_to_user_id,
1526 'referenced_tweets': referenced_tweets,
1527 'thread_stats': {
1528 'total_posts': len(thread_posts),
1529 'total_users': len(thread_users),
1530 'yaml_length': len(thread_context)
1531 },
1532 'users_in_conversation': {
1533 user_id: {
1534 'username': user_info.get('username'),
1535 'name': user_info.get('name'),
1536 'verified': user_info.get('verified', False),
1537 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
1538 }
1539 for user_id, user_info in thread_users.items()
1540 }
1541 }
1542
1543 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
1544 json.dump(debug_info, f, indent=2)
1545
1546 logger.info(f"💾 Saved conversation debug data to: {debug_dir}")
1547
1548 except Exception as debug_error:
1549 logger.warning(f"Failed to save debug data: {debug_error}")
1550 # Continue processing even if debug save fails
1551
1552 # Check for #voidstop
1553 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
1554 logger.info("Found #voidstop, skipping this mention")
1555 return True
1556
1557 # Ensure X user blocks are attached
1558 try:
1559 ensure_x_user_blocks_attached(thread_data, void_agent.id)
1560 except Exception as e:
1561 logger.warning(f"Failed to ensure X user blocks: {e}")
1562 # Continue without user blocks rather than failing completely
1563
1564 # Create prompt for Letta agent
1565 # First try to use cached author info from queued mention
1566 author_username = 'unknown'
1567 author_name = 'unknown'
1568
1569 if 'author_info' in mention_data:
1570 # Use cached author info from when mention was queued
1571 cached_info = mention_data['author_info']
1572 if cached_info.get('username'):
1573 author_username = cached_info['username']
1574 author_name = cached_info.get('name', author_username)
1575 logger.info(f"Using cached author info: @{author_username} ({author_name})")
1576
1577 # If not cached, try thread data
1578 if author_username == 'unknown':
1579 author_info = thread_data.get('users', {}).get(author_id, {})
1580 author_username = author_info.get('username', 'unknown')
1581 author_name = author_info.get('name', author_username)
1582
1583 # Final fallback: if username is still unknown, try to find it in the thread tweets
1584 if author_username == 'unknown' and 'tweets' in thread_data:
1585 for tweet in thread_data['tweets']:
1586 if tweet.get('author_id') == author_id and 'author' in tweet:
1587 tweet_author = tweet['author']
1588 if tweet_author.get('username'):
1589 author_username = tweet_author['username']
1590 author_name = tweet_author.get('name', author_username)
1591 logger.info(f"Resolved unknown author via thread fallback: @{author_username} ({author_name})")
1592 break
1593
1594 # Build user ID mapping from thread data
1595 user_id_mapping = {}
1596 if 'tweets' in thread_data:
1597 for tweet in thread_data['tweets']:
1598 author_id_tweet = tweet.get('author_id')
1599 if author_id_tweet and 'author' in tweet:
1600 tweet_author = tweet['author']
1601 username = tweet_author.get('username')
1602 if username and author_id_tweet not in user_id_mapping:
1603 user_id_mapping[author_id_tweet] = f"@{username}"
1604
1605 # Also add users from the users dict if available
1606 if 'users' in thread_data:
1607 for user_id, user_data in thread_data['users'].items():
1608 username = user_data.get('username')
1609 if username and user_id not in user_id_mapping:
1610 user_id_mapping[user_id] = f"@{username}"
1611
1612 # Format user ID mapping for prompt
1613 id_mapping_text = ""
1614 if user_id_mapping:
1615 id_mapping_lines = [f" {user_id}: {handle}" for user_id, handle in user_id_mapping.items()]
1616 id_mapping_text = f"\n\nUSER_ID_KEY:\n" + "\n".join(id_mapping_lines)
1617
1618 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).
1619
1620MOST RECENT POST (the mention you're responding to):
1621"{mention_text}"
1622
1623FULL THREAD CONTEXT:
1624```yaml
1625{thread_context}
1626```
1627
1628The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.{id_mapping_text}
1629
1630If you need to update user information, use the x_user_* tools.
1631
1632To reply, use the add_post_to_x_thread tool:
1633- Each call creates one post (max 280 characters)
1634- For most responses, a single call is sufficient
1635- Only use multiple calls for threaded replies when:
1636 * The topic requires extended explanation that cannot fit in 280 characters
1637 * You're explicitly asked for a detailed/long response
1638 * The conversation naturally benefits from a structured multi-part answer
1639- Avoid unnecessary threads - be concise when possible"""
1640
1641 # Log mention processing
1642 title = f"X MENTION FROM @{author_username}"
1643 print(f"\n▶ {title}")
1644 print(f" {'═' * len(title)}")
1645 for line in mention_text.split('\n'):
1646 print(f" {line}")
1647
1648 # Send to Letta agent
1649 from config_loader import get_letta_config
1650 from letta_client import Letta
1651
1652 config = get_letta_config()
1653 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
1654
1655 prompt_char_count = len(prompt)
1656 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars")
1657
1658 try:
1659 # Use streaming to avoid timeout errors
1660 message_stream = letta_client.agents.messages.create_stream(
1661 agent_id=void_agent.id,
1662 messages=[{"role": "user", "content": prompt}],
1663 stream_tokens=False,
1664 max_steps=100
1665 )
1666
1667 # Collect streaming response (simplified version of bsky.py logic)
1668 all_messages = []
1669 for chunk in message_stream:
1670 if hasattr(chunk, 'message_type'):
1671 if chunk.message_type == 'reasoning_message':
1672 print("\n◆ Reasoning")
1673 print(" ─────────")
1674 for line in chunk.reasoning.split('\n'):
1675 print(f" {line}")
1676 elif chunk.message_type == 'tool_call_message':
1677 tool_name = chunk.tool_call.name
1678 if tool_name == 'add_post_to_x_thread':
1679 try:
1680 args = json.loads(chunk.tool_call.arguments)
1681 text = args.get('text', '')
1682 if text:
1683 print("\n✎ X Post")
1684 print(" ────────")
1685 for line in text.split('\n'):
1686 print(f" {line}")
1687 except:
1688 pass
1689 elif tool_name == 'halt_activity':
1690 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
1691 if queue_filepath and queue_filepath.exists():
1692 queue_filepath.unlink()
1693 logger.info(f"Deleted queue file: {queue_filepath.name}")
1694 logger.info("=== X BOT TERMINATED BY AGENT ===")
1695 exit(0)
1696 elif chunk.message_type == 'tool_return_message':
1697 tool_name = chunk.name
1698 status = chunk.status
1699 if status == 'success' and tool_name == 'add_post_to_x_thread':
1700 print("\n✓ X Post Queued")
1701 print(" ──────────────")
1702 print(" Post queued successfully")
1703 elif chunk.message_type == 'assistant_message':
1704 print("\n▶ Assistant Response")
1705 print(" ──────────────────")
1706 for line in chunk.content.split('\n'):
1707 print(f" {line}")
1708
1709 all_messages.append(chunk)
1710 if str(chunk) == 'done':
1711 break
1712
1713 # Convert streaming response for compatibility
1714 message_response = type('StreamingResponse', (), {
1715 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
1716 })()
1717
1718 except Exception as api_error:
1719 logger.error(f"Letta API error: {api_error}")
1720 raise
1721
1722 # Extract successful add_post_to_x_thread tool calls
1723 reply_candidates = []
1724 tool_call_results = {}
1725 ignored_notification = False
1726 ack_note = None # Track any note from annotate_ack tool
1727
1728 # First pass: collect tool return statuses
1729 for message in message_response.messages:
1730 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
1731 if message.name == 'add_post_to_x_thread':
1732 tool_call_results[message.tool_call_id] = message.status
1733 elif message.name == 'ignore_notification':
1734 if message.status == 'success':
1735 ignored_notification = True
1736 logger.info("🚫 X notification ignored")
1737
1738 # Second pass: collect successful tool calls
1739 for message in message_response.messages:
1740 if hasattr(message, 'tool_call') and message.tool_call:
1741 # Collect annotate_ack tool calls
1742 if message.tool_call.name == 'annotate_ack':
1743 try:
1744 args = json.loads(message.tool_call.arguments)
1745 note = args.get('note', '')
1746 if note:
1747 ack_note = note
1748 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
1749 except json.JSONDecodeError as e:
1750 logger.error(f"Failed to parse annotate_ack arguments: {e}")
1751
1752 # Collect add_post_to_x_thread tool calls - only if they were successful
1753 elif message.tool_call.name == 'add_post_to_x_thread':
1754 tool_call_id = message.tool_call.tool_call_id
1755 tool_status = tool_call_results.get(tool_call_id, 'unknown')
1756
1757 if tool_status == 'success':
1758 try:
1759 args = json.loads(message.tool_call.arguments)
1760 reply_text = args.get('text', '')
1761 if reply_text:
1762 reply_candidates.append(reply_text)
1763 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
1764 except json.JSONDecodeError as e:
1765 logger.error(f"Failed to parse tool call arguments: {e}")
1766
1767 # Save agent response data to debug folder
1768 try:
1769 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1770
1771 # Save complete agent interaction
1772 agent_response_data = {
1773 'processed_at': datetime.now().isoformat(),
1774 'mention_id': mention_id,
1775 'conversation_id': conversation_id,
1776 'prompt_sent': prompt,
1777 'reply_candidates': reply_candidates,
1778 'ignored_notification': ignored_notification,
1779 'ack_note': ack_note,
1780 'tool_call_results': tool_call_results,
1781 'all_messages': []
1782 }
1783
1784 # Convert messages to serializable format
1785 for message in message_response.messages:
1786 msg_data = {
1787 'message_type': getattr(message, 'message_type', 'unknown'),
1788 'content': getattr(message, 'content', ''),
1789 'reasoning': getattr(message, 'reasoning', ''),
1790 'status': getattr(message, 'status', ''),
1791 'name': getattr(message, 'name', ''),
1792 }
1793
1794 if hasattr(message, 'tool_call') and message.tool_call:
1795 msg_data['tool_call'] = {
1796 'name': message.tool_call.name,
1797 'arguments': message.tool_call.arguments,
1798 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
1799 }
1800
1801 agent_response_data['all_messages'].append(msg_data)
1802
1803 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
1804 json.dump(agent_response_data, f, indent=2)
1805
1806 logger.info(f"💾 Saved agent response debug data")
1807
1808 except Exception as debug_error:
1809 logger.warning(f"Failed to save agent response debug data: {debug_error}")
1810
1811 # Handle conflicts
1812 if reply_candidates and ignored_notification:
1813 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
1814 return False
1815
1816 if reply_candidates:
1817 # Post replies to X
1818 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")
1819
1820 if len(reply_candidates) == 1:
1821 content = reply_candidates[0]
1822 title = f"Reply to @{author_username}"
1823 else:
1824 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
1825 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"
1826
1827 print(f"\n✎ {title}")
1828 print(f" {'─' * len(title)}")
1829 for line in content.split('\n'):
1830 print(f" {line}")
1831
1832 if testing_mode:
1833 logger.info("TESTING MODE: Skipping actual X post")
1834 return True
1835 else:
1836 # Post to X using thread approach
1837 success = post_x_thread_replies(x_client, mention_id, reply_candidates)
1838 if success:
1839 logger.info(f"Successfully replied to @{author_username} on X")
1840
1841 # Acknowledge the post we're replying to
1842 try:
1843 ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
1844 if ack_result:
1845 if ack_note:
1846 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
1847 else:
1848 logger.info(f"Successfully acknowledged X post from @{author_username}")
1849 else:
1850 logger.warning(f"Failed to acknowledge X post from @{author_username}")
1851 except Exception as e:
1852 logger.error(f"Error acknowledging X post from @{author_username}: {e}")
1853 # Don't fail the entire operation if acknowledgment fails
1854
1855 return True
1856 else:
1857 logger.error(f"Failed to send reply to @{author_username} on X")
1858 return False
1859 else:
1860 if ignored_notification:
1861 logger.info(f"X mention from @{author_username} was explicitly ignored")
1862 return "ignored"
1863 else:
1864 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
1865 return False # Keep in queue for retry instead of removing
1866
1867 except Exception as e:
1868 logger.error(f"Error processing X mention: {e}")
1869 return False
1870
def acknowledge_x_post(x_client, post_id, note=None):
    """
    Record an acknowledgment for an X post that we replied to.

    The ack is written through the Bluesky client into the void data
    repository on atproto, exactly like Bluesky acknowledgments. Because
    X posts have no native atproto identifiers, synthetic URI/CID values
    are fabricated from the post ID.

    Args:
        x_client: XClient instance (unused; retained for call-site compatibility)
        post_id: The X post ID being acknowledged
        note: Optional note to store alongside the acknowledgment

    Returns:
        True if successful, False otherwise
    """
    try:
        # Acks live in the atproto data repo, so authenticate via Bluesky.
        bsky_client = bsky_utils.default_login()

        # Synthesize stable atproto-style identifiers for the X post.
        synthetic_uri = f"x://twitter.com/post/{post_id}"
        synthetic_cid = f"x_{post_id}_cid"

        # Reuse the exact same acknowledgment path as Bluesky posts.
        if not bsky_utils.acknowledge_post(bsky_client, synthetic_uri, synthetic_cid, note):
            logger.error(f"Failed to acknowledge X post {post_id}")
            return False

        note_suffix = f" with note: {note[:50]}..." if note else ""
        logger.debug(f"Acknowledged X post {post_id} via atproto{note_suffix}")
        return True

    except Exception as e:
        logger.error(f"Error acknowledging X post {post_id}: {e}")
        return False
1907
def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
    """
    Post a sequence of replies to X, chaining them into a thread.

    The first reply targets the original tweet; each subsequent reply
    targets the tweet created by the previous one, producing a thread.

    Args:
        x_client: XClient instance used for posting
        in_reply_to_tweet_id: The original tweet ID to reply to
        reply_messages: Ordered list of reply text strings

    Returns:
        True if every reply posted successfully, False otherwise
    """
    try:
        parent_id = in_reply_to_tweet_id
        total = len(reply_messages)

        for index, text in enumerate(reply_messages, start=1):
            logger.info(f"Posting X reply {index}/{total}: {text[:50]}...")

            response = x_client.post_reply(text, parent_id)

            # A well-formed response carries the new tweet under 'data'.
            if not (response and 'data' in response):
                logger.error(f"Failed to post X reply {index}")
                return False

            new_id = response['data']['id']
            logger.info(f"Successfully posted X reply {index}, ID: {new_id}")
            # Thread the next reply off the tweet we just created.
            parent_id = new_id

        return True

    except Exception as e:
        logger.error(f"Error posting X thread replies: {e}")
        return False
1942
def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
    """
    Load and process all X mentions from the queue.
    Similar to bsky.py load_and_process_queued_notifications but for X.

    Queue files are processed oldest-first. Each file's fate depends on the
    result of process_x_mention():
      True        -> delete the file and record the mention as processed
      None        -> move the file to the errors/ subdirectory
      "no_reply"  -> move the file to the no_reply/ subdirectory
      "ignored"   -> delete the file (agent explicitly ignored the mention)
      False/other -> keep the file in the queue for retry on the next pass

    Args:
        void_agent: Letta agent used to generate replies
        x_client: XClient instance used for posting
        testing_mode: If True, successfully processed files are kept on disk
    """
    try:
        # Get all X mention files in queue directory
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))

        if not queue_files:
            return

        # Load file metadata and sort by creation time (chronological order)
        file_metadata = []
        for filepath in queue_files:
            try:
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)
                mention_data = queue_data.get('mention', queue_data)
                created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z')  # Default to epoch if missing
                file_metadata.append((created_at, filepath))
            except Exception as e:
                logger.warning(f"Error reading queue file {filepath.name}: {e}")
                # Add with default timestamp so it still gets processed
                file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))

        # Sort by creation time (oldest first)
        file_metadata.sort(key=lambda x: x[0])

        logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")

        for i, (created_at, filepath) in enumerate(file_metadata, 1):
            logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")

            try:
                # Load mention data
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)

                # Process the mention (pass full queue_data to have access to author_info)
                success = process_x_mention(void_agent, x_client, queue_data,
                                          queue_filepath=filepath, testing_mode=testing_mode)

            except XRateLimitError:
                logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
                break

            except Exception as e:
                logger.error(f"Error processing X queue file {filepath.name}: {e}")
                continue

            # Handle file based on processing result.
            # BUGFIX: the sentinel results "no_reply" and "ignored" are truthy
            # strings, so they must be checked BEFORE the generic truthiness
            # test — otherwise they would be misclassified as normal success.
            if success is None:  # Move to error directory
                error_dir = X_QUEUE_DIR / "errors"
                error_dir.mkdir(exist_ok=True)
                error_path = error_dir / filepath.name
                filepath.rename(error_path)
                logger.warning(f"Moved X file {filepath.name} to errors directory")

            elif success == "no_reply":  # Move to no_reply directory
                no_reply_dir = X_QUEUE_DIR / "no_reply"
                no_reply_dir.mkdir(exist_ok=True)
                no_reply_path = no_reply_dir / filepath.name
                filepath.rename(no_reply_path)
                logger.info(f"Moved X file {filepath.name} to no_reply directory")

            elif success == "ignored":  # Delete ignored notifications
                filepath.unlink()
                logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")

            elif success:
                if testing_mode:
                    logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
                else:
                    filepath.unlink()
                    logger.info(f"Successfully processed and removed X file: {filepath.name}")

                    # Mark as processed. BUGFIX: derive the mention ID from the
                    # file just processed (queue_data), not from the stale
                    # mention_data variable left over from the metadata scan.
                    mention_data = queue_data.get('mention', queue_data)
                    processed_mentions = load_processed_mentions()
                    processed_mentions.add(mention_data.get('id'))
                    save_processed_mentions(processed_mentions)

            else:
                logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")

    except Exception as e:
        logger.error(f"Error loading queued X mentions: {e}")
2030
def process_x_notifications(void_agent, x_client, testing_mode=False):
    """
    Fetch new X mentions, queue them, and process the queue.
    Similar to bsky.py process_notifications but for X.

    Args:
        void_agent: Letta agent used to generate replies
        x_client: XClient instance used for API access
        testing_mode: Passed through to queue processing (no posts/deletes)
    """
    try:
        # Mentions are searched by @username, so resolve our own handle first.
        user_info = x_client._make_request("/users/me", params={"user.fields": "username"})
        if not (user_info and "data" in user_info):
            logger.error("Could not get username for X mentions")
            return

        username = user_info["data"]["username"]

        # Pull fresh mentions into the on-disk queue.
        new_count = fetch_and_queue_mentions(username)
        if new_count > 0:
            logger.info(f"Found {new_count} new X mentions to process")

        # Drain everything currently queued (old and new).
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing X notifications: {e}")
2056
def periodic_user_block_cleanup(client, agent_id: str) -> None:
    """
    Detach all user blocks from the agent to prevent memory bloat.
    This should be called periodically to ensure clean state.

    Args:
        client: Letta client used to list/detach blocks
        agent_id: ID of the agent whose blocks are cleaned
    """
    try:
        # Gather every attached block whose label marks it as per-user state.
        attached_blocks = client.agents.blocks.list(agent_id=agent_id)
        user_blocks = [
            {'label': blk.label, 'id': blk.id}
            for blk in attached_blocks
            if hasattr(blk, 'label') and blk.label.startswith('user_')
        ]

        if not user_blocks:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        logger.info(f"Found {len(user_blocks)} user blocks to detach")

        # Detach individually so one failure doesn't abort the rest.
        for entry in user_blocks:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=entry['id']
                )
                logger.debug(f"Detached user block: {entry['label']}")
            except Exception as e:
                logger.error(f"Failed to detach user block {entry['label']}: {e}")

        logger.info(f"Periodic cleanup complete: detached {len(user_blocks)} user blocks")

    except Exception as e:
        logger.error(f"Error during periodic user block cleanup: {e}")
2095
def initialize_x_void():
    """Initialize and return the void agent used for X operations."""
    logger.info("Starting void agent initialization for X...")

    from config_loader import get_letta_config
    from letta_client import Letta

    # Build a Letta client from the shared configuration.
    cfg = get_letta_config()
    letta = Letta(token=cfg['api_key'], timeout=cfg['timeout'])
    agent_id = cfg['agent_id']

    try:
        agent = letta.agents.retrieve(agent_id=agent_id)
        logger.info(f"Successfully loaded void agent for X: {agent.name} ({agent_id})")
    except Exception as e:
        logger.error(f"Failed to load void agent {agent_id}: {e}")
        raise e

    # Start from a clean slate: detach any lingering per-user memory blocks.
    logger.info("🧹 Cleaning up user blocks at X startup...")
    periodic_user_block_cleanup(letta, agent_id)

    # Swap in the X-specific toolset; on failure keep whatever is attached.
    logger.info("Configuring tools for X platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('x', agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Log agent details for traceability.
    logger.info(f"X Void agent details - ID: {agent.id}")
    logger.info(f"Agent name: {agent.name}")

    return agent
2133
def x_main_loop(testing_mode=False, cleanup_interval=10):
    """
    Main X bot loop that continuously monitors for mentions and processes them.
    Similar to bsky.py main() but for X/Twitter.

    Args:
        testing_mode: If True, don't actually post to X
        cleanup_interval: Run user block cleanup every N cycles (0 to disable)
    """
    import time
    from time import sleep
    from config_loader import get_letta_config
    from letta_client import Letta

    logger.info("=== STARTING X VOID BOT ===")

    # Stand up the agent and the X API client.
    void_agent = initialize_x_void()
    logger.info(f"X void agent initialized: {void_agent.id}")

    x_client = create_x_client()
    logger.info("Connected to X API")

    # Separate Letta client dedicated to the periodic block cleanup.
    cfg = get_letta_config()
    letta_client = Letta(token=cfg['api_key'], timeout=cfg['timeout'])

    # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls)
    poll_seconds = 120
    logger.info(f"Starting X mention monitoring, checking every {poll_seconds} seconds")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    if cleanup_interval > 0:
        logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
    else:
        logger.info("User block cleanup disabled")

    cycle = 0
    started_at = time.time()

    while True:
        try:
            cycle += 1
            logger.info(f"=== X CYCLE {cycle} ===")

            # Fetch, queue, and process mentions in a single pass.
            process_x_notifications(void_agent, x_client, testing_mode)

            # Every N cycles, detach accumulated per-user memory blocks.
            if cleanup_interval > 0 and cycle % cleanup_interval == 0:
                logger.debug(f"Running periodic user block cleanup (cycle {cycle})")
                periodic_user_block_cleanup(letta_client, void_agent.id)

            elapsed = time.time() - started_at
            logger.info(f"X Cycle {cycle} complete. Elapsed: {elapsed/60:.1f} minutes")

            sleep(poll_seconds)

        except KeyboardInterrupt:
            elapsed = time.time() - started_at
            logger.info("=== X BOT STOPPED BY USER ===")
            logger.info(f"Final X session: {cycle} cycles in {elapsed/60:.1f} minutes")
            break
        except Exception as e:
            # Back off for twice the normal interval after an unexpected error.
            logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle} ===")
            logger.error(f"Error details: {e}")
            logger.info(f"Sleeping for {poll_seconds * 2} seconds due to error...")
            sleep(poll_seconds * 2)
2208
def process_queue_only(testing_mode=False):
    """
    Process all queued X mentions without fetching new ones.
    Useful for rate limit management - queue first, then process separately.

    Args:
        testing_mode: If True, don't actually post to X and keep queue files
    """
    logger.info("=== PROCESSING X QUEUE ONLY ===")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    try:
        # Stand up the agent and API client, then drain the existing queue.
        void_agent = initialize_x_void()
        logger.info(f"X void agent initialized: {void_agent.id}")

        x_client = create_x_client()
        logger.info("Connected to X API")

        logger.info("Processing existing X queue...")
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

        logger.info("=== X QUEUE PROCESSING COMPLETE ===")

    except Exception as e:
        logger.error(f"Error processing X queue: {e}")
        raise
2242
def x_notification_loop():
    """
    DEPRECATED: Old X notification loop using search-based mention detection.

    Kept only for backward compatibility; it simply delegates to the full
    bot loop. Use x_main_loop() directly instead.
    """
    logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
    x_main_loop()
2250
if __name__ == "__main__":
    import sys
    import argparse

    # CLI dispatch: the first positional argument selects the subcommand.
    # NOTE(review): only the "bot" path uses argparse; every other command
    # inspects sys.argv directly, so flags like --test are positional-free
    # but unvalidated for those commands.
    if len(sys.argv) > 1:
        if sys.argv[1] == "bot":
            # Main bot with optional --test flag and cleanup interval
            parser = argparse.ArgumentParser(description='X Void Bot')
            parser.add_argument('command', choices=['bot'])
            parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
            parser.add_argument('--cleanup-interval', type=int, default=10,
                              help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
            args = parser.parse_args()
            x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval)
        elif sys.argv[1] == "loop":
            # Deprecated monitoring loop; delegates to x_main_loop().
            x_notification_loop()
        elif sys.argv[1] == "reply":
            reply_to_cameron_post()
        elif sys.argv[1] == "me":
            get_my_user_info()
        elif sys.argv[1] == "search":
            test_search_mentions()
        elif sys.argv[1] == "queue":
            test_fetch_and_queue()
        elif sys.argv[1] == "thread":
            test_thread_context()
        elif sys.argv[1] == "process":
            # Process all queued mentions with optional --test flag
            testing_mode = "--test" in sys.argv
            process_queue_only(testing_mode=testing_mode)
        elif sys.argv[1] == "letta":
            # Use specific agent ID if provided, otherwise use from config
            agent_id = sys.argv[2] if len(sys.argv) > 2 else None
            test_letta_integration(agent_id)
        elif sys.argv[1] == "downrank":
            # View or manage downrank list
            if len(sys.argv) > 2 and sys.argv[2] == "list":
                downrank_users = load_downrank_users()
                if downrank_users:
                    print(f"📋 Downrank users ({len(downrank_users)} total):")
                    for user_id in sorted(downrank_users):
                        print(f"  - {user_id}")
                else:
                    print("📋 No downrank users configured")
            else:
                print("Usage: python x.py downrank list")
                print("  list - Show all downranked user IDs")
                print(f"  Edit {X_DOWNRANK_USERS_FILE} to modify the list")
        else:
            # Unknown subcommand: show the full usage summary.
            print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
            print("  bot      - Run the main X bot (use --test for testing mode)")
            print("             Example: python x.py bot --test")
            print("  queue    - Fetch and queue mentions only (no processing)")
            print("  process  - Process all queued mentions only (no fetching)")
            print("             Example: python x.py process --test")
            print("  downrank - Manage downrank users (10% response rate)")
            print("             Example: python x.py downrank list")
            print("  loop     - Run the old notification monitoring loop (deprecated)")
            print("  reply    - Reply to Cameron's specific post")
            print("  me       - Get authenticated user info and correct user ID")
            print("  search   - Test search-based mention detection")
            print("  thread   - Test thread context retrieval from queued mention")
            print("  letta    - Test sending thread context to Letta agent")
            print("             Optional: python x.py letta <agent-id>")
    else:
        # No arguments: run the basic client smoke test.
        test_x_client()