swarmbot
1"""Async client for the Letta Cloud API."""
2
3import json
4import os
5import ssl
6from typing import AsyncIterator, Optional
7
8import aiohttp
9import certifi
10
11
12class LettaClient:
13 def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 180):
14 self.base_url = base_url.rstrip("/")
15 self.api_key = api_key or os.getenv("LETTA_API_KEY", "")
16 self.timeout = aiohttp.ClientTimeout(total=timeout)
17 self._ssl = ssl.create_default_context(cafile=certifi.where())
18
19 async def send_message(self, agent_id: str, text: str) -> str:
20 """Send a message to a Letta agent and return the assistant reply (non-streaming fallback)."""
21 url = f"{self.base_url}/agents/{agent_id}/messages"
22 headers = {
23 "Authorization": f"Bearer {self.api_key}",
24 "Content-Type": "application/json",
25 }
26 payload = {"input": text}
27
28 connector = aiohttp.TCPConnector(ssl=self._ssl)
29 async with aiohttp.ClientSession(timeout=self.timeout, connector=connector) as session:
30 async with session.post(url, headers=headers, json=payload) as resp:
31 body = await resp.text()
32 if resp.status == 404:
33 raise AgentNotFoundError(f"Agent {agent_id} not found")
34 if resp.status == 429:
35 raise RateLimitError("Rate limited by Letta API")
36 if resp.status >= 400:
37 raise APIError(resp.status, body[:200])
38 data = await resp.json()
39
40 messages = data.get("messages", [])
41 for msg in messages:
42 if msg.get("message_type") == "assistant_message":
43 return msg.get("content", "")
44 return "(no response)"
45
46 async def stream_message(self, agent_id: str, text: str) -> AsyncIterator[dict]:
47 """Stream SSE events from Letta /messages/stream endpoint."""
48 url = f"{self.base_url}/agents/{agent_id}/messages/stream"
49 headers = {
50 "Authorization": f"Bearer {self.api_key}",
51 "Content-Type": "application/json",
52 "Accept": "text/event-stream",
53 }
54 payload = {"input": text}
55
56 connector = aiohttp.TCPConnector(ssl=self._ssl)
57 async with aiohttp.ClientSession(timeout=self.timeout, connector=connector) as session:
58 async with session.post(url, headers=headers, json=payload) as resp:
59 if resp.status == 404:
60 raise AgentNotFoundError(f"Agent {agent_id} not found")
61 if resp.status == 429:
62 raise RateLimitError("Rate limited by Letta API")
63 if resp.status >= 400:
64 body = await resp.text()
65 raise APIError(resp.status, body[:200])
66
67 async for line in resp.content:
68 line = line.decode("utf-8", errors="replace").strip()
69 if not line:
70 continue
71 if line.startswith("data: "):
72 data_str = line[6:]
73 if data_str == "[DONE]":
74 return
75 try:
76 yield json.loads(data_str)
77 except json.JSONDecodeError:
78 continue
79
80
81class AgentNotFoundError(Exception):
82 pass
83
84
85class RateLimitError(Exception):
86 pass
87
88
89class APIError(Exception):
90 """Non-2xx HTTP response from Letta API."""
91
92 def __init__(self, status: int, body: str):
93 self.status = status
94 self.body = body
95 super().__init__(f"HTTP {status}: {body}")