the universal sandbox runtime for agents and humans. pocketenv.io
sandbox openclaw agent claude-code vercel-sandbox deno-sandbox cloudflare-sandbox atproto sprites daytona
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 224 lines 7.0 kB view raw
import { consola } from "consola";
import type { Context } from "context";
import * as context from "context";
import express, { Router } from "express";
import { env } from "lib/env";
import jwt from "jsonwebtoken";
import { eq, or } from "drizzle-orm";
import schema from "schema";
import * as vercel from "./vercel";
import * as modal from "./modal";
import * as e2b from "./e2b";
import * as runloop from "./runloop";
import * as hopx from "./hopx";
import { WebSocketServer, type WebSocket } from "ws";
import type { IncomingMessage } from "http";

const router = Router();

// Expose the shared application context on every request.
router.use((req, res, next) => {
  req.ctx = context.ctx;
  next();
});
router.use(express.json());

// Best-effort identification: copies the `x-sandbox-id` header onto the
// request and, when a verifiable bearer token is present, its `did` claim.
// A missing or invalid token is logged (if invalid) and the request continues
// without `req.did`; this middleware never rejects a request.
router.use((req, res, next) => {
  req.sandboxId = req.headers["x-sandbox-id"] as string | undefined;
  const authHeader = req.headers.authorization;
  const bearer = authHeader?.split("Bearer ")[1]?.trim();
  if (bearer && bearer !== "null") {
    try {
      // NOTE(review): `ignoreExpiration: true` means expired tokens are still
      // accepted. Kept as-is because callers may rely on it — confirm intent.
      const credentials = jwt.verify(bearer, env.JWT_SECRET, {
        ignoreExpiration: true,
      }) as { did: string };

      req.did = credentials.did;
    } catch (err) {
      consola.error("Invalid JWT token:", err);
    }
  }

  next();
});

/**
 * Returns the terminal session stored under `key`, creating one if needed.
 *
 * A cached session is reused unless its socket exposes a `readyState` other
 * than 1 (OPEN), in which case it is evicted and recreated. For a new session
 * the sandbox is looked up by either its row id or its `sandboxId`, and the
 * first provider with a joined auth row (modal, e2b, runloop, hopx — in that
 * order) creates the session; with none present, the vercel provider is used.
 *
 * @param ctx - Application context holding the session cache and database.
 * @param id - Sandbox identifier (matched against `id` or `sandboxId`).
 * @param key - Cache key for the session; defaults to `id`.
 * @returns The existing or newly created terminal session.
 */
async function getSession(ctx: Context, id: string, key = id) {
  const existing = ctx.sessions.get(key);
  if (existing !== undefined) {
    // If the underlying pty-tunnel socket is closed, evict and recreate.
    const sock = existing.socket as { readyState?: number };
    if (sock.readyState !== undefined && sock.readyState !== 1 /* OPEN */) {
      consola.info("PTY session stale, recreating", { id, key });
      ctx.sessions.delete(key);
    } else {
      return existing;
    }
  }

  const [record] = await ctx.db
    .select({
      modalAuth: schema.modalAuth.id,
      e2bAuth: schema.e2bAuth.id,
      runloopAuth: schema.runloopAuth.id,
      hopxAuth: schema.hopxAuth.id,
    })
    .from(schema.sandboxes)
    .leftJoin(
      schema.modalAuth,
      eq(schema.modalAuth.sandboxId, schema.sandboxes.id),
    )
    .leftJoin(schema.e2bAuth, eq(schema.e2bAuth.sandboxId, schema.sandboxes.id))
    .leftJoin(
      schema.runloopAuth,
      eq(schema.runloopAuth.sandboxId, schema.sandboxes.id),
    )
    .leftJoin(
      schema.hopxAuth,
      eq(schema.hopxAuth.sandboxId, schema.sandboxes.id),
    )
    .where(or(eq(schema.sandboxes.id, id), eq(schema.sandboxes.sandboxId, id)))
    .execute();

  if (record?.modalAuth) return modal.createTerminalSession(ctx, id, key);
  if (record?.e2bAuth) return e2b.createTerminalSession(ctx, id, key);
  if (record?.runloopAuth) return runloop.createTerminalSession(ctx, id, key);
  if (record?.hopxAuth) return hopx.createTerminalSession(ctx, id, key);
  return vercel.createTerminalSession(ctx, id, key);
}

// Server-sent events stream: registers the response as a session client and
// keeps the connection alive with a comment ping every 15 seconds.
router.get("/:id/stream", async (req, res, next) => {
  try {
    const { id } = req.params;
    const session = await getSession(req.ctx, id);

    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache, no-transform");
    res.setHeader("Connection", "keep-alive");
    res.flushHeaders?.();

    session.clients.add(res);

    const keepAlive = setInterval(() => {
      res.write(`: ping\n\n`);
    }, 15000);

    req.on("close", () => {
      clearInterval(keepAlive);
      session.clients.delete(res);
    });
  } catch (err) {
    // Forward session-setup failures to the Express error handler instead of
    // leaving a rejected promise unhandled and the request hanging.
    next(err);
  }
});

// Raw terminal input: the request body (any content type) is forwarded to the
// session socket as a "message". Non-string bodies are sent as "".
router.post(
  "/:id/input",
  express.text({ type: "*/*" }),
  async (req, res, next) => {
    try {
      const { id } = req.params;
      const session = await getSession(req.ctx, id);

      const input = typeof req.body === "string" ? req.body : "";
      session.socket.sendMessage({
        type: "message",
        message: input,
      });

      res.status(204).end();
    } catch (err) {
      next(err);
    }
  },
);

// Terminal resize: expects a JSON body with integer `cols` and `rows`;
// responds 400 when either is not an integer.
router.post("/:id/resize", async (req, res, next) => {
  try {
    const { id } = req.params;
    const session = await getSession(req.ctx, id);

    const cols = Number(req.body?.cols);
    const rows = Number(req.body?.rows);

    if (!Number.isInteger(cols) || !Number.isInteger(rows)) {
      res.status(400).json({ error: "Invalid cols/rows" });
      return;
    }

    session.socket.sendMessage({ type: "resize", cols, rows });
    res.status(204).end();
  } catch (err) {
    next(err);
  }
});

export default router;

/**
 * Builds a `noServer` WebSocket server for terminal sessions together with the
 * path pattern `<base>/<id>/ws` that identifies its upgrade requests.
 *
 * The "connection" listener takes the sandbox id as its third argument —
 * presumably supplied by the HTTP upgrade handler that owns the returned
 * `wss`; confirm against the caller.
 *
 * @param base - Path prefix for the WebSocket endpoint.
 *   NOTE(review): `base` is interpolated into the regular expression without
 *   escaping, so regex metacharacters in it are interpreted as such.
 * @returns The WebSocket server and the regular expression capturing the id.
 */
export function attachWebSocket(base: string) {
  const pathRegex = new RegExp(`^${base}/([^/]+)/ws$`);
  const wss = new WebSocketServer({ noServer: true });

  wss.on(
    "connection",
    async (ws: WebSocket, req: IncomingMessage, id: string) => {
      const url = new URL(req.url ?? "", "http://localhost");

      // Auth: query param ?token=<jwt> or Authorization: Bearer <jwt> header.
      // A token is only validated when one is supplied; connections without a
      // token are not rejected here.
      const tokenParam = url.searchParams.get("token");
      const authHeader = req.headers.authorization;
      const bearer = tokenParam ?? authHeader?.split("Bearer ")[1]?.trim();
      if (bearer && bearer !== "null") {
        try {
          // NOTE(review): expired tokens are accepted (`ignoreExpiration`).
          jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true });
        } catch (err) {
          consola.error("WS: Invalid JWT token:", err);
          ws.close(1008, "Invalid token");
          return;
        }
      }

      // An optional ?sessionId=<key> overrides the session cache key so that
      // several connections can attach to the same session.
      const shareId = url.searchParams.get("sessionId") ?? undefined;
      const key = shareId ?? id;

      // The WS upgrade completes immediately but session creation is async.
      // Buffer any messages (resize, keystrokes) that arrive before the session
      // is ready so they can be replayed once the session exists.
      const pendingMessages: Buffer[] = [];
      const bufferMessage = (data: Buffer) => pendingMessages.push(data);
      ws.on("message", bufferMessage);

      let session: Awaited<ReturnType<typeof getSession>>;
      try {
        session = await getSession(context.ctx, id, key);
      } catch (err) {
        consola.error("WS: Failed to get session:", err);
        ws.close(1011, "Session error");
        return;
      }

      ws.off("message", bufferMessage);

      // The client may have disconnected while the session was being set up.
      // Its "close" event has then already fired (or is about to), so
      // registering it now would leave a dead socket in `wsClients`.
      if (ws.readyState !== ws.OPEN) {
        return;
      }

      session.wsClients.add(ws);
      ws.on("close", () => {
        session.wsClients.delete(ws);
      });

      // A JSON frame of the form {type: "resize", cols, rows} with integer
      // dimensions is a resize request; anything else is raw terminal input.
      const handleMessage = (data: Buffer) => {
        const text = data.toString("utf-8");
        try {
          const msg = JSON.parse(text);
          if (
            msg?.type === "resize" &&
            Number.isInteger(msg.cols) &&
            Number.isInteger(msg.rows)
          ) {
            session.socket.sendMessage({
              type: "resize",
              cols: msg.cols,
              rows: msg.rows,
            });
            return;
          }
        } catch {
          // not JSON — treat as raw input
        }
        session.socket.sendMessage({ type: "message", message: text });
      };

      // Replay messages buffered during session setup (e.g. the initial resize).
      for (const data of pendingMessages) {
        handleMessage(data);
      }

      ws.on("message", (data) => handleMessage(data as Buffer));

      // The shell's initial prompt was output while wsClients was empty and is
      // now lost. Send a newline to trigger a fresh prompt redraw.
      session.socket.sendMessage({ type: "message", message: "\n" });
    },
  );

  return { wss, pathRegex };
}