this repo has no description
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 674 lines 28 kB view raw
1#!/usr/bin/env python3 2"""Jetstream-Letta bridge for bidirectional multi-agent communication.""" 3import asyncio 4import json 5import logging 6import os 7import signal 8import sys 9import time 10from typing import Optional, List, Dict, Any 11from datetime import datetime 12from pathlib import Path 13 14import click 15import websockets 16from rich.console import Console 17from rich.logging import RichHandler 18 19try: 20 from .letta_integration import LettaAgentIntegration 21 from .config_loader import load_config 22 from .models import JetstreamEvent, BlipRecord, BlipMessage 23 from .did_cache import DIDCache 24except ImportError: 25 # Handle running as script directly 26 import sys 27 from pathlib import Path 28 sys.path.insert(0, str(Path(__file__).parent)) 29 from letta_integration import LettaAgentIntegration 30 from config_loader import load_config 31 from models import JetstreamEvent, BlipRecord, BlipMessage 32 from did_cache import DIDCache 33 34# Set up logging 35console = Console() 36logging.basicConfig( 37 level=logging.INFO, 38 format="%(message)s", 39 datefmt="[%X]", 40 handlers=[RichHandler(console=console, rich_tracebacks=True, show_path=False)] 41) 42logger = logging.getLogger(__name__) 43 44# Suppress noisy HTTP logs from letta client 45logging.getLogger("httpx").setLevel(logging.WARNING) 46logging.getLogger("httpcore").setLevel(logging.WARNING) 47logging.getLogger("openai._base_client").setLevel(logging.WARNING) 48 49 50class JetstreamLettaBridge: 51 """Bridge connecting Jetstream blip monitoring with Letta agent processing.""" 52 53 def __init__(self, config: Dict[str, Any]): 54 """Initialize the bridge.""" 55 self.config = config 56 self.bridge_config = config.get('bridge', {}) 57 self.jetstream_config = config.get('jetstream', {}) 58 59 # Get agent name for logging 60 self.agent_name = config.get('agent', {}).get('name', 'unknown') 61 62 # Initialize components 63 self.letta_integration = LettaAgentIntegration(config) 64 self.did_cache = DIDCache( 65 max_size=config['cache']['max_cache_size'], 66 ttl=config['cache']['did_cache_ttl'] 67 ) 68 69 # Jetstream connection 70 self.websocket: Optional[websockets.WebSocketServerProtocol] = None 71 self.running = False 72 self.reconnect_count = 0 73 74 # Message processing 75 self.message_queue: List[Dict[str, Any]] = [] 76 self.queue_lock = asyncio.Lock() 77 self.processing_queue = False # Flag to prevent concurrent processing 78 self.batch_size = config.get('agent', {}).get('batch_size', 1) 79 self.queue_flush_timeout = config.get('agent', {}).get('queue_flush_timeout', 30) # seconds 80 self.queue_first_message_time: Optional[float] = None 81 self.flush_task: Optional[asyncio.Task] = None 82 83 # Configuration 84 self.wanted_dids = set(self.jetstream_config.get('wanted_dids', [])) 85 self.prompt_template = self.bridge_config.get('prompt_template', 86 "[@{author}] {content}") 87 self.include_metadata = self.bridge_config.get('include_metadata', True) 88 self.max_reconnect_attempts = self.jetstream_config.get('max_reconnect_attempts', 10) 89 self.reconnect_delay = self.jetstream_config.get('reconnect_delay', 5) 90 91 # Statistics 92 self.blips_received = 0 93 self.messages_sent_to_agent = 0 94 self.blips_published = 0 95 self.start_time = time.time() 96 97 # Agent DID (set after authentication) 98 self.agent_did: Optional[str] = None 99 100 logger.debug(f"Initialized bridge for agent {self.letta_integration.agent_id}") 101 logger.debug(f"Batch size: {self.batch_size}, flush timeout: {self.queue_flush_timeout}s") 102 103 104 def build_websocket_url(self) -> str: 105 """Build the websocket URL with query parameters.""" 106 base_url = self.jetstream_config['instance'] 107 if not base_url.endswith('/subscribe'): 108 base_url = base_url.rstrip('/') + '/subscribe' 109 110 params = [] 111 112 # Filter for stream.thought.blip collection 113 params.append("wantedCollections=stream.thought.blip") 114 115 # Add wanted DIDs if specified 116 if self.wanted_dids: 117 for did in self.wanted_dids: 118 params.append(f"wantedDids={did}") 119 120 # Add compression support 121 params.append("compress=false") 122 123 url = base_url 124 if params: 125 url += "?" + "&".join(params) 126 127 return url 128 129 async def connect_jetstream(self) -> bool: 130 """Connect to jetstream websocket.""" 131 url = self.build_websocket_url() 132 133 try: 134 logger.debug(f"[{self.agent_name}] Connecting to jetstream: {url}") 135 self.websocket = await websockets.connect( 136 url, 137 ping_interval=30, 138 ping_timeout=10, 139 close_timeout=10 140 ) 141 logger.info(f"[{self.agent_name}] Connected to jetstream") 142 return True 143 144 except Exception as e: 145 logger.error(f"[{self.agent_name}] Failed to connect: {e}") 146 return False 147 148 async def disconnect_jetstream(self) -> None: 149 """Disconnect from jetstream.""" 150 if self.websocket: 151 await self.websocket.close() 152 self.websocket = None 153 logger.debug("Disconnected from jetstream") 154 155 async def handle_jetstream_message(self, message: str) -> None: 156 """Handle incoming jetstream message.""" 157 try: 158 data = json.loads(message) 159 event = JetstreamEvent(**data) 160 161 # Only process commit events with blip records 162 if event.kind != "commit" or not event.commit: 163 return 164 165 commit = event.commit 166 if commit.collection != "stream.thought.blip": 167 return 168 169 # Skip delete operations 170 if commit.operation == "delete": 171 return 172 173 # Filter by wanted DIDs if specified 174 if self.wanted_dids and event.did not in self.wanted_dids: 175 return 176 177 # Parse blip record 178 if not commit.record: 179 logger.warning(f"No record data in commit from {event.did}") 180 return 181 182 try: 183 blip_record = BlipRecord(**commit.record) 184 except Exception as e: 185 logger.warning(f"Failed to parse blip record from {event.did}: {e}") 186 return 187 188 # Filter out own messages to prevent feedback loop 189 if self.agent_did and event.did == self.agent_did: 190 logger.debug(f"Ignoring own message from {event.did}") 191 return 192 193 # Resolve DID to profile data 194 profile_data = await self.did_cache.resolve_did(event.did) 195 if profile_data: 196 handle = profile_data.handle 197 display_name = profile_data.display_name 198 else: 199 handle = event.did # Fallback to DID 200 display_name = None 201 202 # Create blip message for processing 203 blip_message = BlipMessage( 204 author_handle=handle, 205 author_display_name=display_name, 206 author_did=event.did, 207 created_at=blip_record.created_at, 208 content=blip_record.content, 209 record_uri=f"at://{event.did}/{commit.collection}/{commit.rkey}", 210 record_cid=commit.cid 211 ) 212 213 # Queue message for agent processing 214 await self.queue_blip_for_processing(blip_message) 215 216 self.blips_received += 1 217 218 except json.JSONDecodeError as e: 219 logger.error(f"Failed to parse JSON message: {e}") 220 except Exception as e: 221 logger.error(f"Error handling jetstream message: {e}") 222 223 async def queue_blip_for_processing(self, blip_message: BlipMessage) -> None: 224 """Queue a blip message for agent processing.""" 225 should_process_immediately = False 226 227 async with self.queue_lock: 228 # Create prompt from blip 229 prompt = self.create_prompt_from_blip(blip_message) 230 231 # Add to queue with metadata 232 queue_item = { 233 'prompt': prompt, 234 'blip_message': blip_message, 235 'timestamp': time.time() 236 } 237 238 # Track first message time for timeout 239 is_first_message = len(self.message_queue) == 0 240 self.message_queue.append(queue_item) 241 242 if is_first_message: 243 self.queue_first_message_time = time.time() 244 245 logger.info(f"[{self.agent_name}] Queued: @{blip_message.author_handle} (queue: {len(self.message_queue)})") 246 logger.debug(f"[{self.agent_name}] Content: {blip_message.content[:100]}...") 247 248 # Check if we should process immediately or wait for batch 249 should_process_immediately = (self.batch_size == 1 or len(self.message_queue) >= self.batch_size) 250 should_schedule_flush = is_first_message and self.batch_size > 1 251 252 # Process immediately if needed (outside the lock to avoid deadlock) 253 if should_process_immediately: 254 await self.process_message_queue() 255 elif should_schedule_flush: 256 await self.schedule_queue_flush() 257 258 def create_prompt_from_blip(self, blip_message: BlipMessage) -> str: 259 """Create a prompt for the agent from a blip message using XML format.""" 260 # Use the standardized XML format from BlipMessage.format_display() 261 return blip_message.format_display() 262 263 async def schedule_queue_flush(self) -> None: 264 """Schedule a queue flush after the timeout period.""" 265 # Cancel any existing flush task 266 if self.flush_task and not self.flush_task.done(): 267 self.flush_task.cancel() 268 269 # Schedule new flush task 270 self.flush_task = asyncio.create_task(self._flush_after_timeout()) 271 272 async def _flush_after_timeout(self) -> None: 273 """Wait for timeout then flush the queue.""" 274 try: 275 await asyncio.sleep(self.queue_flush_timeout) 276 277 # Check if we still have messages to process and not already processing 278 async with self.queue_lock: 279 if self.message_queue and not self.processing_queue: 280 logger.info(f"[{self.agent_name}] Timeout flush: {len(self.message_queue)} messages") 281 # Process outside the lock 282 else: 283 logger.debug(f"[{self.agent_name}] Flush skipped - queue empty or already processing") 284 return 285 286 await self.process_message_queue() 287 288 except asyncio.CancelledError: 289 # Task was cancelled, which is fine 290 logger.debug(f"[{self.agent_name}] Flush task cancelled") 291 pass 292 293 async def process_message_queue(self) -> None: 294 """Process all queued messages by sending them to the agent.""" 295 # Acquire lock and check if we should process 296 async with self.queue_lock: 297 # Check if already processing or queue is empty 298 if self.processing_queue: 299 logger.debug(f"[{self.agent_name}] Already processing queue, skipping") 300 return 301 302 if not self.message_queue: 303 logger.debug(f"[{self.agent_name}] Queue is empty, nothing to process") 304 return 305 306 # Set processing flag and get items 307 self.processing_queue = True 308 items_to_process = self.message_queue.copy() 309 self.message_queue.clear() 310 # Reset queue timing 311 self.queue_first_message_time = None 312 # Cancel flush task since we're processing now 313 if self.flush_task and not self.flush_task.done(): 314 self.flush_task.cancel() 315 self.flush_task = None 316 317 # Create combined prompt for batch processing 318 if len(items_to_process) == 1: 319 # Single message - use the XML format directly 320 item = items_to_process[0] 321 prompt = item['prompt'] # This is already in XML format from format_display() 322 logger.info(f"[{self.agent_name}] Sending to agent") 323 logger.debug(f"[{self.agent_name}] Prompt: {prompt[:200]}...") 324 else: 325 # Multiple messages - combine XML blocks 326 xml_blocks = [] 327 for item in items_to_process: 328 xml_blocks.append(item['prompt']) # Each prompt is already XML formatted 329 330 prompt = f"You received {len(items_to_process)} messages:\n\n" + "\n\n".join(xml_blocks) 331 logger.info(f"[{self.agent_name}] Sending {len(items_to_process)} messages to agent") 332 333 try: 334 # Send to agent - call directly (blocking is fine, we process one at a time) 335 logger.debug(f"[{self.agent_name}] Calling send_message_to_agent directly...") 336 337 result = self.letta_integration.send_message_to_agent( 338 prompt, 339 self.agent_stream_handler 340 ) 341 logger.info(f"[{self.agent_name}] Agent processing complete, result: {result}") 342 self.messages_sent_to_agent += len(items_to_process) 343 344 # Show updated statistics 345 elapsed = time.time() - self.start_time 346 rate = self.blips_received / (elapsed / 60) if elapsed > 0 else 0 347 logger.info(f"[{self.agent_name}] Stats: {self.blips_received} received | {self.messages_sent_to_agent} sent | {self.blips_published} published ({rate:.1f}/min)") 348 349 except Exception as e: 350 error_name = type(e).__name__ 351 logger.error(f"[{self.agent_name}] {error_name}: {str(e)[:100]}") 352 logger.error(f"[{self.agent_name}] Agent communication error: {e}") 353 # Continue processing - don't let one error stop the bridge 354 finally: 355 # Always clear the processing flag when done 356 async with self.queue_lock: 357 self.processing_queue = False 358 logger.debug(f"[{self.agent_name}] Queue processing complete, flag cleared") 359 360 def agent_stream_handler(self, chunk) -> None: 361 """Handle streaming chunks from the agent.""" 362 logger.debug(f"Processing chunk: {type(chunk)}") 363 364 if hasattr(chunk, 'message_type'): 365 logger.debug(f"Chunk: {chunk.message_type}") 366 367 if chunk.message_type == 'reasoning_message': 368 logger.debug(f"Reasoning: {chunk.reasoning[:100]}...") 369 370 elif chunk.message_type == 'tool_call_message': 371 tool_name = chunk.tool_call.name 372 logger.debug(f"Tool call: {tool_name}") 373 374 if tool_name == 'send_message': 375 try: 376 import json 377 args = json.loads(chunk.tool_call.arguments) 378 message_type = args.get('message_type', 'unknown') 379 message = args.get('message', '') 380 381 if message_type == 'assistant_message': 382 logger.info(f"[{self.agent_name}] Response: {message[:80]}{'...' if len(message) > 80 else ''}") 383 logger.debug(f"[{self.agent_name}] Publishing: {message[:50]}...") 384 else: 385 logger.debug(f"[{self.agent_name}] Ignoring {tool_name} type: {message_type}") 386 except Exception as e: 387 logger.error(f"[{self.agent_name}] Error parsing {tool_name}: {e}") 388 else: 389 logger.debug(f"[{self.agent_name}] Tool: {tool_name}") 390 391 elif chunk.message_type == 'tool_return_message': 392 status_icon = "" if chunk.status == 'success' else "" 393 logger.debug(f"[{self.agent_name}] {status_icon} {chunk.name} -> {chunk.status}") 394 395 elif chunk.message_type == 'assistant_message': 396 logger.info(f"[{self.agent_name}] Direct message: {chunk.content[:80]}{'...' if len(chunk.content) > 80 else ''}") 397 logger.debug(f"[{self.agent_name}] Full content: {chunk.content[:100]}...") 398 else: 399 logger.debug("Chunk has no message_type") 400 401 def agent_batch_callback(self, messages: List[str]) -> None: 402 """Handle published messages from the agent.""" 403 logger.debug(f"[{self.agent_name}] Publishing {len(messages)} messages") 404 logger.info(f"[{self.agent_name}] Publishing {len(messages)} blips...") 405 406 # Publish the messages 407 results = self.letta_integration.publish_messages_as_blips(messages) 408 409 # Update statistics 410 self.blips_published += len(messages) 411 successful = sum(1 for r in results if r is not None) 412 413 if successful == len(messages): 414 logger.info(f"[{self.agent_name}] Published all {len(messages)} blips") 415 else: 416 logger.warning(f"[{self.agent_name}] Published {successful}/{len(messages)} blips") 417 418 async def listen_jetstream(self) -> None: 419 """Listen for messages on the jetstream websocket.""" 420 if not self.websocket: 421 raise RuntimeError("Not connected to jetstream websocket") 422 423 try: 424 async for message in self.websocket: 425 await self.handle_jetstream_message(message) 426 427 except websockets.exceptions.ConnectionClosed: 428 logger.warning("Jetstream websocket connection closed") 429 except Exception as e: 430 logger.error(f"Error in jetstream listen loop: {e}") 431 432 async def run_with_reconnect(self) -> None: 433 """Run the bridge with automatic reconnection.""" 434 self.running = True 435 436 437 # Set up agent batch callback 438 self.letta_integration.set_batch_callback(self.agent_batch_callback) 439 440 # Authenticate with Bluesky 441 logger.info(f"[{self.agent_name}] Authenticating with Bluesky...") 442 if not self.letta_integration.authenticate_bluesky(): 443 logger.error(f"[{self.agent_name}] Failed to authenticate with Bluesky") 444 return 445 logger.info(f"[{self.agent_name}] Authenticated with Bluesky") 446 447 # Store agent DID for filtering 448 self.agent_did = self.letta_integration.blip_publisher.user_did 449 if self.agent_did: 450 logger.debug(f"[{self.agent_name}] Agent DID: {self.agent_did}") 451 else: 452 logger.warning(f"[{self.agent_name}] Could not retrieve agent DID - own message filtering disabled") 453 454 # Check and register thought stream tool 455 logger.debug(f"[{self.agent_name}] Checking thought stream tool...") 456 self.letta_integration.check_and_register_thought_stream_tool(auto_attach=True) 457 458 # Get agent info 459 agent_info = self.letta_integration.get_agent_info() 460 if agent_info: 461 logger.info(f"[{self.agent_name}] Connected to agent {agent_info['id']}") 462 logger.info(f"[{self.agent_name}] Model: {agent_info['model']}") 463 logger.info(f"[{self.agent_name}] Tools: {', '.join(agent_info['tools'][:5])}{'...' if len(agent_info['tools']) > 5 else ''}") 464 465 while self.running: 466 try: 467 # Connect to jetstream 468 if not await self.connect_jetstream(): 469 await self._handle_reconnect_delay() 470 continue 471 472 # Reset reconnect count on successful connection 473 self.reconnect_count = 0 474 475 # Listen for jetstream messages 476 await self.listen_jetstream() 477 478 except KeyboardInterrupt: 479 logger.info(f"[{self.agent_name}] Shutting down...") 480 break 481 except Exception as e: 482 logger.error(f"Unexpected error: {e}") 483 finally: 484 await self.disconnect_jetstream() 485 486 # Handle reconnection if still running 487 if self.running: 488 await self._handle_reconnect_delay() 489 490 # Clean up when exiting the loop 491 await self.stop() 492 493 async def _handle_reconnect_delay(self) -> None: 494 """Handle reconnection delay with exponential backoff.""" 495 self.reconnect_count += 1 496 497 if self.max_reconnect_attempts > 0 and self.reconnect_count > self.max_reconnect_attempts: 498 logger.error(f"[{self.agent_name}] Max reconnect attempts ({self.max_reconnect_attempts}) exceeded") 499 self.running = False 500 return 501 502 # Exponential backoff 503 delay = min(self.reconnect_delay * (2 ** (self.reconnect_count - 1)), 300) # Cap at 5 minutes 504 505 logger.info(f"[{self.agent_name}] Reconnecting in {delay}s (attempt {self.reconnect_count})") 506 await asyncio.sleep(delay) 507 508 async def stop(self) -> None: 509 """Stop the bridge.""" 510 logger.debug(f"[{self.agent_name}] Stopping bridge...") 511 self.running = False 512 513 # Cancel any pending flush task 514 if self.flush_task and not self.flush_task.done(): 515 logger.debug(f"[{self.agent_name}] Cancelling flush task") 516 self.flush_task.cancel() 517 try: 518 await self.flush_task 519 except asyncio.CancelledError: 520 pass 521 522 # Process any remaining messages in queue 523 if self.message_queue: 524 logger.debug(f"[{self.agent_name}] Processing {len(self.message_queue)} remaining messages") 525 await self.process_message_queue() 526 527 await self.disconnect_jetstream() 528 await self.did_cache.close() 529 530 # Final statistics 531 elapsed = time.time() - self.start_time 532 logger.info(f"[{self.agent_name}] Final Stats: received={self.blips_received}, sent={self.messages_sent_to_agent}, published={self.blips_published}, runtime={elapsed/60:.1f}m") 533 if elapsed > 0: 534 logger.info(f"[{self.agent_name}] Average rate: {self.blips_received / (elapsed / 60):.1f}/min") 535 536 537def list_available_agents(directory: str) -> None: 538 """List all agent configurations in a directory.""" 539 try: 540 agent_dir = Path(directory) 541 if not agent_dir.exists(): 542 console.print(f" Directory not found: {directory}") 543 return 544 545 configs = list(agent_dir.glob("*.yaml")) + list(agent_dir.glob("*.yml")) 546 547 if not configs: 548 console.print(f" No agent configurations found in: {directory}") 549 console.print(" Looking for *.yaml or *.yml files") 550 return 551 552 console.print(f" Available agent configurations in {directory}:") 553 console.print() 554 555 for config_path in sorted(configs): 556 try: 557 # Import here to avoid circular imports 558 from config_loader import load_config 559 config = load_config(str(config_path)) 560 561 agent_name = config.get('agent', {}).get('name', 'Unknown Agent') 562 agent_id = config.get('agent', {}).get('agent_id', 'No ID') 563 batch_size = config.get('agent', {}).get('batch_size', 1) 564 wanted_dids = config.get('jetstream', {}).get('wanted_dids', []) 565 566 console.print(f" {config_path.stem}") 567 console.print(f" Name: {agent_name}") 568 console.print(f" ID: {agent_id}") 569 console.print(f" Batch size: {batch_size}") 570 if wanted_dids: 571 console.print(f" Monitoring: {len(wanted_dids)} DIDs") 572 else: 573 console.print(f" Monitoring: All DIDs") 574 console.print() 575 576 except Exception as e: 577 console.print(f" {config_path.stem} (Error loading: {str(e)})") 578 console.print() 579 580 console.print(f"Usage: python {sys.argv[0]} --agent {directory}/AGENT_NAME.yaml") 581 582 except Exception as e: 583 console.print(f" Error listing agents: {e}") 584 585 586@click.command() 587@click.option('--config', '-c', type=click.Path(exists=True), help='Path to configuration file (deprecated, use --agent)') 588@click.option('--agent', '-a', type=click.Path(exists=True), help='Path to agent configuration file') 589@click.option('--list-agents', type=click.Path(exists=True), help='Directory to list available agent configurations') 590@click.option('--setup', is_flag=True, help='Run interactive setup wizard to create agent configuration') 591@click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging') 592@click.option('--batch-size', type=int, help='Override batch size for message processing') 593@click.option('--wanted-dids', help='Comma-separated list of DIDs to monitor') 594def main(config: Optional[str], agent: Optional[str], list_agents: Optional[str], setup: bool, verbose: bool, batch_size: Optional[int], wanted_dids: Optional[str]): 595 """Run the Jetstream-Letta bridge for bidirectional agent communication.""" 596 597 # Handle --list-agents option 598 if list_agents: 599 list_available_agents(list_agents) 600 return 601 602 # Handle --setup option 603 if setup: 604 try: 605 from setup_wizard import SetupWizard 606 607 console.print(" [bold blue]Starting setup wizard...[/bold blue]") 608 wizard = SetupWizard() 609 asyncio.run(wizard.run()) 610 return 611 612 except ImportError: 613 console.print(" [bold red]Setup wizard not available.[/bold red]") 614 console.print("Make sure setup_wizard.py is in the src/ directory.") 615 sys.exit(1) 616 except Exception as e: 617 console.print(f" [bold red]Setup wizard error:[/bold red] {e}") 618 sys.exit(1) 619 620 # Set up logging level 621 if verbose: 622 logging.getLogger().setLevel(logging.DEBUG) 623 624 try: 625 # Determine config file to use (priority: --agent > --config > env var > default) 626 config_file = None 627 628 if agent: 629 config_file = agent 630 logger.info(f"Using agent configuration: {agent}") 631 elif config: 632 config_file = config 633 logger.warning(f"Using legacy --config option: {config}") 634 logger.info("Consider using --agent instead for better agent management") 635 else: 636 # Check environment variable 637 env_config = os.getenv('LETTA_AGENT_CONFIG') 638 if env_config and Path(env_config).exists(): 639 config_file = env_config 640 logger.info(f"Using config from environment variable: {env_config}") 641 else: 642 logger.error("No configuration specified!") 643 logger.error("Use --agent PATH_TO_AGENT.yaml or --config PATH_TO_CONFIG.yaml") 644 logger.error("Or set LETTA_AGENT_CONFIG environment variable") 645 logger.error("Use --list-agents DIRECTORY to see available agents") 646 sys.exit(1) 647 648 # Load configuration 649 app_config = load_config(config_file) 650 651 # Override configuration with command line options 652 if batch_size: 653 app_config.setdefault('agent', {})['batch_size'] = batch_size 654 if wanted_dids: 655 did_list = [did.strip() for did in wanted_dids.split(',') if did.strip()] 656 app_config.setdefault('jetstream', {})['wanted_dids'] = did_list 657 658 # Create bridge 659 bridge = JetstreamLettaBridge(app_config) 660 661 # Run the bridge 662 logger.info("Starting Jetstream-Letta Bridge") 663 logger.info("Monitoring jetstream for blips → sending to agent → publishing responses") 664 asyncio.run(bridge.run_with_reconnect()) 665 666 except KeyboardInterrupt: 667 logger.info("Interrupted by user") 668 except Exception as e: 669 logger.error(f"Fatal error: {e}") 670 sys.exit(1) 671 672 673if __name__ == '__main__': 674 main()