this repo has no description
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

Improve multi-agent logging clarity and reduce noise

- Add agent name prefix to all log messages for multi-agent identification
- Suppress noisy HTTP logs from httpx/httpcore/letta client libraries
- Improve agent connection display with cleaner formatting
- Add better debug logging for message flow tracking
- Consolidate verbose logs to debug level for cleaner output
- Make agent status more prominent with bold formatting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+135 -143
+87 -101
src/jetstream_letta_bridge.py
··· 37 37 level=logging.INFO, 38 38 format="%(message)s", 39 39 datefmt="[%X]", 40 - handlers=[RichHandler(console=console, rich_tracebacks=True)] 40 + handlers=[RichHandler(console=console, rich_tracebacks=True, show_path=False)] 41 41 ) 42 42 logger = logging.getLogger(__name__) 43 43 44 + # Suppress noisy HTTP logs from letta client 45 + logging.getLogger("httpx").setLevel(logging.WARNING) 46 + logging.getLogger("httpcore").setLevel(logging.WARNING) 47 + logging.getLogger("openai._base_client").setLevel(logging.WARNING) 48 + 44 49 45 50 class JetstreamLettaBridge: 46 51 """Bridge connecting Jetstream blip monitoring with Letta agent processing.""" ··· 50 55 self.config = config 51 56 self.bridge_config = config.get('bridge', {}) 52 57 self.jetstream_config = config.get('jetstream', {}) 53 - 58 + 59 + # Get agent name for logging 60 + self.agent_name = config.get('agent', {}).get('name', 'unknown') 61 + 54 62 # Initialize components 55 63 self.letta_integration = LettaAgentIntegration(config) 56 64 self.did_cache = DIDCache( ··· 88 96 # Agent DID (set after authentication) 89 97 self.agent_did: Optional[str] = None 90 98 91 - logger.info(f"Initialized Jetstream-Letta bridge for agent {self.letta_integration.agent_id}") 92 - logger.info(f"Batch size: {self.batch_size}") 93 - logger.info(f"Queue flush timeout: {self.queue_flush_timeout}s") 94 - if self.wanted_dids: 95 - logger.info(f"Monitoring DIDs: {list(self.wanted_dids)}") 96 - else: 97 - logger.info("Monitoring all DIDs") 99 + logger.debug(f"Initialized bridge for agent {self.letta_integration.agent_id}") 100 + logger.debug(f"Batch size: {self.batch_size}, flush timeout: {self.queue_flush_timeout}s") 98 101 99 102 100 103 def build_websocket_url(self) -> str: ··· 125 128 async def connect_jetstream(self) -> bool: 126 129 """Connect to jetstream websocket.""" 127 130 url = self.build_websocket_url() 128 - 131 + 129 132 try: 130 - console.print(f"🌊 [bold blue]Connecting to jetstream:[/bold blue] {url}") 133 + logger.debug(f"[{self.agent_name}] Connecting to jetstream: {url}") 131 134 self.websocket = await websockets.connect( 132 135 url, 133 136 ping_interval=30, 134 137 ping_timeout=10, 135 138 close_timeout=10 136 139 ) 137 - console.print("✓ [bold green]Connected to jetstream[/bold green]") 140 + console.print(f"[{self.agent_name}] ✓ Connected to jetstream") 138 141 return True 139 - 142 + 140 143 except Exception as e: 141 - console.print(f"❌ [bold red]Failed to connect to jetstream:[/bold red] {e}") 144 + console.print(f"[{self.agent_name}] ❌ Failed to connect: {e}") 142 145 return False 143 146 144 147 async def disconnect_jetstream(self) -> None: ··· 146 149 if self.websocket: 147 150 await self.websocket.close() 148 151 self.websocket = None 149 - console.print("🌊 [dim]Disconnected from jetstream[/dim]") 152 + logger.debug("Disconnected from jetstream") 150 153 151 154 async def handle_jetstream_message(self, message: str) -> None: 152 155 """Handle incoming jetstream message.""" ··· 238 241 if is_first_message: 239 242 self.queue_first_message_time = time.time() 240 243 241 - console.print(f"📨 [bold cyan]Queued blip from @{blip_message.author_handle}:[/bold cyan]") 242 - console.print(f" [dim]{blip_message.content[:100]}{'...' if len(blip_message.content) > 100 else ''}[/dim]") 243 - console.print(f" [yellow]Queue size: {len(self.message_queue)}[/yellow]") 244 + console.print(f"[{self.agent_name}] 📨 Queued: @{blip_message.author_handle} (queue: {len(self.message_queue)})") 245 + logger.debug(f"[{self.agent_name}] Content: {blip_message.content[:100]}...") 244 246 245 247 # Check if we should process immediately or wait for batch 246 248 should_process_immediately = (self.batch_size == 1 or len(self.message_queue) >= self.batch_size) ··· 274 276 # Check if we still have messages to process 275 277 async with self.queue_lock: 276 278 if self.message_queue: 277 - console.print(f"⏰ [yellow]Flushing queue after {self.queue_flush_timeout}s timeout ({len(self.message_queue)} messages)[/yellow]") 279 + console.print(f"[{self.agent_name}] ⏰ Timeout flush: {len(self.message_queue)} messages") 278 280 279 281 await self.process_message_queue() 280 282 ··· 304 306 # Single message - use the XML format directly 305 307 item = items_to_process[0] 306 308 prompt = item['prompt'] # This is already in XML format from format_display() 307 - console.print(f"\n🤖 [bold yellow]Sending message to agent:[/bold yellow]") 308 - console.print(f" [dim]{prompt[:200]}{'...' if len(prompt) > 200 else ''}[/dim]") 309 + console.print(f"[{self.agent_name}] 🤖 → Agent") 310 + logger.debug(f"[{self.agent_name}] Prompt: {prompt[:200]}...") 309 311 else: 310 312 # Multiple messages - combine XML blocks 311 313 xml_blocks = [] 312 314 for item in items_to_process: 313 315 xml_blocks.append(item['prompt']) # Each prompt is already XML formatted 314 - 316 + 315 317 prompt = f"You received {len(items_to_process)} messages:\n\n" + "\n\n".join(xml_blocks) 316 - console.print(f"\n🤖 [bold yellow]Sending batch of {len(items_to_process)} messages to agent[/bold yellow]") 317 - 318 + console.print(f"[{self.agent_name}] 🤖 → Agent ({len(items_to_process)} messages)") 319 + 318 320 try: 319 321 # Send to agent with streaming handler (run in executor to avoid blocking) 320 322 loop = asyncio.get_event_loop() 321 323 await loop.run_in_executor( 322 - None, 324 + None, 323 325 self.letta_integration.send_message_to_agent, 324 - prompt, 326 + prompt, 325 327 self.agent_stream_handler 326 328 ) 329 + console.print(f"[{self.agent_name}] ✓ Agent processing complete") 327 330 self.messages_sent_to_agent += len(items_to_process) 328 - 331 + 329 332 # Show updated statistics 330 333 elapsed = time.time() - self.start_time 331 334 rate = self.blips_received / (elapsed / 60) if elapsed > 0 else 0 332 - console.print(f"📊 [bold cyan]Stats:[/bold cyan] {self.blips_received} blips received, {self.messages_sent_to_agent} sent to agent, {self.blips_published} published ({rate:.1f}/min)") 333 - 335 + console.print(f"[{self.agent_name}] 📊 {self.blips_received} received | {self.messages_sent_to_agent} sent | {self.blips_published} published ({rate:.1f}/min)") 336 + 334 337 except Exception as e: 335 - console.print(f"❌ [bold red]Error sending to agent:[/bold red] {e}") 336 - logger.error(f"Agent communication error: {e}") 338 + console.print(f"[{self.agent_name}] ❌ Agent error: {e}") 339 + logger.error(f"[{self.agent_name}] Agent communication error: {e}") 337 340 338 341 def agent_stream_handler(self, chunk) -> None: 339 342 """Handle streaming chunks from the agent.""" 340 - logger.debug(f"🔍 Processing chunk: {type(chunk)}") 343 + logger.debug(f"Processing chunk: {type(chunk)}") 341 344 342 345 if hasattr(chunk, 'message_type'): 343 - logger.debug(f"📨 Chunk message_type: {chunk.message_type}") 346 + logger.debug(f"Chunk: {chunk.message_type}") 344 347 345 348 if chunk.message_type == 'reasoning_message': 346 - console.print(f"\n🤔 [bold yellow]Agent Reasoning:[/bold yellow]") 347 - for line in chunk.reasoning.split('\n'): 348 - console.print(f" [dim]{line}[/dim]") 349 + logger.debug(f"Reasoning: {chunk.reasoning[:100]}...") 349 350 350 351 elif chunk.message_type == 'tool_call_message': 351 352 tool_name = chunk.tool_call.name 352 - logger.info(f"🔧 Tool call detected: {tool_name}") 353 - 353 + logger.debug(f"Tool call: {tool_name}") 354 + 354 355 if tool_name == 'send_message': 355 - logger.info(f"🎯 Found send_message tool call!") 356 356 try: 357 357 import json 358 358 args = json.loads(chunk.tool_call.arguments) 359 - logger.info(f"🔍 send_message args: {args}") 360 - 361 359 message_type = args.get('message_type', 'unknown') 362 360 message = args.get('message', '') 363 - 364 - logger.info(f"📋 message_type: {message_type}, message: {message[:100]}...") 365 - 361 + 366 362 if message_type == 'assistant_message': 367 - console.print(f"\n📢 [bold green]Agent Response (will become blip):[/bold green]") 368 - console.print(f" [cyan]{message}[/cyan]") 369 - logger.info(f"✅ Detected assistant_message for publishing: {message[:50]}...") 363 + console.print(f"[{self.agent_name}] 📢 Response: {message[:80]}{'...' if len(message) > 80 else ''}") 364 + logger.debug(f"[{self.agent_name}] Publishing: {message[:50]}...") 370 365 else: 371 - console.print(f"\n🔧 [bold blue]Tool Call:[/bold blue] {tool_name} ({message_type})") 372 - logger.debug(f"❌ Ignoring send_message with message_type: {message_type}") 366 + logger.debug(f"[{self.agent_name}] Ignoring {tool_name} type: {message_type}") 373 367 except Exception as e: 374 - logger.error(f"❌ Error parsing send_message arguments: {e}") 375 - console.print(f"\n🔧 [bold blue]Tool Call:[/bold blue] {tool_name}") 368 + logger.error(f"[{self.agent_name}] Error parsing {tool_name}: {e}") 376 369 else: 377 - console.print(f"\n🔧 [bold blue]Tool Call:[/bold blue] {tool_name}") 378 - logger.debug(f"🚫 Ignoring non-send_message tool: {tool_name}") 379 - 370 + logger.debug(f"[{self.agent_name}] Tool: {tool_name}") 371 + 380 372 elif chunk.message_type == 'tool_return_message': 381 - status_color = "green" if chunk.status == 'success' else "red" 382 373 status_icon = "✓" if chunk.status == 'success' else "✗" 383 - console.print(f"\n{status_icon} [bold {status_color}]Tool Result:[/bold {status_color}] {chunk.name} -> {chunk.status}") 384 - logger.debug(f"🔄 Tool result: {chunk.name} -> {chunk.status}") 385 - 374 + logger.debug(f"[{self.agent_name}] {status_icon} {chunk.name} -> {chunk.status}") 375 + 386 376 elif chunk.message_type == 'assistant_message': 387 - console.print(f"\n💬 [bold cyan]Assistant Message:[/bold cyan]") 388 - for line in chunk.content.split('\n'): 389 - console.print(f" {line}") 390 - logger.debug(f"💬 Direct assistant message: {chunk.content[:100]}...") 377 + console.print(f"[{self.agent_name}] 💬 {chunk.content[:80]}{'...' if len(chunk.content) > 80 else ''}") 378 + logger.debug(f"[{self.agent_name}] Direct message: {chunk.content[:100]}...") 391 379 else: 392 - logger.debug(f"❓ Chunk has no message_type attribute") 380 + logger.debug("Chunk has no message_type") 393 381 394 382 def agent_batch_callback(self, messages: List[str]) -> None: 395 383 """Handle published messages from the agent.""" 396 - logger.info(f"agent_batch_callback called with {len(messages)} messages") 397 - console.print(f"\n🚀 [bold magenta]Publishing {len(messages)} response(s) as blips...[/bold magenta]") 398 - 399 - for i, message in enumerate(messages, 1): 400 - console.print(f" {i}. [green]{message[:80]}{'...' if len(message) > 80 else ''}[/green]") 401 - 384 + logger.debug(f"[{self.agent_name}] Publishing {len(messages)} messages") 385 + console.print(f"[{self.agent_name}] 🚀 Publishing {len(messages)} blips...") 386 + 402 387 # Publish the messages 403 388 results = self.letta_integration.publish_messages_as_blips(messages) 404 - 389 + 405 390 # Update statistics 406 391 self.blips_published += len(messages) 407 392 successful = sum(1 for r in results if r is not None) 408 - 393 + 409 394 if successful == len(messages): 410 - console.print(f"✓ [bold green]All {len(messages)} response blips published![/bold green]") 395 + console.print(f"[{self.agent_name}] ✓ Published all {len(messages)} blips") 411 396 else: 412 - console.print(f"⚠ [bold yellow]Published {successful}/{len(messages)} response blips[/bold yellow]") 397 + console.print(f"[{self.agent_name}] ⚠ Published {successful}/{len(messages)} blips") 413 398 414 399 async def listen_jetstream(self) -> None: 415 400 """Listen for messages on the jetstream websocket.""" ··· 434 419 self.letta_integration.set_batch_callback(self.agent_batch_callback) 435 420 436 421 # Authenticate with Bluesky 437 - console.print("🔐 [bold blue]Authenticating with Bluesky...[/bold blue]") 422 + logger.debug(f"[{self.agent_name}] Authenticating with Bluesky...") 438 423 if not self.letta_integration.authenticate_bluesky(): 439 - console.print("❌ [bold red]Failed to authenticate with Bluesky[/bold red]") 424 + console.print(f"[{self.agent_name}] ❌ Failed to authenticate with Bluesky") 440 425 return 441 - console.print("✓ [bold green]Authenticated with Bluesky[/bold green]") 426 + logger.debug(f"[{self.agent_name}] Authenticated with Bluesky") 442 427 443 428 # Store agent DID for filtering 444 429 self.agent_did = self.letta_integration.blip_publisher.user_did 445 430 if self.agent_did: 446 - logger.info(f"Agent DID: {self.agent_did}") 431 + logger.debug(f"Agent DID: {self.agent_did}") 447 432 else: 448 433 logger.warning("Could not retrieve agent DID - own message filtering disabled") 449 434 450 435 # Get agent info 451 436 agent_info = self.letta_integration.get_agent_info() 452 437 if agent_info: 453 - console.print(f"🤖 [bold blue]Connected to agent:[/bold blue] {agent_info['name']} ({agent_info['id']})") 454 - console.print(f" Model: {agent_info['model']}") 455 - console.print(f" Tools: {', '.join(agent_info['tools'][:5])}{'...' if len(agent_info['tools']) > 5 else ''}") 438 + console.print(f"\n[bold blue]{self.agent_name}[/bold blue] 🤖 Connected") 439 + console.print(f" ID: {agent_info['id']}") 440 + console.print(f" Model: {agent_info['model']}") 441 + console.print(f" Tools: {', '.join(agent_info['tools'][:5])}{'...' if len(agent_info['tools']) > 5 else ''}") 456 442 457 443 while self.running: 458 444 try: ··· 468 454 await self.listen_jetstream() 469 455 470 456 except KeyboardInterrupt: 471 - console.print("\n🛑 [bold yellow]Received interrupt signal, shutting down...[/bold yellow]") 457 + console.print("\n🛑 Shutting down...") 472 458 break 473 459 except Exception as e: 474 460 logger.error(f"Unexpected error: {e}") ··· 487 473 self.reconnect_count += 1 488 474 489 475 if self.max_reconnect_attempts > 0 and self.reconnect_count > self.max_reconnect_attempts: 490 - console.print(f"❌ [bold red]Max reconnection attempts ({self.max_reconnect_attempts}) exceeded[/bold red]") 476 + console.print(f"❌ Max reconnect attempts ({self.max_reconnect_attempts}) exceeded") 491 477 self.running = False 492 478 return 493 479 494 480 # Exponential backoff 495 481 delay = min(self.reconnect_delay * (2 ** (self.reconnect_count - 1)), 300) # Cap at 5 minutes 496 482 497 - console.print(f"⏰ [yellow]Reconnecting in {delay}s (attempt {self.reconnect_count})[/yellow]") 483 + console.print(f"⏰ Reconnecting in {delay}s (attempt {self.reconnect_count})") 498 484 await asyncio.sleep(delay) 499 485 500 486 async def stop(self) -> None: ··· 505 491 506 492 # Final statistics 507 493 elapsed = time.time() - self.start_time 508 - console.print(f"\n📊 [bold cyan]Final Stats:[/bold cyan]") 509 - console.print(f" Blips received: {self.blips_received}") 510 - console.print(f" Messages sent to agent: {self.messages_sent_to_agent}") 511 - console.print(f" Blips published: {self.blips_published}") 512 - console.print(f" Runtime: {elapsed/60:.1f} minutes") 513 - console.print(f" Average rate: {self.blips_received / (elapsed / 60):.1f} blips/minute") 494 + console.print(f"\n📊 Final Stats:") 495 + console.print(f" Received: {self.blips_received}") 496 + console.print(f" Sent: {self.messages_sent_to_agent}") 497 + console.print(f" Published: {self.blips_published}") 498 + console.print(f" Runtime: {elapsed/60:.1f}m") 499 + console.print(f" Rate: {self.blips_received / (elapsed / 60):.1f}/min") 514 500 515 501 516 502 def list_available_agents(directory: str) -> None: ··· 518 504 try: 519 505 agent_dir = Path(directory) 520 506 if not agent_dir.exists(): 521 - console.print(f"❌ [bold red]Directory not found:[/bold red] {directory}") 507 + console.print(f"❌ Directory not found: {directory}") 522 508 return 523 509 524 510 configs = list(agent_dir.glob("*.yaml")) + list(agent_dir.glob("*.yml")) 525 511 526 512 if not configs: 527 - console.print(f"📁 [yellow]No agent configurations found in:[/yellow] {directory}") 528 - console.print(" [dim]Looking for *.yaml or *.yml files[/dim]") 513 + console.print(f"📁 No agent configurations found in: {directory}") 514 + console.print(" Looking for *.yaml or *.yml files") 529 515 return 530 516 531 - console.print(f"🤖 [bold blue]Available agent configurations in {directory}:[/bold blue]") 517 + console.print(f"🤖 Available agent configurations in {directory}:") 532 518 console.print() 533 519 534 520 for config_path in sorted(configs): ··· 542 528 batch_size = config.get('agent', {}).get('batch_size', 1) 543 529 wanted_dids = config.get('jetstream', {}).get('wanted_dids', []) 544 530 545 - console.print(f" 📄 [bold cyan]{config_path.stem}[/bold cyan]") 531 + console.print(f" 📄 {config_path.stem}") 546 532 console.print(f" Name: {agent_name}") 547 533 console.print(f" ID: {agent_id}") 548 534 console.print(f" Batch size: {batch_size}") ··· 553 539 console.print() 554 540 555 541 except Exception as e: 556 - console.print(f" ❌ [red]{config_path.stem}[/red] (Error loading: {str(e)})") 542 + console.print(f" ❌ {config_path.stem} (Error loading: {str(e)})") 557 543 console.print() 558 544 559 - console.print(f"Usage: [bold]python {sys.argv[0]} --agent {directory}/AGENT_NAME.yaml[/bold]") 545 + console.print(f"Usage: python {sys.argv[0]} --agent {directory}/AGENT_NAME.yaml") 560 546 561 547 except Exception as e: 562 - console.print(f"❌ [bold red]Error listing agents:[/bold red] {e}") 548 + console.print(f"❌ Error listing agents: {e}") 563 549 564 550 565 551 @click.command() ··· 638 624 bridge = JetstreamLettaBridge(app_config) 639 625 640 626 # Run the bridge 641 - console.print("🌉 [bold magenta]Starting Jetstream-Letta Bridge[/bold magenta]") 642 - console.print(" [dim]Monitoring jetstream for blips → sending to agent → publishing responses[/dim]") 627 + console.print("🌉 Starting Jetstream-Letta Bridge") 628 + console.print(" Monitoring jetstream for blips → sending to agent → publishing responses") 643 629 asyncio.run(bridge.run_with_reconnect()) 644 630 645 631 except KeyboardInterrupt: 646 - console.print("\n🛑 [bold yellow]Interrupted by user[/bold yellow]") 632 + console.print("\n🛑 Interrupted by user") 647 633 except Exception as e: 648 - console.print(f"❌ [bold red]Fatal error:[/bold red] {e}") 634 + console.print(f"❌ Fatal error: {e}") 649 635 logger.error(f"Fatal error: {e}") 650 636 sys.exit(1) 651 637
+43 -36
src/letta_integration.py
··· 32 32 def __init__(self, config: Dict[str, Any]): 33 33 """ 34 34 Initialize the Letta agent integration. 35 - 35 + 36 36 Args: 37 37 config: Configuration dictionary 38 38 """ 39 39 self.config = config 40 40 self.letta_config = config.get('letta', {}) 41 41 self.agent_config = config.get('agent', {}) 42 + self.agent_name = self.agent_config.get('name', 'unknown') 42 43 43 44 # Initialize Letta client 44 45 api_key = self.letta_config.get('api_key') ··· 64 65 self.message_batch: List[str] = [] 65 66 self.batch_callback: Optional[Callable[[List[str]], None]] = None 66 67 67 - logger.info(f"Initialized Letta integration for agent {self.agent_id}") 68 - logger.info(f"Batch size: {self.batch_size}, Max steps: {self.max_steps}") 68 + logger.debug(f"Letta integration initialized: {self.agent_id}") 69 + logger.debug(f"Batch: {self.batch_size}, max steps: {self.max_steps}") 69 70 70 71 def set_batch_callback(self, callback: Callable[[List[str]], None]) -> None: 71 72 """ ··· 129 130 logger.warning("send_message tool call has empty message") 130 131 return None 131 132 132 - logger.info(f"✓ Found send_message(assistant_message): {message[:100]}...") 133 + logger.debug(f"Found assistant message: {message[:50]}...") 133 134 return message 134 135 135 136 except Exception as e: ··· 143 144 Args: 144 145 message: Message content to add 145 146 """ 146 - logger.info(f"🔄 Adding message to batch: {message[:50]}...") 147 + logger.debug(f"Adding to batch: {message[:50]}...") 147 148 self.message_batch.append(message) 148 - logger.info(f"📦 Batch status: {len(self.message_batch)}/{self.batch_size} messages") 149 + logger.debug(f"Batch: {len(self.message_batch)}/{self.batch_size}") 149 150 150 151 # Check if batch is ready 151 152 if len(self.message_batch) >= self.batch_size: 152 - logger.info(f"🚀 Batch full ({len(self.message_batch)}>={self.batch_size}), flushing...") 153 + logger.debug(f"Batch full, flushing {len(self.message_batch)} messages") 153 154 self.flush_batch() 154 155 else: 155 - logger.debug(f"Batch not ready yet ({len(self.message_batch)}<{self.batch_size})") 156 + logger.debug(f"Batch not ready: {len(self.message_batch)}<{self.batch_size}") 156 157 157 158 def flush_batch(self) -> None: 158 159 """Flush the current message batch.""" ··· 163 164 batch_to_send = self.message_batch.copy() 164 165 self.message_batch.clear() 165 166 166 - logger.info(f"💧 Flushing batch with {len(batch_to_send)} messages:") 167 - for i, msg in enumerate(batch_to_send, 1): 168 - logger.info(f" {i}. {msg[:50]}...") 167 + logger.debug(f"Flushing {len(batch_to_send)} messages") 169 168 170 169 # Call the batch callback if set 171 170 if self.batch_callback: 172 - logger.info(f"🔔 Calling batch callback with {len(batch_to_send)} messages") 171 + logger.debug(f"Calling batch callback: {len(batch_to_send)} messages") 173 172 try: 174 173 self.batch_callback(batch_to_send) 175 174 except Exception as e: 176 - logger.error(f"❌ Error in batch callback: {e}") 175 + logger.error(f"Error in batch callback: {e}") 177 176 else: 178 - logger.info(f"📢 No batch callback set, publishing directly") 179 - # Default behavior: publish as blips 177 + logger.debug("No batch callback, publishing directly") 180 178 self.publish_messages_as_blips(batch_to_send) 181 179 182 180 def publish_messages_as_blips(self, messages: List[str]) -> List[Optional[dict]]: ··· 189 187 Returns: 190 188 List of publish results 191 189 """ 192 - logger.info(f"📤 publish_messages_as_blips called with {len(messages)} messages") 190 + logger.debug(f"Publishing {len(messages)} blips") 193 191 194 192 if not self.blip_publisher.client: 195 - logger.error("❌ Bluesky client not authenticated") 193 + logger.error("Bluesky client not authenticated") 196 194 return [None] * len(messages) 197 - 198 - logger.info(f"✅ Bluesky client authenticated, proceeding with publishing") 195 + 196 + logger.debug("Client authenticated, publishing") 199 197 200 198 results = [] 201 199 for i, message in enumerate(messages, 1): 202 - logger.info(f"📝 Publishing blip {i}/{len(messages)}: {message[:50]}...") 200 + logger.debug(f"Publishing {i}/{len(messages)}: {message[:50]}...") 203 201 204 202 try: 205 203 result = self.blip_publisher.publish_blip(message) 206 204 results.append(result) 207 205 208 206 if result: 209 - logger.info(f"✅ Published blip: {result['uri']}") 207 + logger.debug(f"Published: {result['uri']}") 210 208 else: 211 - logger.error(f"❌ Failed to publish blip (no result returned)") 209 + logger.error("Failed to publish blip (no result)") 212 210 213 211 # Small delay between messages to avoid rate limiting 214 212 if i < len(messages): 215 213 import time 216 - logger.debug(f"⏱️ Waiting 0.5s before next publish...") 217 214 time.sleep(0.5) 218 215 219 216 except Exception as e: 220 - logger.error(f"❌ Exception publishing blip: {e}") 217 + logger.error(f"Exception publishing blip: {e}") 221 218 results.append(None) 222 219 223 220 successful = sum(1 for r in results if r is not None) 224 - logger.info(f"📊 Publishing summary: {successful}/{len(messages)} messages published successfully") 221 + logger.debug(f"Published {successful}/{len(messages)} successfully") 225 222 226 223 return results 227 224 228 225 def send_message_to_agent(self, message: str, stream_handler: Optional[Callable] = None) -> Any: 229 226 """ 230 227 Send a message to the Letta agent with streaming support. 231 - 228 + 232 229 Args: 233 230 message: Message to send to the agent 234 231 stream_handler: Optional function to handle streaming chunks 235 - 232 + 236 233 Returns: 237 234 Streaming response object 238 235 """ 239 - logger.info(f"Sending message to agent: {message[:100]}...") 240 - 236 + logger.debug(f"[{self.agent_name}] Sending to agent: {message[:100]}...") 237 + 241 238 try: 242 239 # Create streaming request 240 + logger.debug(f"[{self.agent_name}] Calling create_stream API...") 243 241 message_stream = self.letta_client.agents.messages.create_stream( 244 242 agent_id=self.agent_id, 245 243 messages=[{"role": "user", "content": message}], 246 244 stream_tokens=False, # Step streaming only 247 245 max_steps=self.max_steps 248 246 ) 249 - 247 + logger.debug(f"[{self.agent_name}] Stream created, processing chunks...") 248 + 250 249 # Process streaming response 250 + chunk_count = 0 251 251 for chunk in message_stream: 252 + chunk_count += 1 253 + if chunk_count == 1: 254 + logger.debug(f"[{self.agent_name}] Received first chunk") 255 + 252 256 # Handle streaming chunk 253 257 if stream_handler: 254 258 try: ··· 265 269 # Check for direct assistant messages (not via tool) 266 270 elif hasattr(chunk, 'message_type') and chunk.message_type == 'assistant_message': 267 271 if hasattr(chunk, 'content') and chunk.content: 268 - logger.info(f"📬 Detected direct assistant message: {chunk.content[:100]}...") 272 + logger.debug(f"Direct assistant message: {chunk.content[:100]}...") 269 273 self.add_message_to_batch(chunk.content) 270 274 271 275 # Log chunk information ··· 282 286 logger.debug(f"Assistant: {chunk.content[:100]}...") 283 287 284 288 if str(chunk) == 'done': 289 + logger.debug(f"[{self.agent_name}] Received 'done' signal") 285 290 break 286 - 291 + 292 + logger.debug(f"[{self.agent_name}] Processed {chunk_count} chunks") 293 + 287 294 # Flush any remaining messages in batch 288 295 self.flush_batch() 289 - 290 - logger.info("Agent message processing completed") 296 + 297 + logger.debug(f"[{self.agent_name}] Agent processing completed") 291 298 return message_stream 292 - 299 + 293 300 except Exception as e: 294 - logger.error(f"Error sending message to agent: {e}") 301 + logger.error(f"[{self.agent_name}] Error sending message to agent: {e}", exc_info=True) 295 302 raise 296 303 297 304 def get_agent_info(self) -> Optional[Dict[str, Any]]:
+5 -6
src/publish_blip.py
··· 50 50 password = self.bluesky_config['password'] 51 51 pds_uri = self.bluesky_config.get('pds_uri', 'https://bsky.social') 52 52 53 - logger.info(f"Authenticating as {username} via {pds_uri}") 53 + logger.debug(f"Authenticating {username} via {pds_uri}") 54 54 55 55 self.client = Client(base_url=pds_uri) 56 56 self.client.login(username, password) 57 57 58 - logger.info("Authentication successful") 58 + logger.debug("Authentication successful") 59 59 return True 60 60 61 61 except Exception as e: ··· 106 106 "record": record_data 107 107 }) 108 108 109 - logger.info(f"Published blip: {response.uri}") 110 - logger.debug(f"CID: {response.cid}") 109 + logger.debug(f"Published: {response.uri}") 111 110 112 111 return { 113 112 "uri": response.uri, ··· 133 132 results = [] 134 133 135 134 for i, content in enumerate(messages, 1): 136 - logger.info(f"Publishing message {i}/{len(messages)}") 135 + logger.debug(f"Publishing {i}/{len(messages)}") 137 136 result = self.publish_blip(content) 138 137 results.append(result) 139 138 ··· 143 142 time.sleep(0.5) 144 143 145 144 successful = sum(1 for r in results if r is not None) 146 - logger.info(f"Published {successful}/{len(messages)} messages successfully") 145 + logger.debug(f"Published {successful}/{len(messages)} successfully") 147 146 148 147 return results 149 148