swarmbot
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 321 lines 12 kB view raw
"""Async IRC bot that dispatches @mentions to Letta agents."""

import asyncio
import json
import logging
import os
import pathlib
import re
import sys
import time
from typing import Dict, List, Optional, Set

import yaml

from letta_client import LettaClient, AgentNotFoundError, RateLimitError, APIError

# Regex for @agentname followed by message text
MENTION_RE = re.compile(r"^@(\w+)\s+(.*)$", re.DOTALL)

# Regex for a server PING, with or without a leading ":prefix"
PING_RE = re.compile(r'^(:[^ ]+\s+)?PING\s+(.*)$')

# IRC protocol max is ~512 bytes including CRLF; stay well under
MAX_MSG_LEN = 400

# ANSI bold red for terminal disconnect alerts
RED = "\033[1;31m"
RST = "\033[0m"


def setup_logging() -> logging.Logger:
    """Configure stdout logging with optional level from BOT_LOG_LEVEL env var.

    The "bot" logger itself is left at DEBUG; the handler level is what
    filters output. Unrecognised BOT_LOG_LEVEL values fall back to INFO.

    Returns:
        The configured "bot" logger.
    """
    log = logging.getLogger("bot")
    log.setLevel(logging.DEBUG)

    handler = logging.StreamHandler(sys.stdout)
    level_name = os.getenv("BOT_LOG_LEVEL", "DEBUG").upper()
    level = getattr(logging, level_name, None)
    # getattr can return non-level attributes of the logging module (functions,
    # classes, ...); only accept real numeric levels.
    if not isinstance(level, int):
        level = logging.INFO
    handler.setLevel(level)

    fmt = logging.Formatter(
        "%(asctime)s [%(name)s] %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handler.setFormatter(fmt)
    log.addHandler(handler)

    # child loggers inherit handler
    for child in ("irc.raw", "dispatch", "letta", "reply"):
        logging.getLogger(f"bot.{child}").setLevel(logging.DEBUG)

    return log


def load_config(path: str = "config.yaml") -> dict:
    """Load the YAML config at *path*, expanding ``${VAR}`` from the environment.

    Substitution is done on the raw text before YAML parsing; references to
    unset variables are left in place verbatim.

    NOTE(review): because substitution is textual, an environment value that
    contains YAML-significant characters can change how the document parses.
    """
    with open(path, "r", encoding="utf-8") as f:
        raw = f.read()

    def replacer(match):
        var = match.group(1)
        return os.getenv(var, f"${{{var}}}")

    raw = re.sub(r"\$\{([^}]+)\}", replacer, raw)
    return yaml.safe_load(raw)


def load_letta_key_from_settings() -> Optional[str]:
    """Load LETTA_API_KEY from Letta Code's settings.json if env var is unset.

    Reads ``~/.letta/settings.json`` and returns the value at
    ``env.LETTA_API_KEY``. Returns None when the file is missing, unreadable,
    not valid JSON, or does not have the expected object shape.
    """
    path = pathlib.Path.home() / ".letta" / "settings.json"
    if not path.exists():
        return None
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (ValueError, OSError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    if not isinstance(data, dict):
        return None
    env = data.get("env", {})
    if not isinstance(env, dict):
        return None
    return env.get("LETTA_API_KEY")


class IRCBot:
    """IRC client that forwards ``@agent message`` channel lines to Letta agents."""

    def __init__(self, config: dict, log: logging.Logger):
        """Read connection, Letta, and bot settings out of *config*.

        Requires ``config["irc"]`` (server, port, nick, channel) and
        ``config["letta"]`` (base_url; api_key and timeout optional).
        ``config["agents"]`` maps mention names to agent ids; ``config["bot"]``
        holds optional tuning values.
        """
        self.config = config
        self.log = log
        self.log_irc = logging.getLogger("bot.irc.raw")
        self.log_dispatch = logging.getLogger("bot.dispatch")
        self.log_letta = logging.getLogger("bot.letta")
        self.log_reply = logging.getLogger("bot.reply")

        irc_cfg = config["irc"]
        self.server = irc_cfg["server"]
        self.port = irc_cfg["port"]
        self.nick = irc_cfg["nick"]
        self.channel = irc_cfg["channel"]

        letta_cfg = config["letta"]
        self.letta = LettaClient(
            base_url=letta_cfg["base_url"],
            api_key=letta_cfg.get("api_key"),
            timeout=letta_cfg.get("timeout", 60),
        )

        self.agents: Dict[str, str] = config.get("agents", {})
        bot_cfg = config.get("bot", {})
        self.max_len = bot_cfg.get("max_message_length", MAX_MSG_LEN)
        self.reconnect_delay = bot_cfg.get("reconnect_delay", 5)
        self.reconnect_backoff = bot_cfg.get("reconnect_backoff", 2)
        self.chunk_delay = bot_cfg.get("chunk_delay", 1.0)

        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        # PRIVMSGs received before this timestamp are ignored (set in connect()).
        self.ready_after: float = 0.0
        # Strong references to in-flight reply tasks; the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected.
        self._tasks: Set[asyncio.Task] = set()

    async def connect(self):
        """Open the TCP connection and send NICK, USER and JOIN.

        Also starts a 3-second window during which incoming channel messages
        are ignored.

        NOTE(review): JOIN is sent right away rather than after the server
        confirms registration — confirm the target server accepts that.
        """
        self.reader, self.writer = await asyncio.open_connection(self.server, self.port)
        self._send_raw(f"NICK {self.nick}")
        self._send_raw(f"USER {self.nick} 0 * :{self.nick}")
        self._send_raw(f"JOIN {self.channel}")
        self.ready_after = time.time() + 3
        self.log.info("Connected to %s:%s as %s", self.server, self.port, self.nick)

    def _send_raw(self, line: str) -> bool:
        """Write one IRC line (CRLF appended) to the socket.

        Returns:
            True if the line was handed to the writer, False if there is no
            open writer or the write raised a connection error.
        """
        if self.writer and not self.writer.is_closing():
            try:
                self.writer.write((line + "\r\n").encode("utf-8"))
                self.log_irc.debug(">> %s", line)
                return True
            except (BrokenPipeError, ConnectionResetError):
                self.log.warning(f"{RED}Tried to write to closed IRC socket{RST}")
                return False
        return False

    async def _privmsg(self, target: str, text: str):
        """Send *text* to *target* as one or more PRIVMSGs.

        Long text is split by _chunk_text(); ``chunk_delay`` seconds are waited
        between consecutive chunks. Stops at the first failed write.
        """
        for i, chunk in enumerate(self._chunk_text(text)):
            # Pause *between* chunks: before every chunk except the first, and
            # never after the last one.
            if i > 0:
                await asyncio.sleep(self.chunk_delay)
            ok = self._send_raw(f"PRIVMSG {target} :{chunk}")
            if not ok:
                self.log.warning("Aborting multi-chunk reply — socket is dead")
                return
            self.log_reply.info("<< %s: %s", target, chunk[:120])

    def _chunk_text(self, text: str) -> List[str]:
        """Collapse whitespace in *text* and split it into IRC-safe chunks.

        Chunks are at most ``max_len`` characters and break at a space where
        one exists inside the limit. Text that already fits (including the
        empty string) is returned as a single-element list.

        Note: the limit is measured in characters, not encoded bytes.
        """
        # A non-positive limit would never consume any text; clamp it.
        limit = max(1, self.max_len)
        # Normalizing also removes CR/LF, so a chunk can never inject a new
        # IRC command.
        remaining = " ".join(text.split())
        if len(remaining) <= limit:
            return [remaining]
        chunks = []
        while len(remaining) > limit:
            idx = remaining.rfind(" ", 0, limit)
            if idx == -1:
                # No space inside the limit: hard-split the word.
                idx = limit
            chunks.append(remaining[:idx])
            remaining = remaining[idx:].lstrip()
        if remaining:
            chunks.append(remaining)
        return chunks

    async def handle_line(self, line: str):
        """Process one decoded line from the server.

        Answers PINGs, and for PRIVMSGs sent to the configured channel that
        match ``@agent text`` calls dispatch(). Messages from the bot's own
        nick, to other targets, or inside the post-connect quiet window are
        dropped. Anything else is logged at debug level.
        """
        self.log_irc.debug("<< %s", line)

        ping_match = PING_RE.match(line)
        if ping_match:
            payload = ping_match.group(2)
            self._send_raw(f"PONG {payload}")
            self.log.debug("PONG → %s", payload)
            return

        if " PRIVMSG " in line:
            prefix, _, rest = line.partition(" PRIVMSG ")
            # Prefix looks like ":nick!user@host"; keep only the nick.
            sender = prefix[1:].split("!", 1)[0] if prefix.startswith(":") else ""
            target, _, msg = rest.partition(" :")
            msg = msg.strip()

            if sender == self.nick:
                return
            if not target.startswith("#"):
                return
            if target != self.channel:
                return
            if time.time() < self.ready_after:
                return

            match = MENTION_RE.match(msg)
            if not match:
                return

            agent_name, payload = match.group(1), match.group(2).strip()
            self.log_dispatch.info(
                "%s → @%s: %s", sender, agent_name, payload[:200])
            await self.dispatch(sender, target, agent_name, payload)
            return

        self.log.debug("Unhandled IRC line: %s", line)

    async def _stream_and_reply(self, sender: str, target: str, agent_name: str, agent_id: str, payload: str):
        """Background task: stream Letta events and send the final reply.

        Every event is logged by ``message_type``; the ``content`` of each
        ``assistant_message`` event is joined with spaces to form the reply.
        Any failure is reported to *target* instead of a reply. If no
        assistant content arrived, "(no response)" is sent.
        """
        assistant_reply = ""
        try:
            async for event in self.letta.stream_message(agent_id, payload):
                msg_type = event.get("message_type", "unknown")

                if msg_type == "reasoning_message":
                    reasoning = event.get("reasoning", "")
                    self.log_letta.info("[reasoning] %s", reasoning[:200])

                elif msg_type == "hidden_reasoning_message":
                    state = event.get("state", "")
                    self.log_letta.info("[hidden_reasoning] state=%s", state)

                elif msg_type == "assistant_message":
                    content = event.get("content", "")
                    self.log_letta.info("[assistant] %s", content[:200])
                    if assistant_reply:
                        assistant_reply += " " + content
                    else:
                        assistant_reply = content

                elif msg_type == "tool_call_message":
                    name = event.get("name", "unknown_tool")
                    args = event.get("arguments", {})
                    self.log_letta.info("[tool_call] %s(%s)", name, json.dumps(args)[:200])

                elif msg_type == "tool_return_message":
                    ret = event.get("return_value", "")
                    self.log_letta.info("[tool_return] %s", str(ret)[:200])

                elif msg_type == "stop_reason":
                    reason = event.get("stop_reason", "")
                    self.log_letta.info("[stop] %s", reason)

                elif msg_type == "usage_statistics":
                    total = event.get("total_tokens", 0)
                    steps = event.get("step_count", 0)
                    self.log_letta.info("[usage] %s tokens, %s steps", total, steps)

                elif msg_type == "ping":
                    self.log_letta.debug("[ping]")

                else:
                    self.log_letta.debug("[%s] %s", msg_type, str(event)[:200])

        except AgentNotFoundError:
            await self._privmsg(target, f"{sender}: Agent '@{agent_name}' no longer exists on Letta.")
            return
        except RateLimitError:
            await self._privmsg(target, f"{sender}: Rate limited by Letta API, wait a moment.")
            return
        except asyncio.TimeoutError:
            await self._privmsg(target, f"{sender}: Letta API timed out, try again later.")
            return
        except APIError as exc:
            await self._privmsg(target, f"{sender}: Letta API error {exc.status}: {exc.body}")
            return
        except Exception as exc:
            # Task boundary: report anything unexpected to the channel rather
            # than letting the background task die silently.
            await self._privmsg(target, f"{sender}: Error talking to Letta: {type(exc).__name__}: {exc}")
            return

        if not assistant_reply:
            assistant_reply = "(no response)"

        await self._privmsg(target, f"{sender}: {assistant_reply}")

    async def dispatch(self, sender: str, target: str, agent_name: str, payload: str):
        """Look up *agent_name* and start a background reply task for it.

        Unknown agent names get an immediate reply listing the configured
        agents. Otherwise an acknowledgement is sent and the Letta call runs
        in its own task.
        """
        agent_id = self.agents.get(agent_name)
        if not agent_id:
            available = ", ".join(sorted(self.agents.keys()))
            await self._privmsg(target, f"{sender}: Unknown agent '@{agent_name}'. Available: {available}")
            return

        await self._privmsg(target, f"{sender}: researching via @{agent_name}...")
        # Spawn background task so IRC read loop stays responsive to PINGs
        task = asyncio.create_task(self._stream_and_reply(sender, target, agent_name, agent_id, payload))
        # Hold a strong reference until the task finishes so it cannot be
        # garbage-collected mid-execution.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def run(self):
        """Connect, read lines forever, and reconnect whenever the link drops.

        Every kind of connection loss (clean EOF, reset, or any other error)
        waits ``delay`` seconds before the next attempt. The delay is
        multiplied by ``reconnect_backoff`` (capped at 300 s) after each loss
        and reset to ``reconnect_delay`` after each successful connect.
        Cancellation is propagated after the socket is closed.
        """
        delay = self.reconnect_delay
        while True:
            try:
                await self.connect()
                delay = self.reconnect_delay
                while True:
                    raw = await self.reader.readline()
                    if not raw:
                        self.log.warning(f"{RED}Server closed connection (empty read){RST}")
                        break
                    line = raw.decode("utf-8", errors="replace").strip()
                    if line:
                        await self.handle_line(line)
            except asyncio.CancelledError:
                raise
            except (BrokenPipeError, ConnectionResetError):
                self.log.error(f"{RED}Server dropped connection (flood or network){RST}")
            except Exception:
                self.log.error(f"{RED}Connection lost{RST}", exc_info=True)
            finally:
                writer, self.writer = self.writer, None
                if writer:
                    writer.close()
                    try:
                        await writer.wait_closed()
                    except Exception:
                        # Best effort: the socket is already being discarded.
                        self.log.debug("Ignoring error while closing socket", exc_info=True)

            # Reached after a clean EOF as well as after a handled error, so
            # a server that keeps closing the link is never hit in a tight loop.
            self.log.info(f"{RED}Reconnecting in %ss...{RST}", delay)
            await asyncio.sleep(delay)
            delay = min(delay * self.reconnect_backoff, 300)


def main():
    """Entry point: resolve the API key, load config, and run the bot.

    If LETTA_API_KEY is not set, it is filled from Letta Code's settings file
    before the config is loaded, so ``${LETTA_API_KEY}`` in the config expands.
    Ctrl-C exits with status 0.
    """
    if not os.getenv("LETTA_API_KEY"):
        key = load_letta_key_from_settings()
        if key:
            os.environ["LETTA_API_KEY"] = key
    config = load_config()
    log = setup_logging()
    bot = IRCBot(config, log)
    try:
        asyncio.run(bot.run())
    except KeyboardInterrupt:
        log.info("Shutting down.")
        sys.exit(0)


if __name__ == "__main__":
    main()