swarmbot
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 95 lines 3.7 kB view raw
"""Async client for the Letta Cloud API."""

import json
import os
import ssl
from typing import AsyncIterator, Optional

import aiohttp
import certifi


class LettaClient:
    """Asynchronous HTTP client for sending messages to Letta agents.

    Every call opens its own ``aiohttp.ClientSession`` and closes it before
    returning, so an instance holds no open connections between calls.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 180):
        """Store connection settings.

        Args:
            base_url: Root URL of the API; trailing slashes are removed.
            api_key: Bearer token. When empty or ``None`` the
                ``LETTA_API_KEY`` environment variable is used, falling back
                to an empty string.
            timeout: Total timeout, in seconds, applied to each request.
        """
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key or os.getenv("LETTA_API_KEY", "")
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # Verify TLS against certifi's CA bundle.
        self._ssl = ssl.create_default_context(cafile=certifi.where())

    def _headers(self, accept: Optional[str] = None) -> dict:
        """Return the request headers, optionally with an ``Accept`` value."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        if accept is not None:
            headers["Accept"] = accept
        return headers

    def _session(self) -> aiohttp.ClientSession:
        """Create a one-shot session using this client's timeout and TLS context."""
        connector = aiohttp.TCPConnector(ssl=self._ssl)
        return aiohttp.ClientSession(timeout=self.timeout, connector=connector)

    @staticmethod
    async def _raise_for_status(resp: aiohttp.ClientResponse, agent_id: str) -> None:
        """Translate an HTTP error status into the matching client exception.

        Raises:
            AgentNotFoundError: On status 404.
            RateLimitError: On status 429.
            APIError: On any other status >= 400; carries the first 200
                characters of the response body.
        """
        status = resp.status
        if status == 404:
            raise AgentNotFoundError(f"Agent {agent_id} not found")
        if status == 429:
            raise RateLimitError("Rate limited by Letta API")
        if status >= 400:
            # The body is only needed (and only read) for the generic error.
            body = await resp.text()
            raise APIError(status, body[:200])

    async def send_message(self, agent_id: str, text: str) -> str:
        """Send a message to a Letta agent and return the assistant reply.

        This is the non-streaming fallback: the whole response is read and
        the content of the first ``assistant_message`` entry is returned.

        Args:
            agent_id: Identifier of the target agent (inserted into the URL).
            text: Message text, sent as the ``input`` field of the payload.

        Returns:
            The first assistant message's content, ``""`` when that content
            is missing or null, or ``"(no response)"`` when the response
            contains no assistant message.

        Raises:
            AgentNotFoundError: On status 404.
            RateLimitError: On status 429.
            APIError: On any other status >= 400.
        """
        url = f"{self.base_url}/agents/{agent_id}/messages"
        payload = {"input": text}

        async with self._session() as session:
            async with session.post(url, headers=self._headers(), json=payload) as resp:
                await self._raise_for_status(resp, agent_id)
                data = await resp.json()

        # ``or`` guards against an explicit JSON null for either field.
        for msg in data.get("messages") or []:
            if msg.get("message_type") == "assistant_message":
                return msg.get("content") or ""
        return "(no response)"

    async def stream_message(self, agent_id: str, text: str) -> AsyncIterator[dict]:
        """Stream SSE events from Letta /messages/stream endpoint.

        Each ``data:`` line of the response is parsed as JSON and yielded.
        Lines that are blank, are not ``data:`` fields, or do not contain
        valid JSON are skipped. Iteration ends when the server sends
        ``[DONE]`` or closes the stream.

        The client's total timeout covers the whole request, including the
        time spent reading the stream.

        Args:
            agent_id: Identifier of the target agent (inserted into the URL).
            text: Message text, sent as the ``input`` field of the payload.

        Yields:
            The decoded JSON value of each event.

        Raises:
            AgentNotFoundError: On status 404.
            RateLimitError: On status 429.
            APIError: On any other status >= 400.
        """
        url = f"{self.base_url}/agents/{agent_id}/messages/stream"
        headers = self._headers(accept="text/event-stream")
        payload = {"input": text}

        async with self._session() as session:
            async with session.post(url, headers=headers, json=payload) as resp:
                await self._raise_for_status(resp, agent_id)

                async for raw_line in resp.content:
                    line = raw_line.decode("utf-8", errors="replace").strip()
                    # The space after "data:" is optional in the SSE format,
                    # so accept both "data: {...}" and "data:{...}".
                    if not line.startswith("data:"):
                        continue
                    data_str = line[5:].lstrip()
                    if data_str == "[DONE]":
                        return
                    # Keep the try body to the parse only, so exceptions
                    # raised at the yield point are never swallowed here.
                    try:
                        event = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue
                    yield event


class AgentNotFoundError(Exception):
    """Raised when the API responds with 404 for the requested agent."""


class RateLimitError(Exception):
    """Raised when the API responds with 429."""


class APIError(Exception):
    """HTTP error response (status >= 400) from the Letta API.

    Attributes are set in ``__init__``: ``status`` is the HTTP status code
    and ``body`` is the (possibly truncated) response text.
    """

    def __init__(self, status: int, body: str):
        self.status = status
        self.body = body
        super().__init__(f"HTTP {status}: {body}")