this repo has no description
1#!/usr/bin/env python3
2"""Jetstream-Letta bridge for bidirectional multi-agent communication."""
3import asyncio
4import json
5import logging
6import os
7import signal
8import sys
9import time
10from typing import Optional, List, Dict, Any
11from datetime import datetime
12from pathlib import Path
13
14import click
15import websockets
16from rich.console import Console
17from rich.logging import RichHandler
18
19try:
20 from .letta_integration import LettaAgentIntegration
21 from .config_loader import load_config
22 from .models import JetstreamEvent, BlipRecord, BlipMessage
23 from .did_cache import DIDCache
24except ImportError:
25 # Handle running as script directly
26 import sys
27 from pathlib import Path
28 sys.path.insert(0, str(Path(__file__).parent))
29 from letta_integration import LettaAgentIntegration
30 from config_loader import load_config
31 from models import JetstreamEvent, BlipRecord, BlipMessage
32 from did_cache import DIDCache
33
# Set up logging
# Rich supplies its own timestamp/level columns, so the format string is just
# the message body; rich_tracebacks gives pretty-printed exception output.
console = Console()
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(console=console, rich_tracebacks=True, show_path=False)]
)
logger = logging.getLogger(__name__)

# Suppress noisy HTTP logs from letta client
# (letta talks HTTP via httpx/httpcore and the OpenAI client; their INFO logs
# would flood the console on every agent round-trip)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("openai._base_client").setLevel(logging.WARNING)
48
49
class JetstreamLettaBridge:
    """Bridge connecting Jetstream blip monitoring with Letta agent processing."""

    def __init__(self, config: Dict[str, Any]):
        """Initialize the bridge.

        Args:
            config: Loaded agent configuration. Must contain a 'cache' section
                with 'max_cache_size' and 'did_cache_ttl' (KeyError otherwise);
                the 'agent', 'bridge' and 'jetstream' sections are optional and
                default to empty dicts.
        """
        self.config = config
        self.bridge_config = config.get('bridge', {})
        self.jetstream_config = config.get('jetstream', {})

        # Get agent name for logging
        self.agent_name = config.get('agent', {}).get('name', 'unknown')

        # Initialize components
        self.letta_integration = LettaAgentIntegration(config)
        self.did_cache = DIDCache(
            max_size=config['cache']['max_cache_size'],
            ttl=config['cache']['did_cache_ttl']
        )

        # Jetstream connection state. Annotation corrected to the client-side
        # protocol: websockets.connect() (see connect_jetstream) returns a
        # WebSocketClientProtocol, not a server protocol.
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.running = False  # main-loop flag; cleared by stop()
        self.reconnect_count = 0  # consecutive reconnect attempts since last success

        # Message processing / batching state (guarded by queue_lock)
        self.message_queue: List[Dict[str, Any]] = []
        self.queue_lock = asyncio.Lock()
        self.processing_queue = False  # Flag to prevent concurrent processing
        self.batch_size = config.get('agent', {}).get('batch_size', 1)
        self.queue_flush_timeout = config.get('agent', {}).get('queue_flush_timeout', 30)  # seconds
        self.queue_first_message_time: Optional[float] = None
        self.flush_task: Optional[asyncio.Task] = None

        # Configuration
        self.wanted_dids = set(self.jetstream_config.get('wanted_dids', []))
        self.prompt_template = self.bridge_config.get('prompt_template',
            "[@{author}] {content}")
        self.include_metadata = self.bridge_config.get('include_metadata', True)
        self.max_reconnect_attempts = self.jetstream_config.get('max_reconnect_attempts', 10)
        self.reconnect_delay = self.jetstream_config.get('reconnect_delay', 5)

        # Statistics
        self.blips_received = 0
        self.messages_sent_to_agent = 0
        self.blips_published = 0
        self.start_time = time.time()

        # Agent DID (set after authentication in run_with_reconnect; used to
        # filter out the agent's own blips and avoid a feedback loop)
        self.agent_did: Optional[str] = None

        logger.debug(f"Initialized bridge for agent {self.letta_integration.agent_id}")
        logger.debug(f"Batch size: {self.batch_size}, flush timeout: {self.queue_flush_timeout}s")
103
104 def build_websocket_url(self) -> str:
105 """Build the websocket URL with query parameters."""
106 base_url = self.jetstream_config['instance']
107 if not base_url.endswith('/subscribe'):
108 base_url = base_url.rstrip('/') + '/subscribe'
109
110 params = []
111
112 # Filter for stream.thought.blip collection
113 params.append("wantedCollections=stream.thought.blip")
114
115 # Add wanted DIDs if specified
116 if self.wanted_dids:
117 for did in self.wanted_dids:
118 params.append(f"wantedDids={did}")
119
120 # Add compression support
121 params.append("compress=false")
122
123 url = base_url
124 if params:
125 url += "?" + "&".join(params)
126
127 return url
128
129 async def connect_jetstream(self) -> bool:
130 """Connect to jetstream websocket."""
131 url = self.build_websocket_url()
132
133 try:
134 logger.debug(f"[{self.agent_name}] Connecting to jetstream: {url}")
135 self.websocket = await websockets.connect(
136 url,
137 ping_interval=30,
138 ping_timeout=10,
139 close_timeout=10
140 )
141 logger.info(f"[{self.agent_name}] Connected to jetstream")
142 return True
143
144 except Exception as e:
145 logger.error(f"[{self.agent_name}] Failed to connect: {e}")
146 return False
147
148 async def disconnect_jetstream(self) -> None:
149 """Disconnect from jetstream."""
150 if self.websocket:
151 await self.websocket.close()
152 self.websocket = None
153 logger.debug("Disconnected from jetstream")
154
    async def handle_jetstream_message(self, message: str) -> None:
        """Handle incoming jetstream message.

        Parses one raw websocket frame, applies the filter chain (commit kind,
        blip collection, non-delete op, wanted DIDs, parseable record, not our
        own DID), resolves the author's profile, and queues the resulting
        BlipMessage for agent processing. All errors are logged and swallowed
        so the listen loop keeps running.
        """
        try:
            data = json.loads(message)
            event = JetstreamEvent(**data)

            # Only process commit events with blip records
            if event.kind != "commit" or not event.commit:
                return

            commit = event.commit
            if commit.collection != "stream.thought.blip":
                return

            # Skip delete operations
            if commit.operation == "delete":
                return

            # Filter by wanted DIDs if specified (empty set == accept all)
            if self.wanted_dids and event.did not in self.wanted_dids:
                return

            # Parse blip record
            if not commit.record:
                logger.warning(f"No record data in commit from {event.did}")
                return

            try:
                blip_record = BlipRecord(**commit.record)
            except Exception as e:
                logger.warning(f"Failed to parse blip record from {event.did}: {e}")
                return

            # Filter out own messages to prevent feedback loop
            # (agent_did may be None if DID lookup failed; then no filtering)
            if self.agent_did and event.did == self.agent_did:
                logger.debug(f"Ignoring own message from {event.did}")
                return

            # Resolve DID to profile data (handle / display name) via the cache
            profile_data = await self.did_cache.resolve_did(event.did)
            if profile_data:
                handle = profile_data.handle
                display_name = profile_data.display_name
            else:
                handle = event.did  # Fallback to DID
                display_name = None

            # Create blip message for processing
            blip_message = BlipMessage(
                author_handle=handle,
                author_display_name=display_name,
                author_did=event.did,
                created_at=blip_record.created_at,
                content=blip_record.content,
                record_uri=f"at://{event.did}/{commit.collection}/{commit.rkey}",
                record_cid=commit.cid
            )

            # Queue message for agent processing
            await self.queue_blip_for_processing(blip_message)

            # Counted only after surviving every filter above
            self.blips_received += 1

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON message: {e}")
        except Exception as e:
            logger.error(f"Error handling jetstream message: {e}")
222
    async def queue_blip_for_processing(self, blip_message: BlipMessage) -> None:
        """Queue a blip message for agent processing.

        Appends the formatted prompt to the queue under queue_lock, then —
        outside the lock, to avoid deadlocking against process_message_queue —
        either drains the queue immediately (batch_size == 1, or the batch is
        full) or arms the timeout flush when this is the first message of a
        new batch.
        """
        should_process_immediately = False

        async with self.queue_lock:
            # Create prompt from blip
            prompt = self.create_prompt_from_blip(blip_message)

            # Add to queue with metadata
            queue_item = {
                'prompt': prompt,
                'blip_message': blip_message,
                'timestamp': time.time()
            }

            # Track first message time for timeout
            is_first_message = len(self.message_queue) == 0
            self.message_queue.append(queue_item)

            if is_first_message:
                self.queue_first_message_time = time.time()

            logger.info(f"[{self.agent_name}] Queued: @{blip_message.author_handle} (queue: {len(self.message_queue)})")
            logger.debug(f"[{self.agent_name}] Content: {blip_message.content[:100]}...")

            # Check if we should process immediately or wait for batch
            should_process_immediately = (self.batch_size == 1 or len(self.message_queue) >= self.batch_size)
            should_schedule_flush = is_first_message and self.batch_size > 1

        # Process immediately if needed (outside the lock to avoid deadlock)
        if should_process_immediately:
            await self.process_message_queue()
        elif should_schedule_flush:
            await self.schedule_queue_flush()
257
258 def create_prompt_from_blip(self, blip_message: BlipMessage) -> str:
259 """Create a prompt for the agent from a blip message using XML format."""
260 # Use the standardized XML format from BlipMessage.format_display()
261 return blip_message.format_display()
262
263 async def schedule_queue_flush(self) -> None:
264 """Schedule a queue flush after the timeout period."""
265 # Cancel any existing flush task
266 if self.flush_task and not self.flush_task.done():
267 self.flush_task.cancel()
268
269 # Schedule new flush task
270 self.flush_task = asyncio.create_task(self._flush_after_timeout())
271
    async def _flush_after_timeout(self) -> None:
        """Wait for timeout then flush the queue.

        Sleeps queue_flush_timeout seconds, then drains the queue unless it is
        empty or another drain is already running. Cancellation (from
        schedule_queue_flush, process_message_queue, or stop) is expected.
        """
        try:
            await asyncio.sleep(self.queue_flush_timeout)

            # Check if we still have messages to process and not already processing
            async with self.queue_lock:
                if self.message_queue and not self.processing_queue:
                    logger.info(f"[{self.agent_name}] Timeout flush: {len(self.message_queue)} messages")
                    # Process outside the lock
                else:
                    logger.debug(f"[{self.agent_name}] Flush skipped - queue empty or already processing")
                    return

            # process_message_queue re-checks state under its own lock, so the
            # brief window after releasing the lock above is safe.
            await self.process_message_queue()

        except asyncio.CancelledError:
            # Task was cancelled, which is fine
            logger.debug(f"[{self.agent_name}] Flush task cancelled")
            pass
292
    async def process_message_queue(self) -> None:
        """Process all queued messages by sending them to the agent.

        Under queue_lock: bail if another drain is in flight or the queue is
        empty, otherwise set processing_queue, snapshot-and-clear the queue,
        and cancel any pending timeout flush. Outside the lock: build one
        combined prompt and send it to the agent synchronously. The
        processing_queue flag is always cleared in the finally block.
        """
        # Acquire lock and check if we should process
        async with self.queue_lock:
            # Check if already processing or queue is empty
            if self.processing_queue:
                logger.debug(f"[{self.agent_name}] Already processing queue, skipping")
                return

            if not self.message_queue:
                logger.debug(f"[{self.agent_name}] Queue is empty, nothing to process")
                return

            # Set processing flag and get items
            self.processing_queue = True
            items_to_process = self.message_queue.copy()
            self.message_queue.clear()
            # Reset queue timing
            self.queue_first_message_time = None
            # Cancel flush task since we're processing now
            if self.flush_task and not self.flush_task.done():
                self.flush_task.cancel()
                self.flush_task = None

        # Create combined prompt for batch processing
        if len(items_to_process) == 1:
            # Single message - use the XML format directly
            item = items_to_process[0]
            prompt = item['prompt']  # This is already in XML format from format_display()
            logger.info(f"[{self.agent_name}] Sending to agent")
            logger.debug(f"[{self.agent_name}] Prompt: {prompt[:200]}...")
        else:
            # Multiple messages - combine XML blocks
            xml_blocks = []
            for item in items_to_process:
                xml_blocks.append(item['prompt'])  # Each prompt is already XML formatted

            prompt = f"You received {len(items_to_process)} messages:\n\n" + "\n\n".join(xml_blocks)
            logger.info(f"[{self.agent_name}] Sending {len(items_to_process)} messages to agent")

        try:
            # Send to agent - call directly (blocking is fine, we process one at a time)
            logger.debug(f"[{self.agent_name}] Calling send_message_to_agent directly...")

            result = self.letta_integration.send_message_to_agent(
                prompt,
                self.agent_stream_handler
            )
            logger.info(f"[{self.agent_name}] Agent processing complete, result: {result}")
            self.messages_sent_to_agent += len(items_to_process)

            # Show updated statistics
            elapsed = time.time() - self.start_time
            rate = self.blips_received / (elapsed / 60) if elapsed > 0 else 0
            logger.info(f"[{self.agent_name}] Stats: {self.blips_received} received | {self.messages_sent_to_agent} sent | {self.blips_published} published ({rate:.1f}/min)")

        except Exception as e:
            error_name = type(e).__name__
            logger.error(f"[{self.agent_name}] {error_name}: {str(e)[:100]}")
            logger.error(f"[{self.agent_name}] Agent communication error: {e}")
            # Continue processing - don't let one error stop the bridge
            # NOTE(review): drained items are dropped on failure (no requeue) —
            # confirm this loss is intended.
        finally:
            # Always clear the processing flag when done
            async with self.queue_lock:
                self.processing_queue = False
            logger.debug(f"[{self.agent_name}] Queue processing complete, flag cleared")
359
360 def agent_stream_handler(self, chunk) -> None:
361 """Handle streaming chunks from the agent."""
362 logger.debug(f"Processing chunk: {type(chunk)}")
363
364 if hasattr(chunk, 'message_type'):
365 logger.debug(f"Chunk: {chunk.message_type}")
366
367 if chunk.message_type == 'reasoning_message':
368 logger.debug(f"Reasoning: {chunk.reasoning[:100]}...")
369
370 elif chunk.message_type == 'tool_call_message':
371 tool_name = chunk.tool_call.name
372 logger.debug(f"Tool call: {tool_name}")
373
374 if tool_name == 'send_message':
375 try:
376 import json
377 args = json.loads(chunk.tool_call.arguments)
378 message_type = args.get('message_type', 'unknown')
379 message = args.get('message', '')
380
381 if message_type == 'assistant_message':
382 logger.info(f"[{self.agent_name}] Response: {message[:80]}{'...' if len(message) > 80 else ''}")
383 logger.debug(f"[{self.agent_name}] Publishing: {message[:50]}...")
384 else:
385 logger.debug(f"[{self.agent_name}] Ignoring {tool_name} type: {message_type}")
386 except Exception as e:
387 logger.error(f"[{self.agent_name}] Error parsing {tool_name}: {e}")
388 else:
389 logger.debug(f"[{self.agent_name}] Tool: {tool_name}")
390
391 elif chunk.message_type == 'tool_return_message':
392 status_icon = "" if chunk.status == 'success' else "✗"
393 logger.debug(f"[{self.agent_name}] {status_icon} {chunk.name} -> {chunk.status}")
394
395 elif chunk.message_type == 'assistant_message':
396 logger.info(f"[{self.agent_name}] Direct message: {chunk.content[:80]}{'...' if len(chunk.content) > 80 else ''}")
397 logger.debug(f"[{self.agent_name}] Full content: {chunk.content[:100]}...")
398 else:
399 logger.debug("Chunk has no message_type")
400
401 def agent_batch_callback(self, messages: List[str]) -> None:
402 """Handle published messages from the agent."""
403 logger.debug(f"[{self.agent_name}] Publishing {len(messages)} messages")
404 logger.info(f"[{self.agent_name}] Publishing {len(messages)} blips...")
405
406 # Publish the messages
407 results = self.letta_integration.publish_messages_as_blips(messages)
408
409 # Update statistics
410 self.blips_published += len(messages)
411 successful = sum(1 for r in results if r is not None)
412
413 if successful == len(messages):
414 logger.info(f"[{self.agent_name}] Published all {len(messages)} blips")
415 else:
416 logger.warning(f"[{self.agent_name}] Published {successful}/{len(messages)} blips")
417
418 async def listen_jetstream(self) -> None:
419 """Listen for messages on the jetstream websocket."""
420 if not self.websocket:
421 raise RuntimeError("Not connected to jetstream websocket")
422
423 try:
424 async for message in self.websocket:
425 await self.handle_jetstream_message(message)
426
427 except websockets.exceptions.ConnectionClosed:
428 logger.warning("Jetstream websocket connection closed")
429 except Exception as e:
430 logger.error(f"Error in jetstream listen loop: {e}")
431
    async def run_with_reconnect(self) -> None:
        """Run the bridge with automatic reconnection.

        One-time setup first: register the agent batch callback, authenticate
        with Bluesky (abort on failure), record our own DID for feedback-loop
        filtering, register the thought-stream tool, and log agent info. Then
        loop: connect to jetstream, listen until the socket drops, back off
        via _handle_reconnect_delay, retry. Exits when `running` is cleared
        (stop() or attempt limit) and performs final cleanup via stop().
        """
        self.running = True

        # Set up agent batch callback
        self.letta_integration.set_batch_callback(self.agent_batch_callback)

        # Authenticate with Bluesky
        logger.info(f"[{self.agent_name}] Authenticating with Bluesky...")
        if not self.letta_integration.authenticate_bluesky():
            logger.error(f"[{self.agent_name}] Failed to authenticate with Bluesky")
            return
        logger.info(f"[{self.agent_name}] Authenticated with Bluesky")

        # Store agent DID for filtering (drops the agent's own blips later)
        self.agent_did = self.letta_integration.blip_publisher.user_did
        if self.agent_did:
            logger.debug(f"[{self.agent_name}] Agent DID: {self.agent_did}")
        else:
            logger.warning(f"[{self.agent_name}] Could not retrieve agent DID - own message filtering disabled")

        # Check and register thought stream tool
        logger.debug(f"[{self.agent_name}] Checking thought stream tool...")
        self.letta_integration.check_and_register_thought_stream_tool(auto_attach=True)

        # Get agent info
        agent_info = self.letta_integration.get_agent_info()
        if agent_info:
            logger.info(f"[{self.agent_name}] Connected to agent {agent_info['id']}")
            logger.info(f"[{self.agent_name}] Model: {agent_info['model']}")
            logger.info(f"[{self.agent_name}] Tools: {', '.join(agent_info['tools'][:5])}{'...' if len(agent_info['tools']) > 5 else ''}")

        while self.running:
            try:
                # Connect to jetstream
                if not await self.connect_jetstream():
                    await self._handle_reconnect_delay()
                    continue

                # Reset reconnect count on successful connection
                self.reconnect_count = 0

                # Listen for jetstream messages (returns when the socket closes)
                await self.listen_jetstream()

            except KeyboardInterrupt:
                logger.info(f"[{self.agent_name}] Shutting down...")
                break
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
            finally:
                await self.disconnect_jetstream()

            # Handle reconnection if still running
            if self.running:
                await self._handle_reconnect_delay()

        # Clean up when exiting the loop
        await self.stop()
492
493 async def _handle_reconnect_delay(self) -> None:
494 """Handle reconnection delay with exponential backoff."""
495 self.reconnect_count += 1
496
497 if self.max_reconnect_attempts > 0 and self.reconnect_count > self.max_reconnect_attempts:
498 logger.error(f"[{self.agent_name}] Max reconnect attempts ({self.max_reconnect_attempts}) exceeded")
499 self.running = False
500 return
501
502 # Exponential backoff
503 delay = min(self.reconnect_delay * (2 ** (self.reconnect_count - 1)), 300) # Cap at 5 minutes
504
505 logger.info(f"[{self.agent_name}] Reconnecting in {delay}s (attempt {self.reconnect_count})")
506 await asyncio.sleep(delay)
507
    async def stop(self) -> None:
        """Stop the bridge.

        Shutdown order matters: clear the running flag, cancel (and await) any
        pending flush task, drain remaining queued messages as a final best
        effort, close the websocket and DID cache, then log final statistics.
        """
        logger.debug(f"[{self.agent_name}] Stopping bridge...")
        self.running = False

        # Cancel any pending flush task (await it so cancellation completes)
        if self.flush_task and not self.flush_task.done():
            logger.debug(f"[{self.agent_name}] Cancelling flush task")
            self.flush_task.cancel()
            try:
                await self.flush_task
            except asyncio.CancelledError:
                pass

        # Process any remaining messages in queue
        if self.message_queue:
            logger.debug(f"[{self.agent_name}] Processing {len(self.message_queue)} remaining messages")
            await self.process_message_queue()

        await self.disconnect_jetstream()
        await self.did_cache.close()

        # Final statistics
        elapsed = time.time() - self.start_time
        logger.info(f"[{self.agent_name}] Final Stats: received={self.blips_received}, sent={self.messages_sent_to_agent}, published={self.blips_published}, runtime={elapsed/60:.1f}m")
        if elapsed > 0:
            logger.info(f"[{self.agent_name}] Average rate: {self.blips_received / (elapsed / 60):.1f}/min")
535
536
def list_available_agents(directory: str) -> None:
    """List all agent configurations in a directory.

    Scans *directory* for *.yaml / *.yml files, loads each one, and prints a
    summary (name, id, batch size, monitored DIDs) to the console. A file
    that fails to load is reported individually and scanning continues.

    Args:
        directory: Path to the directory containing agent config files.
    """
    try:
        agent_dir = Path(directory)
        if not agent_dir.exists():
            console.print(f" Directory not found: {directory}")
            return

        configs = list(agent_dir.glob("*.yaml")) + list(agent_dir.glob("*.yml"))

        if not configs:
            console.print(f" No agent configurations found in: {directory}")
            console.print(" Looking for *.yaml or *.yml files")
            return

        console.print(f" Available agent configurations in {directory}:")
        console.print()

        # Import here (not at module scope) to avoid circular imports.
        # Fix: hoisted out of the loop — it was previously re-executed for
        # every config file.
        from config_loader import load_config

        for config_path in sorted(configs):
            try:
                config = load_config(str(config_path))

                agent_name = config.get('agent', {}).get('name', 'Unknown Agent')
                agent_id = config.get('agent', {}).get('agent_id', 'No ID')
                batch_size = config.get('agent', {}).get('batch_size', 1)
                wanted_dids = config.get('jetstream', {}).get('wanted_dids', [])

                console.print(f" {config_path.stem}")
                console.print(f" Name: {agent_name}")
                console.print(f" ID: {agent_id}")
                console.print(f" Batch size: {batch_size}")
                if wanted_dids:
                    console.print(f" Monitoring: {len(wanted_dids)} DIDs")
                else:
                    console.print(f" Monitoring: All DIDs")
                console.print()

            except Exception as e:
                console.print(f" {config_path.stem} (Error loading: {str(e)})")
                console.print()

        console.print(f"Usage: python {sys.argv[0]} --agent {directory}/AGENT_NAME.yaml")

    except Exception as e:
        console.print(f" Error listing agents: {e}")
584
585
@click.command()
@click.option('--config', '-c', type=click.Path(exists=True), help='Path to configuration file (deprecated, use --agent)')
@click.option('--agent', '-a', type=click.Path(exists=True), help='Path to agent configuration file')
@click.option('--list-agents', type=click.Path(exists=True), help='Directory to list available agent configurations')
@click.option('--setup', is_flag=True, help='Run interactive setup wizard to create agent configuration')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging')
@click.option('--batch-size', type=int, help='Override batch size for message processing')
@click.option('--wanted-dids', help='Comma-separated list of DIDs to monitor')
def main(config: Optional[str], agent: Optional[str], list_agents: Optional[str], setup: bool, verbose: bool, batch_size: Optional[int], wanted_dids: Optional[str]):
    """Run the Jetstream-Letta bridge for bidirectional agent communication."""
    # Alternate modes (--list-agents, --setup) return before the bridge starts.
    # Config resolution priority: --agent > --config (deprecated) >
    # LETTA_AGENT_CONFIG environment variable; no default.

    # Handle --list-agents option
    if list_agents:
        list_available_agents(list_agents)
        return

    # Handle --setup option
    if setup:
        try:
            from setup_wizard import SetupWizard

            console.print(" [bold blue]Starting setup wizard...[/bold blue]")
            wizard = SetupWizard()
            asyncio.run(wizard.run())
            return

        except ImportError:
            console.print(" [bold red]Setup wizard not available.[/bold red]")
            console.print("Make sure setup_wizard.py is in the src/ directory.")
            sys.exit(1)
        except Exception as e:
            console.print(f" [bold red]Setup wizard error:[/bold red] {e}")
            sys.exit(1)

    # Set up logging level
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        # Determine config file to use (priority: --agent > --config > env var > default)
        config_file = None

        if agent:
            config_file = agent
            logger.info(f"Using agent configuration: {agent}")
        elif config:
            config_file = config
            logger.warning(f"Using legacy --config option: {config}")
            logger.info("Consider using --agent instead for better agent management")
        else:
            # Check environment variable
            env_config = os.getenv('LETTA_AGENT_CONFIG')
            if env_config and Path(env_config).exists():
                config_file = env_config
                logger.info(f"Using config from environment variable: {env_config}")
            else:
                logger.error("No configuration specified!")
                logger.error("Use --agent PATH_TO_AGENT.yaml or --config PATH_TO_CONFIG.yaml")
                logger.error("Or set LETTA_AGENT_CONFIG environment variable")
                logger.error("Use --list-agents DIRECTORY to see available agents")
                sys.exit(1)

        # Load configuration
        app_config = load_config(config_file)

        # Override configuration with command line options
        # NOTE(review): 'if batch_size:' treats 0 as "not provided" — presumably
        # acceptable since 0 is not a meaningful batch size; confirm.
        if batch_size:
            app_config.setdefault('agent', {})['batch_size'] = batch_size
        if wanted_dids:
            did_list = [did.strip() for did in wanted_dids.split(',') if did.strip()]
            app_config.setdefault('jetstream', {})['wanted_dids'] = did_list

        # Create bridge
        bridge = JetstreamLettaBridge(app_config)

        # Run the bridge
        logger.info("Starting Jetstream-Letta Bridge")
        logger.info("Monitoring jetstream for blips → sending to agent → publishing responses")
        asyncio.run(bridge.run_with_reconnect())

    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)
671
672
673if __name__ == '__main__':
674 main()