forked from
cameron.stream/void
this repo has no description
1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9from pathlib import Path
10from datetime import datetime
11
12from utils import (
13 upsert_block,
14 upsert_agent
15)
16
17import bsky_utils
18
# Logging: the root configuration is set to DEBUG, while this module's own
# "void_bot" logger only emits INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger("void_bot")
logger.setLevel(logging.INFO)


# Module-wide Letta client. The API key is read from the environment at import
# time; the 300-second (5-minute) timeout leaves room for slow LLM operations.
CLIENT = Letta(token=os.environ["LETTA_API_KEY"], timeout=300)

# ID of the "Bluesky" project.
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Seconds to wait between notification checks.
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# On-disk notification queue and its "errors" subdirectory; both are created
# at import time if they do not exist yet.
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)
QUEUE_ERROR_DIR = QUEUE_DIR / "errors"
QUEUE_ERROR_DIR.mkdir(parents=True, exist_ok=True)
44
def initialize_void():
    """Set up the "void" agent and its three shared memory blocks.

    The zeitgeist, persona and humans blocks are passed through
    ``upsert_block`` (in that order), then the agent itself is passed through
    ``upsert_agent`` with those blocks attached. A summary of the resulting
    agent is logged.

    Returns:
        The agent object returned by ``upsert_agent``.
    """
    # Shared block: the agent's picture of the current social environment.
    zeitgeist_block = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment.",
    )

    # Shared block: the agent's personality.
    persona_block = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void.",
    )

    # Shared block: what the agent knows about the users it meets.
    humans_block = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network.",
    )

    # The agent itself, with the three blocks attached.
    attached_block_ids = [persona_block.id, humans_block.id, zeitgeist_block.id]
    agent = upsert_agent(
        CLIENT,
        name="void",
        block_ids=attached_block_ids,
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID,
    )

    # Summarise the agent in the log; llm_config and tools are optional attributes.
    logger.info(f"Void agent details - ID: {agent.id}")
    logger.info(f"Agent name: {agent.name}")
    if hasattr(agent, 'llm_config'):
        logger.info(f"Agent model: {agent.llm_config.model}")
    logger.info(f"Agent project_id: {agent.project_id}")
    if hasattr(agent, 'tools'):
        agent_tools = agent.tools
        logger.info(f"Agent has {len(agent_tools)} tools")
        # Only the first three tools are listed to keep the log short.
        for agent_tool in agent_tools[:3]:
            logger.info(f"  - Tool: {agent_tool.name} (type: {agent_tool.tool_type})")

    return agent
99
100
def _extract_mention_fields(notification_data):
    """Return ``(uri, mention_text, author_handle, author_name)`` for a notification.

    Accepts either the dict form stored in the queue or a legacy notification
    object exposing the same fields as attributes. A missing or null text
    becomes ``""`` and a missing display name falls back to the handle.
    """
    if isinstance(notification_data, dict):
        uri = notification_data['uri']
        # `record` or `text` may be absent or null in a queued file; both mean "no text".
        mention_text = (notification_data.get('record') or {}).get('text') or ''
        author = notification_data['author']
        author_handle = author['handle']
        author_name = author.get('display_name') or author_handle
    else:
        # Legacy object access
        uri = notification_data.uri
        mention_text = getattr(notification_data.record, 'text', None) or ''
        author_handle = notification_data.author.handle
        author_name = notification_data.author.display_name or author_handle
    return uri, mention_text, author_handle, author_name


def _thread_context_for_prompt(thread):
    """Render the thread as a YAML string, or a short error note if that fails."""
    import traceback

    logger.info("Converting thread to YAML string")
    try:
        thread_context = thread_to_yaml_string(thread)
        logger.info(f"Thread context generated, length: {len(thread_context)} characters")
        logger.debug(f"Thread context preview: {thread_context[:500]}...")
        return thread_context
    except Exception as yaml_error:
        logger.error(f"Error converting thread to YAML: {yaml_error}")
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        logger.error(f"Thread type: {type(thread)}")
        if hasattr(thread, '__dict__'):
            logger.error(f"Thread attributes: {thread.__dict__}")
        # Carry on with a placeholder so the agent can still answer the mention.
        return f"Error processing thread context: {str(yaml_error)}"


def _log_letta_api_error(api_error, mention_text, author_handle, uri):
    """Log every available detail about a failed Letta API call.

    Must be called from inside the ``except`` block that is handling
    ``api_error`` so that ``traceback.format_exc()`` sees the active exception.
    """
    import traceback

    logger.error(f"Letta API error: {api_error}")
    logger.error(f"Error type: {type(api_error).__name__}")
    logger.error(f"Full traceback:\n{traceback.format_exc()}")
    logger.error(f"Mention text was: {mention_text}")
    logger.error(f"Author: @{author_handle}")
    logger.error(f"URI: {uri}")

    # Try to extract more info from different error types
    if hasattr(api_error, 'response'):
        response = api_error.response
        logger.error("Error response object exists")
        if hasattr(response, 'text'):
            logger.error(f"Response text: {response.text}")
        if callable(getattr(response, 'json', None)):
            try:
                logger.error(f"Response JSON: {response.json()}")
            except Exception as json_error:
                # Diagnostics are best-effort, but only ordinary errors are
                # suppressed; KeyboardInterrupt/SystemExit must still propagate.
                logger.debug(f"Could not decode error response as JSON: {json_error}")

    if hasattr(api_error, 'status_code'):
        logger.error(f"API Status code: {api_error.status_code}")
        if hasattr(api_error, 'body'):
            logger.error(f"API Response body: {api_error.body}")
        if hasattr(api_error, 'headers'):
            logger.error(f"API Response headers: {api_error.headers}")


def _extract_reply_text(messages):
    """Return the reply text found in the agent's response messages, or ``""``.

    The first message carrying a parseable ``bluesky_reply`` tool call wins
    (its ``message`` argument is returned); a message without a tool call but
    with non-empty ``text`` is used as a fallback.
    """
    for message in messages:
        print(message)

        tool_call = getattr(message, 'tool_call', None)
        if tool_call:
            if tool_call.name == 'bluesky_reply':
                # The tool call arguments are a JSON string holding the reply.
                try:
                    args = json.loads(tool_call.arguments)
                except json.JSONDecodeError as e:
                    logger.error(f"Failed to parse tool call arguments: {e}")
                    continue
                reply_text = args.get('message', '')
                logger.info(f"Extracted reply from tool call: {reply_text[:50]}...")
                return reply_text
        # Fallback to text message if available
        elif getattr(message, 'text', None):
            return message.text
    return ""


def process_mention(void_agent, atproto_client, notification_data):
    """Process a mention and generate a reply using the Letta agent.

    Fetches the thread around the mention, sends it to the agent as a prompt,
    extracts the reply from the agent's response and posts it via
    ``bsky_utils.reply_to_notification``.

    Args:
        void_agent: Agent object; only its ``id`` is used here.
        atproto_client: Logged-in Bluesky client used to fetch the thread and reply.
        notification_data: Queued notification dict, or a legacy notification object.

    Returns:
        True: Successfully processed (or nothing to do), remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
    """
    try:
        logger.info(f"Starting process_mention with notification_data type: {type(notification_data)}")

        uri, mention_text, author_handle, author_name = _extract_mention_fields(notification_data)
        logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 80,
                'depth': 10
            })
        except Exception as e:
            error_str = str(e)
            # A deleted post can never be answered, so drop it from the queue.
            if 'NotFound' in error_str or 'Post not found' in error_str:
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True
            logger.error(f"Error fetching thread: {e}")
            raise

        thread_context = _thread_context_for_prompt(thread)

        # Create a prompt for the Letta agent with thread context
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the bluesky_reply tool to send a response less than 300 characters."""

        logger.info(f"Mention from @{author_handle}: {mention_text}")
        logger.debug(f"Prompt being sent: {prompt}")
        logger.debug(f"Calling Letta API with agent_id: {void_agent.id}")
        logger.debug(f"Message content length: {len(prompt)} characters")

        # Get response from Letta agent
        try:
            message_response = CLIENT.agents.messages.create(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}]
            )
        except Exception as api_error:
            _log_letta_api_error(api_error, mention_text, author_handle, uri)

            # Not every exception type carries a status code, so read it defensively.
            status_code = getattr(api_error, 'status_code', None)
            if status_code == 413:
                logger.error("413 Payload Too Large - moving to errors directory")
                return None  # Payload is too large to ever succeed
            if status_code == 524:
                logger.error("524 error - timeout from Cloudflare, will retry later")
                return False  # Keep in queue for retry

            # Some errors only expose the status inside their message text.
            error_str = str(api_error)
            if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
                logger.warning("Payload too large error, moving to errors directory")
                return None  # Cannot be fixed by retry
            if 'status_code: 524' in error_str:
                logger.warning("524 timeout error, keeping in queue for retry")
                return False  # Keep in queue for retry

            raise

        logger.debug("Successfully received response from Letta API")
        logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")

        reply_text = _extract_reply_text(message_response.messages)
        if not reply_text:
            logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue")
            return True

        # Print the generated reply for testing
        print("\n=== GENERATED REPLY ===")
        print(f"To: @{author_handle}")
        print(f"Reply: {reply_text}")
        print("======================\n")

        # Send the reply
        logger.info(f"Sending reply: {reply_text[:50]}...")
        response = bsky_utils.reply_to_notification(
            client=atproto_client,
            notification=notification_data,
            reply_text=reply_text
        )

        if response:
            logger.info(f"Successfully replied to @{author_handle}")
            return True
        logger.error(f"Failed to send reply to @{author_handle}")
        return False

    except Exception as e:
        # Anything unexpected is treated as retryable.
        logger.error(f"Error processing mention: {e}")
        return False
292
293
def notification_to_dict(notification):
    """Flatten a notification object into a plain dict that can be dumped as JSON.

    The result has the keys ``uri``, ``cid``, ``reason``, ``is_read``,
    ``indexed_at``, ``author`` (``handle``, ``display_name``, ``did``) and
    ``record`` (``text``), in that order. The text is ``''`` when the
    notification has no ``record`` or the record has no ``text``.
    """
    top_level_fields = ('uri', 'cid', 'reason', 'is_read', 'indexed_at')
    payload = {field: getattr(notification, field) for field in top_level_fields}

    author = notification.author
    payload['author'] = {
        'handle': author.handle,
        'display_name': author.display_name,
        'did': author.did,
    }

    if hasattr(notification, 'record'):
        text = getattr(notification.record, 'text', '')
    else:
        text = ''
    payload['record'] = {'text': text}

    return payload
311
312
def save_notification_to_queue(notification):
    """Save a notification to the queue directory unless it is already queued.

    The file is named ``<timestamp>_<reason>_<hash>.json``, where ``<hash>`` is
    the first 16 hex digits of the SHA-256 of the notification's key-sorted
    JSON form.

    Args:
        notification: Notification object accepted by ``notification_to_dict``.

    Returns:
        True if a new queue file was written; False if the notification was
        already queued or could not be saved (the error is logged).
    """
    try:
        notif_dict = notification_to_dict(notification)

        # Hash the canonical (key-sorted) JSON so identical notifications
        # always produce the same suffix.
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Look for the hash alone: the filename starts with the current second,
        # so comparing whole filenames would miss the same notification when it
        # is seen again on a later poll.
        already_queued = next(QUEUE_DIR.glob(f"*_{notif_hash}.json"), None)
        if already_queued is not None:
            logger.debug(f"Notification already queued: {already_queued.name}")
            return False

        # Timestamp first so that sorting filenames gives arrival order.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
345
346
def load_and_process_queued_notifications(void_agent, atproto_client):
    """Load and process all notifications from the queue.

    Every ``*.json`` file in ``QUEUE_DIR`` is handled in sorted filename order:

    * ``mention`` / ``reply`` are passed to ``process_mention``;
    * ``follow`` is reported to the agent as an update message;
    * ``repost`` and unknown reasons are dropped.

    Afterwards the file is deleted on success, moved to ``QUEUE_ERROR_DIR``
    when the handler returned ``None`` (non-retryable), or left in place for a
    later retry. Errors are logged and never propagate to the caller.

    Args:
        void_agent: Agent object; its ``id`` addresses the Letta agent.
        atproto_client: Bluesky client handed through to ``process_mention``.
    """
    try:
        queue_files = sorted(QUEUE_DIR.glob("*.json"))

        if not queue_files:
            logger.debug("No queued notifications to process")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        for filepath in queue_files:
            try:
                with open(filepath, 'r', encoding="utf-8") as f:
                    notif_data = json.load(f)

                reason = notif_data['reason']
                if reason in ("mention", "reply"):
                    # Replies are answered exactly like mentions.
                    success = process_mention(void_agent, atproto_client, notif_data)
                elif reason == "follow":
                    author = notif_data['author']
                    author_handle = author['handle']
                    # Queued files always contain the key, often with a null
                    # value, so a .get() default alone would never apply.
                    author_display_name = author.get('display_name') or 'no display name'
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                elif reason == "repost":
                    logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
                    success = True  # Skip reposts but mark as successful to remove from queue
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result
                if success:
                    filepath.unlink()
                    logger.info(f"Processed and removed: {filepath.name}")
                elif success is None:  # Non-retryable: park it in the errors directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"Moved {filepath.name} to errors directory")
                else:
                    logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                # Keep the file so it is retried on the next pass.
                logger.error(f"Error processing queued notification {filepath.name}: {e}")

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
404
405
def process_notifications(void_agent, atproto_client):
    """Run one polling cycle: drain the queue, fetch, enqueue, drain again.

    Unread notifications other than likes are written to the on-disk queue.
    If at least one was newly queued, notifications are marked as seen using a
    timestamp taken just before the fetch. Any exception is logged and
    swallowed.
    """
    try:
        # Work off whatever is still queued from earlier cycles.
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Timestamp taken before fetching; used later to mark notifications as seen.
        seen_at = atproto_client.get_current_time_iso()

        fetched = atproto_client.app.bsky.notification.list_notifications()

        queued_count = 0
        for item in fetched.notifications:
            # Already-read notifications and likes are never queued.
            if item.is_read or item.reason == "like":
                continue
            if save_notification_to_queue(item):
                queued_count += 1

        # Only touch the seen marker when something new was actually queued.
        if queued_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': seen_at})
            logger.info(f"Queued {queued_count} new notifications and marked as seen")

        # Second pass picks up the notifications queued just now.
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
435
436
def main():
    """Main bot loop that continuously monitors for notifications.

    Initializes the Letta agent, warns if it has no Bluesky-related tools,
    logs in to Bluesky and then polls for notifications every
    ``FETCH_NOTIFICATIONS_DELAY_SEC`` seconds, backing off to twice that delay
    after an unexpected error. Ctrl-C stops the loop cleanly.
    """
    logger.info("Initializing Void bot...")

    # Initialize the Letta agent
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        logger.info(f"Agent has tools: {tool_names}")

        # Check for bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if bluesky_tools:
            logger.info(f"Found Bluesky-related tools: {bluesky_tools}")
        else:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is also
    # handled cleanly while sleeping in the error back-off branch.
    try:
        while True:
            try:
                process_notifications(void_agent, atproto_client)
                logger.debug("Sleeping, no notifications were detected")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
            except Exception as e:
                logger.error(f"Error in main loop: {e}")
                # Wait a bit longer on errors
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
    except KeyboardInterrupt:
        logger.info("Bot stopped by user")


if __name__ == "__main__":
    main()