swarmbot
1"""Async IRC bot that dispatches @mentions to Letta agents."""
2
3import asyncio
4import json
5import logging
6import os
7import pathlib
8import re
9import sys
10import time
11from typing import Dict, Optional
12
13import yaml
14
15from letta_client import LettaClient, AgentNotFoundError, RateLimitError, APIError
16
# Matches a channel message that starts with "@<name>", then whitespace, then
# the message text. Group 1 is the agent name (word characters only); group 2
# is the remaining text. DOTALL lets group 2 span embedded newlines.
MENTION_RE = re.compile(r"^@(\w+)\s+(.*)$", re.DOTALL)

# IRC protocol max is ~512 bytes including CRLF; stay well under.
# Used as the default reply chunk size when the config does not provide
# bot.max_message_length.
MAX_MSG_LEN = 400

# ANSI escape sequences: bold red for terminal disconnect alerts, and the
# reset code that ends the styling.
RED = "\033[1;31m"
RST = "\033[0m"
26
27
def setup_logging() -> logging.Logger:
    """Configure stdout logging for the ``bot`` logger hierarchy.

    The handler threshold is taken from the ``BOT_LOG_LEVEL`` environment
    variable (case-insensitive level name, default ``DEBUG``). A name that
    does not resolve to a numeric ``logging`` level falls back to ``INFO``.

    The function is idempotent: calling it again replaces the handler it
    installed earlier instead of stacking a second one, so log lines are
    never duplicated.

    Returns:
        The configured ``bot`` logger.
    """
    handler_name = "bot.stdout"

    log = logging.getLogger("bot")
    log.setLevel(logging.DEBUG)

    level_name = os.getenv("BOT_LOG_LEVEL", "DEBUG").upper()
    level = getattr(logging, level_name, None)
    # Some upper-case attributes of ``logging`` are not levels at all
    # (e.g. BASIC_FORMAT is a string); passing those to setLevel() raises.
    if not isinstance(level, int):
        level = logging.INFO

    # Drop the handler from any previous call so repeated setup does not
    # emit every record twice.
    for stale in [h for h in log.handlers if h.get_name() == handler_name]:
        log.removeHandler(stale)
        stale.close()

    handler = logging.StreamHandler(sys.stdout)
    handler.set_name(handler_name)
    handler.setLevel(level)
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s [%(name)s] %(levelname)s %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    log.addHandler(handler)

    # Child loggers have no handler of their own; their records propagate
    # up to the "bot" handler installed above.
    for child in ("irc.raw", "dispatch", "letta", "reply"):
        logging.getLogger(f"bot.{child}").setLevel(logging.DEBUG)

    return log
49
50
def load_config(path: str = "config.yaml") -> dict:
    """Load the YAML config at *path*, expanding ``${VAR}`` placeholders.

    Every ``${NAME}`` occurrence in the raw file text is replaced with the
    value of environment variable ``NAME`` before the text is parsed. A
    placeholder whose variable is unset is left in the text unchanged.

    Args:
        path: Location of the YAML file.

    Returns:
        The parsed document, or an empty dict when the file contains no
        YAML document at all.

    Raises:
        OSError: If the file cannot be opened or read.
        yaml.YAMLError: If the substituted text is not valid YAML.
    """
    # Explicit encoding: the platform default is not UTF-8 everywhere.
    with open(path, "r", encoding="utf-8") as f:
        raw = f.read()

    def replacer(match):
        # Fall back to the placeholder itself when the variable is unset.
        return os.getenv(match.group(1), match.group(0))

    # NOTE(review): substitution runs on the raw text, so an environment
    # value containing YAML syntax (": ", "#", newlines) changes how the
    # document parses. Kept as-is because scalar typing (e.g. an integer
    # port supplied via ${VAR}) depends on it.
    raw = re.sub(r"\$\{([^}]+)\}", replacer, raw)

    data = yaml.safe_load(raw)
    # safe_load yields None for an empty document; honour the dict contract.
    return {} if data is None else data
61
62
def load_letta_key_from_settings() -> Optional[str]:
    """Read ``LETTA_API_KEY`` from ``~/.letta/settings.json``.

    The file is expected to be a JSON object with an ``env`` object that
    holds the key. Any deviation (missing or unreadable file, invalid JSON,
    unexpected structure, empty or non-string value) yields ``None`` rather
    than an exception, because this lookup is only a best-effort fallback.

    Returns:
        The API key string, or ``None`` when it cannot be obtained.
    """
    try:
        path = pathlib.Path.home() / ".letta" / "settings.json"
        # Read directly instead of checking exists() first: the file can
        # vanish in between, and a missing file is just another OSError.
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RuntimeError):
        # ValueError covers both malformed JSON and undecodable bytes;
        # RuntimeError is what Path.home() raises when no home dir resolves.
        return None

    env = data.get("env") if isinstance(data, dict) else None
    key = env.get("LETTA_API_KEY") if isinstance(env, dict) else None
    # Only a non-empty string is usable as an environment variable value.
    return key if isinstance(key, str) and key else None
73
74
class IRCBot:
    """Asyncio IRC client that relays ``@agent`` channel mentions to Letta.

    The bot registers on one server, joins one channel, answers PINGs, and
    for each channel message of the form ``@<agent> <text>`` streams the text
    to the Letta agent id mapped to ``<agent>`` in the ``agents`` config
    section, then posts the collected assistant output back to the channel.
    """

    # Seconds after connecting/joining during which channel messages are
    # ignored. NOTE(review): presumably to skip history replayed on join —
    # confirm against the target server.
    JOIN_GRACE_SECS = 3
    # A connection must survive this long before the reconnect backoff is
    # reset; otherwise a server that accepts and immediately drops the
    # connection would be retried at the base delay forever.
    STABLE_AFTER_SECS = 60
    # Upper bound for the reconnect delay, in seconds.
    MAX_RECONNECT_DELAY = 300

    def __init__(self, config: dict, log: logging.Logger):
        """Read connection settings from *config* and build the Letta client.

        Args:
            config: Parsed configuration. ``irc`` (server, port, nick,
                channel) and ``letta`` (base_url, optional api_key/timeout)
                are required; ``agents`` and ``bot`` are optional.
            log: Logger used for general bot messages.

        Raises:
            KeyError: If a required config entry is missing.
        """
        self.config = config
        self.log = log
        self.log_irc = logging.getLogger("bot.irc.raw")
        self.log_dispatch = logging.getLogger("bot.dispatch")
        self.log_letta = logging.getLogger("bot.letta")
        self.log_reply = logging.getLogger("bot.reply")

        irc_cfg = config["irc"]
        self.server = irc_cfg["server"]
        self.port = irc_cfg["port"]
        self.nick = irc_cfg["nick"]
        self.channel = irc_cfg["channel"]

        letta_cfg = config["letta"]
        self.letta = LettaClient(
            base_url=letta_cfg["base_url"],
            api_key=letta_cfg.get("api_key"),
            timeout=letta_cfg.get("timeout", 60),
        )

        # "or {}" also covers a section that is present but empty in the
        # YAML (parsed as None), which .get(key, {}) would pass through.
        self.agents: Dict[str, str] = config.get("agents") or {}
        bot_cfg = config.get("bot") or {}
        self.max_len = bot_cfg.get("max_message_length", MAX_MSG_LEN)
        self.reconnect_delay = bot_cfg.get("reconnect_delay", 5)
        self.reconnect_backoff = bot_cfg.get("reconnect_backoff", 2)
        self.chunk_delay = bot_cfg.get("chunk_delay", 1.0)

        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        # time.monotonic() deadline before which channel messages are ignored.
        self.ready_after: float = 0.0
        # Strong references to in-flight reply tasks; the event loop itself
        # only keeps weak references to tasks.
        self._tasks: set = set()

    async def connect(self):
        """Open the TCP connection and start IRC registration (NICK/USER).

        The channel JOIN is not sent here: it is issued by ``handle_line``
        once the server confirms registration with numeric 001, because a
        JOIN sent before registration completes is rejected by servers that
        delay registration.
        """
        self.reader, self.writer = await asyncio.open_connection(self.server, self.port)
        self._send_raw(f"NICK {self.nick}")
        self._send_raw(f"USER {self.nick} 0 * :{self.nick}")
        self.ready_after = time.monotonic() + self.JOIN_GRACE_SECS
        self.log.info("Connected to %s:%s as %s", self.server, self.port, self.nick)

    def _send_raw(self, line: str) -> bool:
        """Queue one IRC command on the socket.

        Args:
            line: Command without the trailing CRLF.

        Returns:
            True if the line was handed to the writer, False if there is no
            usable connection.
        """
        writer = self.writer
        if writer is None or writer.is_closing():
            return False

        # An embedded CR or LF would end the command early and let the rest
        # of the text be interpreted as a second command.
        safe = line.replace("\r", " ").replace("\n", " ")
        try:
            writer.write((safe + "\r\n").encode("utf-8", errors="replace"))
        except (BrokenPipeError, ConnectionResetError):
            self.log.warning(f"{RED}Tried to write to closed IRC socket{RST}")
            return False
        self.log_irc.debug(">> %s", safe)
        return True

    async def _privmsg(self, target: str, text: str):
        """Send *text* to *target* as one or more PRIVMSG lines.

        Consecutive chunks are separated by ``chunk_delay`` seconds; the
        pause comes before every chunk except the first, so nothing is
        waited for after the final one. Sending stops at the first chunk
        that cannot be written.
        """
        for i, chunk in enumerate(self._chunk_text(text)):
            if i > 0:
                await asyncio.sleep(self.chunk_delay)
            if not self._send_raw(f"PRIVMSG {target} :{chunk}"):
                self.log.warning("Aborting multi-chunk reply — socket is dead")
                return
            self.log_reply.info("<< %s: %s", target, chunk[:120])

    @staticmethod
    def _utf8_len(text: str) -> int:
        """Return the number of bytes *text* occupies on the wire."""
        return len(text.encode("utf-8", errors="replace"))

    @staticmethod
    def _utf8_prefix(text: str, limit: int) -> str:
        """Return the longest prefix of *text* that fits in *limit* bytes.

        Always returns at least one character for non-empty input so callers
        that consume the prefix are guaranteed to make progress.
        """
        encoded = text.encode("utf-8", errors="replace")[:limit]
        # errors="ignore" drops a multi-byte character cut in half by the slice.
        head = encoded.decode("utf-8", errors="ignore")
        return head or text[:1]

    def _chunk_text(self, text: str):
        """Split *text* into whitespace-normalized, IRC-sized pieces.

        Runs of whitespace (including newlines) collapse to single spaces.
        Pieces are broken at word boundaries where possible, and each piece
        is at most ``max_len`` UTF-8 bytes, since the protocol limit is in
        bytes rather than characters. A single word longer than the limit is
        hard-split on character boundaries.

        Returns:
            List of chunk strings; empty when *text* has no visible content.
        """
        limit = max(1, int(self.max_len))  # a non-positive limit would never advance
        chunks = []
        current = ""
        current_len = 0

        for word in text.split():
            word_len = self._utf8_len(word)
            joined_len = current_len + 1 + word_len if current else word_len
            if joined_len <= limit:
                current = f"{current} {word}" if current else word
                current_len = joined_len
                continue

            if current:
                chunks.append(current)

            # The word alone may still be too long for one line.
            while self._utf8_len(word) > limit:
                head = self._utf8_prefix(word, limit)
                chunks.append(head)
                word = word[len(head):]
            current = word
            current_len = self._utf8_len(word)

        if current:
            chunks.append(current)
        return chunks

    async def handle_line(self, line: str):
        """Process one line received from the server.

        Handles, in order: PING (answered with PONG), numeric 001 (joins the
        configured channel), and PRIVMSG addressed to the configured channel
        whose text matches ``MENTION_RE`` (dispatched to an agent). Anything
        else is logged at debug level.
        """
        self.log_irc.debug("<< %s", line)

        ping_match = re.match(r"^(:[^ ]+\s+)?PING\s+(.*)$", line)
        if ping_match:
            payload = ping_match.group(2)
            self._send_raw(f"PONG {payload}")
            self.log.debug("PONG → %s", payload)
            return

        # ":<prefix> <command> <first-param> <rest>". Parsing the command
        # token (rather than searching the whole line for " PRIVMSG ")
        # prevents text inside other commands from being taken as a message.
        parts = line.split(" ", 3)
        command = parts[1] if line.startswith(":") and len(parts) > 1 else ""

        if command == "001":
            # Registration is confirmed; only now is JOIN guaranteed to be
            # accepted.
            self._send_raw(f"JOIN {self.channel}")
            self.ready_after = time.monotonic() + self.JOIN_GRACE_SECS
            self.log.info("Registered; joining %s", self.channel)
            return

        if command == "PRIVMSG" and len(parts) == 4:
            sender = parts[0][1:].split("!", 1)[0]
            target = parts[2]
            msg = parts[3]
            if msg.startswith(":"):
                msg = msg[1:]
            msg = msg.strip()

            if sender.lower() == self.nick.lower():
                return
            # Private messages carry the bot's nick as target, so this one
            # comparison also filters them out. Case is folded because
            # servers may echo the channel in a different case than
            # configured.
            if target.lower() != self.channel.lower():
                return
            if time.monotonic() < self.ready_after:
                return

            match = MENTION_RE.match(msg)
            if not match:
                return

            agent_name, payload = match.group(1), match.group(2).strip()
            self.log_dispatch.info(
                "%s → @%s: %s", sender, agent_name, payload[:200])
            await self.dispatch(sender, target, agent_name, payload)
            return

        self.log.debug("Unhandled IRC line: %s", line)

    def _consume_event(self, event: dict) -> Optional[str]:
        """Log one Letta stream event; return assistant text if it has any.

        Only ``assistant_message`` events contribute to the reply; every
        other event type is logged and yields ``None``.
        """
        msg_type = event.get("message_type", "unknown")

        if msg_type == "reasoning_message":
            self.log_letta.info("[reasoning] %s", str(event.get("reasoning") or "")[:200])

        elif msg_type == "hidden_reasoning_message":
            self.log_letta.info("[hidden_reasoning] state=%s", event.get("state", ""))

        elif msg_type == "assistant_message":
            content = event.get("content") or ""
            # NOTE(review): content is assumed to be a string; anything else
            # is stringified rather than breaking concatenation — confirm
            # against the Letta streaming schema.
            if not isinstance(content, str):
                content = str(content)
            self.log_letta.info("[assistant] %s", content[:200])
            return content or None

        elif msg_type == "tool_call_message":
            name = event.get("name", "unknown_tool")
            args = event.get("arguments", {})
            self.log_letta.info("[tool_call] %s(%s)", name, json.dumps(args)[:200])

        elif msg_type == "tool_return_message":
            self.log_letta.info("[tool_return] %s", str(event.get("return_value", ""))[:200])

        elif msg_type == "stop_reason":
            self.log_letta.info("[stop] %s", event.get("stop_reason", ""))

        elif msg_type == "usage_statistics":
            self.log_letta.info(
                "[usage] %s tokens, %s steps",
                event.get("total_tokens", 0),
                event.get("step_count", 0),
            )

        elif msg_type == "ping":
            self.log_letta.debug("[ping]")

        else:
            self.log_letta.debug("[%s] %s", msg_type, str(event)[:200])

        return None

    async def _stream_and_reply(self, sender: str, target: str, agent_name: str, agent_id: str, payload: str):
        """Background task: stream Letta events and send the final reply.

        All assistant message fragments are joined with spaces and sent to
        *target* prefixed with the sender's nick. Failures are logged and
        reported to the channel instead of being raised, since nothing
        awaits this task's result.
        """
        fragments = []
        try:
            async for event in self.letta.stream_message(agent_id, payload):
                text = self._consume_event(event)
                if text:
                    fragments.append(text)
        except AgentNotFoundError:
            self.log_letta.warning("Agent %s (%s) not found on Letta", agent_name, agent_id)
            await self._privmsg(target, f"{sender}: Agent '@{agent_name}' no longer exists on Letta.")
            return
        except RateLimitError:
            self.log_letta.warning("Rate limited while talking to %s", agent_name)
            await self._privmsg(target, f"{sender}: Rate limited by Letta API, wait a moment.")
            return
        except asyncio.TimeoutError:
            self.log_letta.warning("Timed out while talking to %s", agent_name)
            await self._privmsg(target, f"{sender}: Letta API timed out, try again later.")
            return
        except APIError as exc:
            self.log_letta.error("Letta API error %s: %s", exc.status, exc.body)
            await self._privmsg(target, f"{sender}: Letta API error {exc.status}: {exc.body}")
            return
        except Exception as exc:
            # Last-resort boundary of a fire-and-forget task: record the
            # traceback, then tell the user.
            self.log_letta.exception("Unexpected error while talking to %s", agent_name)
            await self._privmsg(target, f"{sender}: Error talking to Letta: {type(exc).__name__}: {exc}")
            return

        assistant_reply = " ".join(fragments) or "(no response)"
        await self._privmsg(target, f"{sender}: {assistant_reply}")

    async def dispatch(self, sender: str, target: str, agent_name: str, payload: str):
        """Acknowledge a mention and start the Letta request in the background.

        An unknown agent name is answered immediately with the list of
        configured agents. Otherwise a task is spawned so the IRC read loop
        stays responsive to PINGs while Letta is working.
        """
        agent_id = self.agents.get(agent_name)
        if not agent_id:
            available = ", ".join(sorted(self.agents))
            await self._privmsg(target, f"{sender}: Unknown agent '@{agent_name}'. Available: {available}")
            return

        await self._privmsg(target, f"{sender}: researching via @{agent_name}...")
        task = asyncio.create_task(
            self._stream_and_reply(sender, target, agent_name, agent_id, payload))
        # Hold a reference until the task finishes; an unreferenced task may
        # be garbage-collected before it completes.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _read_until_eof(self):
        """Feed received lines to ``handle_line`` until the server closes."""
        while True:
            raw = await self.reader.readline()
            if not raw:
                return
            line = raw.decode("utf-8", errors="replace").strip()
            if line:
                await self.handle_line(line)

    async def _close_writer(self):
        """Close the current connection, if any, ignoring close errors."""
        writer, self.writer = self.writer, None
        self.reader = None
        if writer is None:
            return
        writer.close()
        try:
            await writer.wait_closed()
        except Exception:
            # Best effort: the socket is being discarded either way.
            self.log.debug("Ignoring error while closing IRC socket", exc_info=True)

    async def run(self):
        """Connect and process lines forever, reconnecting with backoff.

        Every disconnect, including a clean close by the server, is followed
        by a delay that grows by ``reconnect_backoff`` up to
        ``MAX_RECONNECT_DELAY``. The delay returns to ``reconnect_delay``
        only after a connection has stayed up for ``STABLE_AFTER_SECS``.

        Raises:
            asyncio.CancelledError: Propagated when the task is cancelled.
        """
        delay = self.reconnect_delay
        while True:
            started = time.monotonic()
            try:
                await self.connect()
                await self._read_until_eof()
                self.log.warning(f"{RED}Server closed connection (empty read){RST}")
            except asyncio.CancelledError:
                raise
            except (BrokenPipeError, ConnectionResetError):
                self.log.error(f"{RED}Server dropped connection (flood or network){RST}")
            except Exception:
                self.log.error(f"{RED}Connection lost{RST}", exc_info=True)
            finally:
                await self._close_writer()

            if time.monotonic() - started >= self.STABLE_AFTER_SECS:
                delay = self.reconnect_delay
            self.log.info(f"{RED}Reconnecting in %ss...{RST}", delay)
            await asyncio.sleep(delay)
            delay = min(delay * self.reconnect_backoff, self.MAX_RECONNECT_DELAY)
303
304
def main():
    """Script entry point: resolve the API key, load config, run the bot.

    When ``LETTA_API_KEY`` is unset or empty, the key stored in Letta Code's
    settings file (if any) is exported into the environment before the
    config is loaded. Ctrl-C ends the process with exit status 0.
    """
    env_key = os.environ.get("LETTA_API_KEY")
    if not env_key:
        settings_key = load_letta_key_from_settings()
        if settings_key:
            os.environ["LETTA_API_KEY"] = settings_key

    config = load_config()
    log = setup_logging()
    bot = IRCBot(config, log)

    try:
        asyncio.run(bot.run())
    except KeyboardInterrupt:
        log.info("Shutting down.")
        raise SystemExit(0)
318
319
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()