Fix batching timeout by removing broken async executor

The batching feature was timing out because run_in_executor was not properly
executing the sync function from the async context. Simplified by calling the
sync function directly, which is fine since we only process one batch at a
time anyway.
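
In essence, the fix swaps the executor indirection for a plain synchronous call at the batch-flush site (condensed from the src/jetstream_letta_bridge.py diff below):

```python
# Before: the sync Letta call was wrapped in an executor, which is where the
# batching path stalled.
loop = asyncio.get_event_loop()
await loop.run_in_executor(
    None,
    self.letta_integration.send_message_to_agent,
    prompt,
    self.agent_stream_handler,
)

# After: call the sync function directly. Blocking here is acceptable because
# batches are processed one at a time.
result = self.letta_integration.send_message_to_agent(
    prompt,
    self.agent_stream_handler,
)
```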

Additional improvements:
- Add Ctrl+C signal handling to properly kill all agents in parallel mode
- Replace console.print with proper logger calls throughout
- Remove emojis from logging output for cleaner multi-agent logs
- Add publish_to_thought_stream tool that auto-registers on startup
- Tool uses raw requests instead of atproto client for simplicity
- Tool accepts single string (max 5000 chars) instead of list
- Upsert tool on startup to ensure latest version is always used

Batching now works correctly with any batch_size setting.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

5 files changed, +333 -146
run_agent.sh (+30 -6)
```diff
···
   echo -e "${GREEN}🔄 Starting ${#AGENT_NAMES[@]} agent listeners in parallel${NC}"
   PIDS=()

+  # Set up trap to kill all background processes on exit
+  cleanup() {
+    echo -e "\n${YELLOW}🛑 Stopping all agents...${NC}"
+    for pid in "${PIDS[@]}"; do
+      if kill -0 $pid 2>/dev/null; then
+        kill $pid 2>/dev/null
+      fi
+    done
+    wait
+    echo -e "${GREEN}✓ All agents stopped${NC}"
+    exit 0
+  }
+  trap cleanup SIGINT SIGTERM
+
   for AGENT_NAME in "${AGENT_NAMES[@]}"; do
     AGENT_CONFIG="${AGENTS_DIR}/${AGENT_NAME}.yaml"
     echo -e "${BLUE}📄 Starting $AGENT_NAME: $AGENT_CONFIG${NC}"
···
   echo -e "${GREEN}✓ All agents started. Press Ctrl+C to stop all.${NC}"

   # Wait for all background processes
-  for pid in "${PIDS[@]}"; do
-    wait $pid
-  done
+  wait
   fi
   exit 0
 fi
···
   echo -e "${GREEN}🚀 Starting ${#AGENT_NAMES[@]} agents in parallel${NC}"
   PIDS=()

+  # Set up trap to kill all background processes on exit
+  cleanup() {
+    echo -e "\n${YELLOW}🛑 Stopping all agents...${NC}"
+    for pid in "${PIDS[@]}"; do
+      if kill -0 $pid 2>/dev/null; then
+        kill $pid 2>/dev/null
+      fi
+    done
+    wait
+    echo -e "${GREEN}✓ All agents stopped${NC}"
+    exit 0
+  }
+  trap cleanup SIGINT SIGTERM
+
   for AGENT_NAME in "${AGENT_NAMES[@]}"; do
     AGENT_CONFIG="${AGENTS_DIR}/${AGENT_NAME}.yaml"
     echo -e "${BLUE}📄 Starting $AGENT_NAME: $AGENT_CONFIG${NC}"
···
   echo -e "${GREEN}✓ All agents started. Press Ctrl+C to stop all.${NC}"

   # Wait for all background processes
-  for pid in "${PIDS[@]}"; do
-    wait $pid
-  done
+  wait
   fi
 }
```
src/jetstream_letta_bridge.py (+61 -64)
```diff
···
                 ping_timeout=10,
                 close_timeout=10
             )
-            console.print(f"[{self.agent_name}] ✓ Connected to jetstream")
+            logger.info(f"[{self.agent_name}] Connected to jetstream")
             return True

         except Exception as e:
-            console.print(f"[{self.agent_name}] ❌ Failed to connect: {e}")
+            logger.error(f"[{self.agent_name}] Failed to connect: {e}")
             return False

     async def disconnect_jetstream(self) -> None:
···
         if is_first_message:
             self.queue_first_message_time = time.time()

-        console.print(f"[{self.agent_name}] 📨 Queued: @{blip_message.author_handle} (queue: {len(self.message_queue)})")
+        logger.info(f"[{self.agent_name}] Queued: @{blip_message.author_handle} (queue: {len(self.message_queue)})")
         logger.debug(f"[{self.agent_name}] Content: {blip_message.content[:100]}...")

         # Check if we should process immediately or wait for batch
···
         # Check if we still have messages to process and not already processing
         async with self.queue_lock:
             if self.message_queue and not self.processing_queue:
-                console.print(f"[{self.agent_name}] ⏰ Timeout flush: {len(self.message_queue)} messages")
+                logger.info(f"[{self.agent_name}] Timeout flush: {len(self.message_queue)} messages")
                 # Process outside the lock
             else:
                 logger.debug(f"[{self.agent_name}] Flush skipped - queue empty or already processing")
···
             # Single message - use the XML format directly
             item = items_to_process[0]
             prompt = item['prompt']  # This is already in XML format from format_display()
-            console.print(f"[{self.agent_name}] 🤖 → Agent")
+            logger.info(f"[{self.agent_name}] Sending to agent")
             logger.debug(f"[{self.agent_name}] Prompt: {prompt[:200]}...")
         else:
             # Multiple messages - combine XML blocks
···
                 xml_blocks.append(item['prompt'])  # Each prompt is already XML formatted

             prompt = f"You received {len(items_to_process)} messages:\n\n" + "\n\n".join(xml_blocks)
-            console.print(f"[{self.agent_name}] 🤖 → Agent ({len(items_to_process)} messages)")
+            logger.info(f"[{self.agent_name}] Sending {len(items_to_process)} messages to agent")

         try:
-            # Send to agent with streaming handler (run in executor to avoid blocking)
-            loop = asyncio.get_event_loop()
-            await loop.run_in_executor(
-                None,
-                self.letta_integration.send_message_to_agent,
+            # Send to agent - call directly (blocking is fine, we process one at a time)
+            logger.debug(f"[{self.agent_name}] Calling send_message_to_agent directly...")
+
+            result = self.letta_integration.send_message_to_agent(
                 prompt,
                 self.agent_stream_handler
             )
-            console.print(f"[{self.agent_name}] ✓ Agent processing complete")
+            logger.info(f"[{self.agent_name}] Agent processing complete, result: {result}")
             self.messages_sent_to_agent += len(items_to_process)

             # Show updated statistics
             elapsed = time.time() - self.start_time
             rate = self.blips_received / (elapsed / 60) if elapsed > 0 else 0
-            console.print(f"[{self.agent_name}] 📊 {self.blips_received} received | {self.messages_sent_to_agent} sent | {self.blips_published} published ({rate:.1f}/min)")
+            logger.info(f"[{self.agent_name}] Stats: {self.blips_received} received | {self.messages_sent_to_agent} sent | {self.blips_published} published ({rate:.1f}/min)")

         except Exception as e:
             error_name = type(e).__name__
-            console.print(f"[{self.agent_name}] ❌ {error_name}: {str(e)[:100]}")
+            logger.error(f"[{self.agent_name}] {error_name}: {str(e)[:100]}")
             logger.error(f"[{self.agent_name}] Agent communication error: {e}")
             # Continue processing - don't let one error stop the bridge
         finally:
···
             message = args.get('message', '')

             if message_type == 'assistant_message':
-                console.print(f"[{self.agent_name}] 📢 Response: {message[:80]}{'...' if len(message) > 80 else ''}")
+                logger.info(f"[{self.agent_name}] Response: {message[:80]}{'...' if len(message) > 80 else ''}")
                 logger.debug(f"[{self.agent_name}] Publishing: {message[:50]}...")
             else:
                 logger.debug(f"[{self.agent_name}] Ignoring {tool_name} type: {message_type}")
···
             logger.debug(f"[{self.agent_name}] Tool: {tool_name}")

         elif chunk.message_type == 'tool_return_message':
-            status_icon = "✓" if chunk.status == 'success' else "✗"
+            status_icon = "" if chunk.status == 'success' else "✗"
             logger.debug(f"[{self.agent_name}] {status_icon} {chunk.name} -> {chunk.status}")

         elif chunk.message_type == 'assistant_message':
-            console.print(f"[{self.agent_name}] 💬 {chunk.content[:80]}{'...' if len(chunk.content) > 80 else ''}")
-            logger.debug(f"[{self.agent_name}] Direct message: {chunk.content[:100]}...")
+            logger.info(f"[{self.agent_name}] Direct message: {chunk.content[:80]}{'...' if len(chunk.content) > 80 else ''}")
+            logger.debug(f"[{self.agent_name}] Full content: {chunk.content[:100]}...")
         else:
             logger.debug("Chunk has no message_type")

     def agent_batch_callback(self, messages: List[str]) -> None:
         """Handle published messages from the agent."""
         logger.debug(f"[{self.agent_name}] Publishing {len(messages)} messages")
-        console.print(f"[{self.agent_name}] 🚀 Publishing {len(messages)} blips...")
+        logger.info(f"[{self.agent_name}] Publishing {len(messages)} blips...")

         # Publish the messages
         results = self.letta_integration.publish_messages_as_blips(messages)
···
         successful = sum(1 for r in results if r is not None)

         if successful == len(messages):
-            console.print(f"[{self.agent_name}] ✓ Published all {len(messages)} blips")
+            logger.info(f"[{self.agent_name}] Published all {len(messages)} blips")
         else:
-            console.print(f"[{self.agent_name}] ⚠ Published {successful}/{len(messages)} blips")
+            logger.warning(f"[{self.agent_name}] Published {successful}/{len(messages)} blips")

     async def listen_jetstream(self) -> None:
         """Listen for messages on the jetstream websocket."""
···
         self.letta_integration.set_batch_callback(self.agent_batch_callback)

         # Authenticate with Bluesky
-        logger.debug(f"[{self.agent_name}] Authenticating with Bluesky...")
+        logger.info(f"[{self.agent_name}] Authenticating with Bluesky...")
         if not self.letta_integration.authenticate_bluesky():
-            console.print(f"[{self.agent_name}] ❌ Failed to authenticate with Bluesky")
+            logger.error(f"[{self.agent_name}] Failed to authenticate with Bluesky")
             return
-        logger.debug(f"[{self.agent_name}] Authenticated with Bluesky")
+        logger.info(f"[{self.agent_name}] Authenticated with Bluesky")

         # Store agent DID for filtering
         self.agent_did = self.letta_integration.blip_publisher.user_did
         if self.agent_did:
-            logger.debug(f"Agent DID: {self.agent_did}")
+            logger.debug(f"[{self.agent_name}] Agent DID: {self.agent_did}")
         else:
-            logger.warning("Could not retrieve agent DID - own message filtering disabled")
-
+            logger.warning(f"[{self.agent_name}] Could not retrieve agent DID - own message filtering disabled")
+
+        # Check and register thought stream tool
+        logger.debug(f"[{self.agent_name}] Checking thought stream tool...")
+        self.letta_integration.check_and_register_thought_stream_tool(auto_attach=True)
+
         # Get agent info
         agent_info = self.letta_integration.get_agent_info()
         if agent_info:
-            console.print(f"\n[bold blue]{self.agent_name}[/bold blue] 🤖 Connected")
-            console.print(f"  ID: {agent_info['id']}")
-            console.print(f"  Model: {agent_info['model']}")
-            console.print(f"  Tools: {', '.join(agent_info['tools'][:5])}{'...' if len(agent_info['tools']) > 5 else ''}")
+            logger.info(f"[{self.agent_name}] Connected to agent {agent_info['id']}")
+            logger.info(f"[{self.agent_name}] Model: {agent_info['model']}")
+            logger.info(f"[{self.agent_name}] Tools: {', '.join(agent_info['tools'][:5])}{'...' if len(agent_info['tools']) > 5 else ''}")

         while self.running:
             try:
···
                 await self.listen_jetstream()

             except KeyboardInterrupt:
-                console.print("\n🛑 Shutting down...")
+                logger.info(f"[{self.agent_name}] Shutting down...")
                 break
             except Exception as e:
                 logger.error(f"Unexpected error: {e}")
···
         self.reconnect_count += 1

         if self.max_reconnect_attempts > 0 and self.reconnect_count > self.max_reconnect_attempts:
-            console.print(f"❌ Max reconnect attempts ({self.max_reconnect_attempts}) exceeded")
+            logger.error(f"[{self.agent_name}] Max reconnect attempts ({self.max_reconnect_attempts}) exceeded")
             self.running = False
             return
-
+
         # Exponential backoff
         delay = min(self.reconnect_delay * (2 ** (self.reconnect_count - 1)), 300)  # Cap at 5 minutes
-
-        console.print(f"⏰ Reconnecting in {delay}s (attempt {self.reconnect_count})")
+
+        logger.info(f"[{self.agent_name}] Reconnecting in {delay}s (attempt {self.reconnect_count})")
         await asyncio.sleep(delay)

     async def stop(self) -> None:
···
         # Final statistics
         elapsed = time.time() - self.start_time
-        console.print(f"\n[{self.agent_name}] 📊 Final Stats:")
-        console.print(f"  Received: {self.blips_received}")
-        console.print(f"  Sent: {self.messages_sent_to_agent}")
-        console.print(f"  Published: {self.blips_published}")
-        console.print(f"  Runtime: {elapsed/60:.1f}m")
+        logger.info(f"[{self.agent_name}] Final Stats: received={self.blips_received}, sent={self.messages_sent_to_agent}, published={self.blips_published}, runtime={elapsed/60:.1f}m")
         if elapsed > 0:
-            console.print(f"  Rate: {self.blips_received / (elapsed / 60):.1f}/min")
+            logger.info(f"[{self.agent_name}] Average rate: {self.blips_received / (elapsed / 60):.1f}/min")


 def list_available_agents(directory: str) -> None:
···
     try:
         agent_dir = Path(directory)
         if not agent_dir.exists():
-            console.print(f"❌ Directory not found: {directory}")
+            console.print(f"Directory not found: {directory}")
             return

         configs = list(agent_dir.glob("*.yaml")) + list(agent_dir.glob("*.yml"))

         if not configs:
-            console.print(f"📁 No agent configurations found in: {directory}")
+            console.print(f"No agent configurations found in: {directory}")
             console.print("   Looking for *.yaml or *.yml files")
             return

-        console.print(f"🤖 Available agent configurations in {directory}:")
+        console.print(f"Available agent configurations in {directory}:")
         console.print()

         for config_path in sorted(configs):
···
                 batch_size = config.get('agent', {}).get('batch_size', 1)
                 wanted_dids = config.get('jetstream', {}).get('wanted_dids', [])

-                console.print(f"  📄 {config_path.stem}")
+                console.print(f"  {config_path.stem}")
                 console.print(f"     Name: {agent_name}")
                 console.print(f"     ID: {agent_id}")
                 console.print(f"     Batch size: {batch_size}")
···
                 console.print()

             except Exception as e:
-                console.print(f"  ❌ {config_path.stem} (Error loading: {str(e)})")
+                console.print(f"  {config_path.stem} (Error loading: {str(e)})")
                 console.print()

         console.print(f"Usage: python {sys.argv[0]} --agent {directory}/AGENT_NAME.yaml")

     except Exception as e:
-        console.print(f"❌ Error listing agents: {e}")
+        console.print(f"Error listing agents: {e}")


 @click.command()
···
     try:
         from setup_wizard import SetupWizard

-        console.print("🧙 [bold blue]Starting setup wizard...[/bold blue]")
+        console.print("[bold blue]Starting setup wizard...[/bold blue]")
         wizard = SetupWizard()
         asyncio.run(wizard.run())
         return

     except ImportError:
-        console.print("❌ [bold red]Setup wizard not available.[/bold red]")
+        console.print("[bold red]Setup wizard not available.[/bold red]")
         console.print("Make sure setup_wizard.py is in the src/ directory.")
         sys.exit(1)
     except Exception as e:
-        console.print(f"❌ [bold red]Setup wizard error:[/bold red] {e}")
+        console.print(f"[bold red]Setup wizard error:[/bold red] {e}")
         sys.exit(1)

     # Set up logging level
···
     if agent:
         config_file = agent
-        console.print(f"🤖 [bold blue]Using agent configuration:[/bold blue] {agent}")
+        logger.info(f"Using agent configuration: {agent}")
     elif config:
         config_file = config
-        console.print(f"⚠️ [bold yellow]Using legacy --config option:[/bold yellow] {config}")
-        console.print("   [dim]Consider using --agent instead for better agent management[/dim]")
+        logger.warning(f"Using legacy --config option: {config}")
+        logger.info("Consider using --agent instead for better agent management")
     else:
         # Check environment variable
         env_config = os.getenv('LETTA_AGENT_CONFIG')
         if env_config and Path(env_config).exists():
             config_file = env_config
-            console.print(f"🌍 [bold green]Using config from environment variable:[/bold green] {env_config}")
+            logger.info(f"Using config from environment variable: {env_config}")
         else:
-            console.print("❌ [bold red]No configuration specified![/bold red]")
-            console.print("   Use --agent PATH_TO_AGENT.yaml or --config PATH_TO_CONFIG.yaml")
-            console.print("   Or set LETTA_AGENT_CONFIG environment variable")
-            console.print("   Use --list-agents DIRECTORY to see available agents")
+            logger.error("No configuration specified!")
+            logger.error("Use --agent PATH_TO_AGENT.yaml or --config PATH_TO_CONFIG.yaml")
+            logger.error("Or set LETTA_AGENT_CONFIG environment variable")
+            logger.error("Use --list-agents DIRECTORY to see available agents")
             sys.exit(1)

     # Load configuration
···
         bridge = JetstreamLettaBridge(app_config)

         # Run the bridge
-        console.print("🌉 Starting Jetstream-Letta Bridge")
-        console.print("   Monitoring jetstream for blips → sending to agent → publishing responses")
+        logger.info("Starting Jetstream-Letta Bridge")
+        logger.info("Monitoring jetstream for blips → sending to agent → publishing responses")
         asyncio.run(bridge.run_with_reconnect())
-
+
     except KeyboardInterrupt:
-        console.print("\n🛑 Interrupted by user")
+        logger.info("Interrupted by user")
     except Exception as e:
-        console.print(f"❌ Fatal error: {e}")
         logger.error(f"Fatal error: {e}")
         sys.exit(1)
```
src/letta_integration.py (+141 -76)
```diff
···
         except Exception as e:
             logger.error(f"Failed to authenticate with Bluesky: {e}")
             return False
-
+
+    def check_and_register_thought_stream_tool(self, auto_attach: bool = False) -> bool:
+        """
+        Check if the thought stream tool is attached to the agent, and optionally attach it.
+
+        Args:
+            auto_attach: If True, automatically attach the tool without prompting
+
+        Returns:
+            True if tool is attached (or was successfully attached), False otherwise
+        """
+        try:
+            # Import thought stream tool
+            import sys
+            from pathlib import Path
+
+            # Add src directory to path if running from different location
+            src_path = Path(__file__).parent
+            if str(src_path) not in sys.path:
+                sys.path.insert(0, str(src_path))
+
+            from tools.thought_stream import publish_to_thought_stream, ThoughtStreamArgs
+
+            # Upsert the tool (creates if not exists, updates if exists)
+            logger.info(f"[{self.agent_name}] Upserting thought stream tool...")
+            tool = self.letta_client.tools.upsert_from_function(
+                func=publish_to_thought_stream,
+                args_schema=ThoughtStreamArgs,
+                tags=["thought-stream", "publish", "blip"]
+            )
+            logger.info(f"[{self.agent_name}] Tool ready: {tool.id}")
+
+            # Check if already attached
+            current_tools = self.letta_client.agents.tools.list(agent_id=self.agent_id)
+            tool_names = [t.name for t in current_tools]
+
+            if 'publish_to_thought_stream' not in tool_names:
+                # Attach to agent
+                logger.info(f"[{self.agent_name}] Attaching tool to agent...")
+                self.letta_client.agents.tools.attach(
+                    agent_id=self.agent_id,
+                    tool_id=str(tool.id)
+                )
+                logger.info(f"[{self.agent_name}] Tool attached successfully")
+            else:
+                logger.debug(f"[{self.agent_name}] Tool already attached")
+
+            logger.info(f"[{self.agent_name}] Successfully attached thought stream tool")
+            return True
+
+        except ImportError as e:
+            logger.error(f"Failed to import thought stream tool: {e}")
+            return False
+        except Exception as e:
+            logger.error(f"Failed to register thought stream tool: {e}")
+            return False
+
     def process_tool_call_chunk(self, chunk: Any) -> Optional[str]:
         """
         Process a tool call chunk and extract send_message content.
···
         Returns:
             Streaming response object
         """
-        from httpx import ReadTimeout
-        import time
+        try:
+            logger.info(f"[{self.agent_name}] FUNCTION ENTRY - send_message_to_agent called")
+            from httpx import ReadTimeout
+            import time

-        last_error = None
-        for attempt in range(max_retries + 1):
-            try:
-                if attempt > 0:
-                    delay = min(2 ** attempt, 10)  # Exponential backoff, max 10s
-                    logger.warning(f"[{self.agent_name}] Retry attempt {attempt}/{max_retries} after {delay}s delay...")
-                    time.sleep(delay)
+            last_error = None
+            for attempt in range(max_retries + 1):
+                try:
+                    if attempt > 0:
+                        delay = min(2 ** attempt, 10)  # Exponential backoff, max 10s
+                        logger.warning(f"[{self.agent_name}] Retry attempt {attempt}/{max_retries} after {delay}s delay...")
+                        time.sleep(delay)

-                logger.debug(f"[{self.agent_name}] Sending to agent: {message[:100]}...")
+                    logger.info(f"[{self.agent_name}] Entered send_message_to_agent, message length: {len(message)}")
+                    logger.debug(f"[{self.agent_name}] Sending to agent: {message[:100]}...")
+
+                    # Create streaming request
+                    logger.info(f"[{self.agent_name}] Calling create_stream API...")
+                    message_stream = self.letta_client.agents.messages.create_stream(
+                        agent_id=self.agent_id,
+                        messages=[{"role": "user", "content": message}],
+                        stream_tokens=False,  # Step streaming only
+                        max_steps=self.max_steps
+                    )
+                    logger.info(f"[{self.agent_name}] Stream created, processing chunks...")

-                # Create streaming request
-                logger.debug(f"[{self.agent_name}] Calling create_stream API...")
-                message_stream = self.letta_client.agents.messages.create_stream(
-                    agent_id=self.agent_id,
-                    messages=[{"role": "user", "content": message}],
-                    stream_tokens=False,  # Step streaming only
-                    max_steps=self.max_steps
-                )
-                logger.debug(f"[{self.agent_name}] Stream created, processing chunks...")
+                    # Process streaming response
+                    chunk_count = 0
+                    logger.info(f"[{self.agent_name}] Starting to iterate over chunks...")
+                    for chunk in message_stream:
+                        chunk_count += 1
+                        if chunk_count == 1:
+                            logger.info(f"[{self.agent_name}] Received first chunk")
+                        if chunk_count % 10 == 0:
+                            logger.debug(f"[{self.agent_name}] Processed {chunk_count} chunks...")

-                # Process streaming response
-                chunk_count = 0
-                for chunk in message_stream:
-                    chunk_count += 1
-                    if chunk_count == 1:
-                        logger.debug(f"[{self.agent_name}] Received first chunk")
+                        # Handle streaming chunk
+                        if stream_handler:
+                            try:
+                                stream_handler(chunk)
+                            except Exception as e:
+                                logger.error(f"Error in stream handler: {e}")

-                    # Handle streaming chunk
-                    if stream_handler:
-                        try:
-                            stream_handler(chunk)
-                        except Exception as e:
-                            logger.error(f"Error in stream handler: {e}")
+                        # Check for send_message tool calls
+                        if hasattr(chunk, 'message_type') and chunk.message_type == 'tool_call_message':
+                            message_content = self.process_tool_call_chunk(chunk)
+                            if message_content:
+                                self.add_message_to_batch(message_content)

-                    # Check for send_message tool calls
-                    if hasattr(chunk, 'message_type') and chunk.message_type == 'tool_call_message':
-                        message_content = self.process_tool_call_chunk(chunk)
-                        if message_content:
-                            self.add_message_to_batch(message_content)
+                        # Check for direct assistant messages (not via tool)
+                        elif hasattr(chunk, 'message_type') and chunk.message_type == 'assistant_message':
+                            if hasattr(chunk, 'content') and chunk.content:
+                                logger.debug(f"Direct assistant message: {chunk.content[:100]}...")
+                                self.add_message_to_batch(chunk.content)

-                    # Check for direct assistant messages (not via tool)
-                    elif hasattr(chunk, 'message_type') and chunk.message_type == 'assistant_message':
-                        if hasattr(chunk, 'content') and chunk.content:
-                            logger.debug(f"Direct assistant message: {chunk.content[:100]}...")
-                            self.add_message_to_batch(chunk.content)
+                        # Log chunk information
+                        if hasattr(chunk, 'message_type'):
+                            if chunk.message_type == 'reasoning_message':
+                                logger.debug(f"Reasoning: {chunk.reasoning[:100]}...")
+                            elif chunk.message_type == 'tool_call_message':
+                                tool_name = chunk.tool_call.name
+                                logger.debug(f"Tool call: {tool_name}")
+                            elif chunk.message_type == 'tool_return_message':
+                                status = chunk.status
+                                logger.debug(f"Tool result: {chunk.name} -> {status}")
+                            elif chunk.message_type == 'assistant_message':
+                                logger.debug(f"Assistant: {chunk.content[:100]}...")

-                    # Log chunk information
-                    if hasattr(chunk, 'message_type'):
-                        if chunk.message_type == 'reasoning_message':
-                            logger.debug(f"Reasoning: {chunk.reasoning[:100]}...")
-                        elif chunk.message_type == 'tool_call_message':
-                            tool_name = chunk.tool_call.name
-                            logger.debug(f"Tool call: {tool_name}")
-                        elif chunk.message_type == 'tool_return_message':
-                            status = chunk.status
-                            logger.debug(f"Tool result: {chunk.name} -> {status}")
-                        elif chunk.message_type == 'assistant_message':
-                            logger.debug(f"Assistant: {chunk.content[:100]}...")
+                        if str(chunk) == 'done':
+                            logger.debug(f"[{self.agent_name}] Received 'done' signal")
+                            break

-                    if str(chunk) == 'done':
-                        logger.debug(f"[{self.agent_name}] Received 'done' signal")
-                        break
+                    logger.debug(f"[{self.agent_name}] Processed {chunk_count} chunks")

-                logger.debug(f"[{self.agent_name}] Processed {chunk_count} chunks")
+                    # Flush any remaining messages in batch
+                    self.flush_batch()

-                # Flush any remaining messages in batch
-                self.flush_batch()
+                    logger.debug(f"[{self.agent_name}] Agent processing completed")
+                    return message_stream

-                logger.debug(f"[{self.agent_name}] Agent processing completed")
-                return message_stream
+                except ReadTimeout as e:
+                    last_error = e
+                    logger.warning(f"[{self.agent_name}] Read timeout on attempt {attempt + 1}/{max_retries + 1}")
+                    if attempt >= max_retries:
+                        logger.error(f"[{self.agent_name}] Max retries exceeded, giving up")
+                        raise
+                    # Continue to next retry
+                    continue

-            except ReadTimeout as e:
-                last_error = e
-                logger.warning(f"[{self.agent_name}] Read timeout on attempt {attempt + 1}/{max_retries + 1}")
-                if attempt >= max_retries:
-                    logger.error(f"[{self.agent_name}] Max retries exceeded, giving up")
+                except Exception as e:
+                    logger.error(f"[{self.agent_name}] Error sending message to agent: {e}", exc_info=True)
                     raise
-                # Continue to next retry
-                continue

-            except Exception as e:
-                logger.error(f"[{self.agent_name}] Error sending message to agent: {e}", exc_info=True)
-                raise
-
-        # Should not reach here, but just in case
-        if last_error:
-            raise last_error
+            # Should not reach here, but just in case
+            if last_error:
+                raise last_error
+        except Exception as e:
+            logger.error(f"[{self.agent_name}] OUTER EXCEPTION in send_message_to_agent: {e}", exc_info=True)
+            raise

     def get_agent_info(self) -> Optional[Dict[str, Any]]:
         """
```
src/tools/__init__.py (+1 -0)
··· 1 + """Tools for Thought Stream agents."""
src/tools/thought_stream.py (+100 -0)
··· 1 + """Tool for publishing to Thought Stream.""" 2 + import os 3 + from pydantic import BaseModel, Field, validator 4 + 5 + 6 + class ThoughtStreamArgs(BaseModel): 7 + """Arguments for publishing to Thought Stream.""" 8 + 9 + content: str = Field( 10 + ..., 11 + description="Your thought to publish (max 5000 characters)" 12 + ) 13 + 14 + @validator('content') 15 + def validate_content(cls, v): 16 + if not v or len(v.strip()) == 0: 17 + raise ValueError("Content cannot be empty") 18 + if len(v) > 5000: 19 + raise ValueError(f"Content exceeds 5000 character limit (current: {len(v)} characters)") 20 + return v 21 + 22 + 23 + def publish_to_thought_stream(content: str) -> str: 24 + """ 25 + Publish a thought to the Thought Stream as a blip. 26 + 27 + Use this tool to share your thoughts publicly on Thought Stream. 28 + 29 + Args: 30 + content: Your thought to publish (max 5000 characters) 31 + 32 + Returns: 33 + Success message with URI of published blip 34 + 35 + Raises: 36 + Exception: If publishing fails 37 + """ 38 + import requests 39 + from datetime import datetime, timezone 40 + 41 + try: 42 + # Validate input 43 + if not content or len(content.strip()) == 0: 44 + raise Exception("Content cannot be empty") 45 + 46 + if len(content) > 5000: 47 + raise Exception(f"Content exceeds 5000 character limit (current: {len(content)} characters)") 48 + 49 + # Get credentials from environment 50 + username = os.getenv("BSKY_USERNAME") 51 + password = os.getenv("BSKY_PASSWORD") 52 + pds_uri = os.getenv("PDS_URI", "https://bsky.social") 53 + 54 + if not username or not password: 55 + raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set") 56 + 57 + # Create session 58 + session_url = f"{pds_uri}/xrpc/com.atproto.server.createSession" 59 + session_data = { 60 + "identifier": username, 61 + "password": password 62 + } 63 + 64 + session_response = requests.post(session_url, json=session_data, timeout=10) 65 + session_response.raise_for_status() 66 + session = session_response.json() 67 + access_token = session.get("accessJwt") 68 + user_did = session.get("did") 69 + 70 + if not access_token or not user_did: 71 + raise Exception("Failed to get access token or DID from session") 72 + 73 + # Publish blip 74 + headers = {"Authorization": f"Bearer {access_token}"} 75 + create_record_url = f"{pds_uri}/xrpc/com.atproto.repo.createRecord" 76 + 77 + # Create blip record 78 + now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 79 + record_data = { 80 + "$type": "stream.thought.blip", 81 + "content": content, 82 + "createdAt": now 83 + } 84 + 85 + # Create the record 86 + create_data = { 87 + "repo": user_did, 88 + "collection": "stream.thought.blip", 89 + "record": record_data 90 + } 91 + 92 + response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 93 + response.raise_for_status() 94 + result = response.json() 95 + 96 + uri = result.get("uri") 97 + return f"Successfully published to Thought Stream!\nURI: {uri}\nContent: {content[:100]}{'...' if len(content) > 100 else ''}" 98 + 99 + except Exception as e: 100 + raise Exception(f"Error publishing to Thought Stream: {str(e)}")