the universal sandbox runtime for agents and humans. pocketenv.io
sandbox openclaw agent claude-code vercel-sandbox deno-sandbox cloudflare-sandbox atproto sprites daytona
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add WebSocket support for SSH/TTY/PTY sessions

+423 -311
+2
apps/api/src/context.ts
··· 13 13 import { workers } from "cloudflare"; 14 14 import { Providers } from "consts"; 15 15 import type { Message } from "pty/pty-tunnel/messages"; 16 + import type { WebSocket } from "ws"; 16 17 import express from "express"; 17 18 18 19 const { DB_PATH } = env; ··· 32 33 export type Session = { 33 34 socket: TerminalSocket; 34 35 clients: Set<express.Response>; 36 + wsClients: Set<WebSocket>; 35 37 }; 36 38 37 39 const sessions = new Map<string, Session>();
+11 -7
apps/api/src/index.ts
··· 7 7 import { createServer } from "lexicon"; 8 8 import chalk from "chalk"; 9 9 import API from "./xrpc"; 10 - import ssh from "./ssh"; 11 - import tty from "./tty"; 12 - import pty from "./pty"; 10 + import ssh, { attachWebSocket as attachSshWebSocket } from "./ssh"; 11 + import tty, { attachWebSocket as attachTtyWebSocket } from "./tty"; 12 + import pty, { attachWebSocket as attachPtyWebSocket } from "./pty"; 13 13 import { createRateLimiter } from "./ratelimiter"; 14 14 15 - let server = createServer({ 15 + let xrpcServer = createServer({ 16 16 validateResponse: false, 17 17 payload: { 18 18 jsonLimit: 100 * 1024, // 100kb ··· 21 21 }, 22 22 }); 23 23 24 - server = API(server, ctx); 24 + xrpcServer = API(xrpcServer, ctx); 25 25 26 26 const app = express(); 27 27 ··· 52 52 }); 53 53 54 54 app.use(bsky); 55 - app.use(server.xrpc.router); 55 + app.use(xrpcServer.xrpc.router); 56 56 app.use("/ssh", ssh); 57 57 app.use("/tty", tty); 58 58 app.use("/pty", pty); 59 59 60 - app.listen(process.env.POCKETENV_XPRC_PORT || 8789, () => { 60 + const server = app.listen(process.env.POCKETENV_XPRC_PORT || 8789, () => { 61 61 consola.log(chalk.greenBright(banner)); 62 62 consola.info( 63 63 `Pocketenv XRPC API is running on port ${process.env.POCKETENV_XPRC_PORT || 8789}`, 64 64 ); 65 65 }); 66 + 67 + attachPtyWebSocket(server, "/pty"); 68 + attachTtyWebSocket(server, "/tty"); 69 + attachSshWebSocket(server, "/ssh");
+4 -1
apps/api/src/pty/e2b/index.ts
··· 45 45 }, 46 46 }; 47 47 48 - const session: Session = { socket, clients: new Set() }; 48 + const session: Session = { socket, clients: new Set(), wsClients: new Set() }; 49 49 50 50 const terminal = await sandbox.pty.create({ 51 51 cols: process.stdout.columns ?? 80, ··· 55 55 for (const res of session.clients) { 56 56 res.write("event: output\n"); 57 57 res.write(`data: ${JSON.stringify({ data: text })}\n\n`); 58 + } 59 + for (const ws of session.wsClients) { 60 + if (ws.readyState === ws.OPEN) ws.send(text); 58 61 } 59 62 }, 60 63 });
+78 -1
apps/api/src/pty/index.ts
··· 9 9 import * as vercel from "./vercel"; 10 10 import * as modal from "./modal"; 11 11 import * as e2b from "./e2b"; 12 + import { WebSocketServer, type WebSocket } from "ws"; 13 + import type { IncomingMessage } from "http"; 14 + import type { Server } from "http"; 15 + import type { Duplex } from "node:stream"; 12 16 13 17 const router = Router(); 14 18 router.use((req, res, next) => { ··· 45 49 e2bAuth: schema.e2bAuth.id, 46 50 }) 47 51 .from(schema.sandboxes) 48 - .leftJoin(schema.modalAuth, eq(schema.modalAuth.sandboxId, schema.sandboxes.id)) 52 + .leftJoin( 53 + schema.modalAuth, 54 + eq(schema.modalAuth.sandboxId, schema.sandboxes.id), 55 + ) 49 56 .leftJoin(schema.e2bAuth, eq(schema.e2bAuth.sandboxId, schema.sandboxes.id)) 50 57 .where(or(eq(schema.sandboxes.id, id), eq(schema.sandboxes.sandboxId, id))) 51 58 .execute(); ··· 106 113 }); 107 114 108 115 export default router; 116 + 117 + export function attachWebSocket(server: Server, base: string) { 118 + const pathRegex = new RegExp(`^${base}/([^/]+)/ws$`); 119 + const wss = new WebSocketServer({ noServer: true }); 120 + 121 + server.on("upgrade", (req: IncomingMessage, socket: Duplex, head: Buffer) => { 122 + const url = new URL(req.url ?? "", "http://localhost"); 123 + const match = url.pathname.match(pathRegex); 124 + if (!match) return; 125 + 126 + wss.handleUpgrade(req, socket, head, (ws) => { 127 + wss.emit("connection", ws, req, match[1]!); 128 + }); 129 + }); 130 + 131 + wss.on("connection", async (ws: WebSocket, req: IncomingMessage, id: string) => { 132 + const url = new URL(req.url ?? "", "http://localhost"); 133 + 134 + // Auth: query param ?token=<jwt> or Authorization: Bearer <jwt> header 135 + const tokenParam = url.searchParams.get("token"); 136 + const authHeader = req.headers.authorization; 137 + const bearer = tokenParam ?? authHeader?.split("Bearer ")[1]?.trim(); 138 + if (bearer && bearer !== "null") { 139 + try { 140 + jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }); 141 + } catch (err) { 142 + consola.error("WS: Invalid JWT token:", err); 143 + ws.close(1008, "Invalid token"); 144 + return; 145 + } 146 + } 147 + 148 + let session: Awaited<ReturnType<typeof getSession>>; 149 + try { 150 + session = await getSession(context.ctx, id); 151 + } catch (err) { 152 + consola.error("WS: Failed to get session:", err); 153 + ws.close(1011, "Session error"); 154 + return; 155 + } 156 + 157 + session.wsClients.add(ws); 158 + 159 + ws.on("message", (data) => { 160 + const text = data.toString("utf-8"); 161 + try { 162 + const msg = JSON.parse(text); 163 + if ( 164 + msg?.type === "resize" && 165 + Number.isInteger(msg.cols) && 166 + Number.isInteger(msg.rows) 167 + ) { 168 + session.socket.sendMessage({ 169 + type: "resize", 170 + cols: msg.cols, 171 + rows: msg.rows, 172 + }); 173 + return; 174 + } 175 + } catch { 176 + // not JSON — treat as raw input 177 + } 178 + session.socket.sendMessage({ type: "message", message: text }); 179 + }); 180 + 181 + ws.on("close", () => { 182 + session.wsClients.delete(ws); 183 + }); 184 + }); 185 + }
+6 -3
apps/api/src/pty/modal/index.ts
··· 169 169 const session: Session = { 170 170 socket, 171 171 clients: new Set(), 172 + wsClients: new Set(), 172 173 }; 173 174 174 175 socket.addEventListener("message", async ({ data }) => { 176 + const text = data.toString("utf-8"); 175 177 for (const res of session.clients) { 176 178 res.write(`event: output\n`); 177 - res.write( 178 - `data: ${JSON.stringify({ data: data.toString("utf-8") })}\n\n`, 179 - ); 179 + res.write(`data: ${JSON.stringify({ data: text })}\n\n`); 180 + } 181 + for (const ws of session.wsClients) { 182 + if (ws.readyState === ws.OPEN) ws.send(text); 180 183 } 181 184 }); 182 185
+6 -3
apps/api/src/pty/vercel/index.ts
··· 165 165 const session: Session = { 166 166 socket, 167 167 clients: new Set(), 168 + wsClients: new Set(), 168 169 }; 169 170 170 171 socket.addEventListener("message", async ({ data }) => { 172 + const text = data.toString("utf-8"); 171 173 for (const res of session.clients) { 172 174 res.write(`event: output\n`); 173 - res.write( 174 - `data: ${JSON.stringify({ data: data.toString("utf-8") })}\n\n`, 175 - ); 175 + res.write(`data: ${JSON.stringify({ data: text })}\n\n`); 176 + } 177 + for (const ws of session.wsClients) { 178 + if (ws.readyState === ws.OPEN) ws.send(text); 176 179 } 177 180 }); 178 181
+83
apps/api/src/ssh/index.ts
··· 5 5 import jwt from "jsonwebtoken"; 6 6 import { env } from "lib/env"; 7 7 import generateJwt from "lib/generateJwt"; 8 + import { WebSocketServer, type WebSocket } from "ws"; 9 + import type { IncomingMessage } from "http"; 10 + import type { Server } from "http"; 11 + import type { Duplex } from "node:stream"; 8 12 9 13 interface SSHSession { 10 14 client: Client; 11 15 stream: NodeJS.ReadWriteStream | null; 12 16 sseRes: import("express").Response | null; 13 17 buffer: string[]; 18 + wsClients: Set<WebSocket>; 14 19 } 15 20 16 21 const sessions = new Map<string, SSHSession>(); ··· 67 72 stream: null, 68 73 sseRes: null, 69 74 buffer: [], 75 + wsClients: new Set(), 70 76 }; 71 77 72 78 sessions.set(sessionId, session); ··· 91 97 } else { 92 98 session.buffer.push(encoded); 93 99 } 100 + for (const ws of session.wsClients) { 101 + if (ws.readyState === ws.OPEN) ws.send(encoded); 102 + } 94 103 }); 95 104 96 105 stream.on("close", () => { ··· 99 108 session.sseRes.write(`event: close\ndata: closed\n\n`); 100 109 session.sseRes.end(); 101 110 } 111 + for (const ws of session.wsClients) { 112 + if (ws.readyState === ws.OPEN) ws.close(1000, "closed"); 113 + } 114 + session.wsClients.clear(); 102 115 client.end(); 103 116 sessions.delete(sessionId); 104 117 }); ··· 109 122 session.sseRes.write(`data: ${encoded}\n\n`); 110 123 } else { 111 124 session.buffer.push(encoded); 125 + } 126 + for (const ws of session.wsClients) { 127 + if (ws.readyState === ws.OPEN) ws.send(encoded); 112 128 } 113 129 }); 114 130 ··· 124 140 ); 125 141 session.sseRes.end(); 126 142 } 143 + for (const ws of session.wsClients) { 144 + if (ws.readyState === ws.OPEN) ws.close(1011, err.message); 145 + } 146 + session.wsClients.clear(); 127 147 sessions.delete(sessionId); 128 148 // Only respond if headers haven't been sent 129 149 if (!res.headersSent) { ··· 251 271 }); 252 272 253 273 export default router; 274 + 275 + export function attachWebSocket(server: Server, base: string) { 276 + const pathRegex = new RegExp(`^${base}/([^/]+)/ws$`); 277 + const wss = new WebSocketServer({ noServer: true }); 278 + 279 + server.on("upgrade", (req: IncomingMessage, socket: Duplex, head: Buffer) => { 280 + const url = new URL(req.url ?? "", "http://localhost"); 281 + const match = url.pathname.match(pathRegex); 282 + if (!match) return; 283 + 284 + wss.handleUpgrade(req, socket, head, (ws) => { 285 + wss.emit("connection", ws, req, match[1]!); 286 + }); 287 + }); 288 + 289 + wss.on("connection", async (ws: WebSocket, req: IncomingMessage, sessionId: string) => { 290 + const url = new URL(req.url ?? "", "http://localhost"); 291 + const tokenParam = url.searchParams.get("token"); 292 + const authHeader = req.headers.authorization; 293 + const bearer = tokenParam ?? authHeader?.split("Bearer ")[1]?.trim(); 294 + if (bearer && bearer !== "null") { 295 + try { 296 + jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }); 297 + } catch (err) { 298 + consola.error("WS: Invalid JWT token:", err); 299 + ws.close(1008, "Invalid token"); 300 + return; 301 + } 302 + } 303 + 304 + const session = sessions.get(sessionId); 305 + if (!session) { 306 + ws.close(1011, "Session not found"); 307 + return; 308 + } 309 + 310 + session.wsClients.add(ws); 311 + 312 + // Flush buffered output that arrived before the WS client connected 313 + for (const encoded of session.buffer) { 314 + ws.send(encoded); 315 + } 316 + 317 + ws.on("message", (data) => { 318 + if (!session.stream) return; 319 + const text = data.toString("utf-8"); 320 + try { 321 + const msg = JSON.parse(text); 322 + if (msg?.type === "resize" && Number.isInteger(msg.cols) && Number.isInteger(msg.rows)) { 323 + (session.stream as any).setWindow(msg.rows, msg.cols, 0, 0); 324 + return; 325 + } 326 + } catch { 327 + // not JSON — treat as raw input 328 + } 329 + session.stream.write(text); 330 + }); 331 + 332 + ws.on("close", () => { 333 + session.wsClients.delete(ws); 334 + }); 335 + }); 336 + }
+77
apps/api/src/tty/index.tsx
··· 9 9 import schema from "schema"; 10 10 import decrypt from "lib/decrypt"; 11 11 import path from "node:path"; 12 + import { WebSocketServer, type WebSocket } from "ws"; 13 + import type { IncomingMessage } from "http"; 14 + import type { Server } from "http"; 15 + import type { Duplex } from "node:stream"; 12 16 13 17 const router = Router(); 14 18 router.use((req, res, next) => { ··· 39 43 type Session = { 40 44 cmd: any; 41 45 clients: Set<express.Response>; 46 + wsClients: Set<WebSocket>; 42 47 }; 43 48 44 49 const sessions = new Map<string, Session>(); ··· 278 283 const session: Session = { 279 284 cmd, 280 285 clients: new Set(), 286 + wsClients: new Set(), 281 287 }; 282 288 283 289 cmd.stdout.on("data", (chunk: Buffer | string) => { ··· 287 293 res.write(`event: output\n`); 288 294 res.write(`data: ${JSON.stringify({ data })}\n\n`); 289 295 } 296 + for (const ws of session.wsClients) { 297 + if (ws.readyState === ws.OPEN) ws.send(data); 298 + } 290 299 }); 291 300 292 301 cmd.on?.("exit", (code: number) => { ··· 294 303 res.write(`event: exit\n`); 295 304 res.write(`data: ${JSON.stringify({ code })}\n\n`); 296 305 } 306 + for (const ws of session.wsClients) { 307 + if (ws.readyState === ws.OPEN) ws.close(1000, "exit"); 308 + } 297 309 session.clients.clear(); 310 + session.wsClients.clear(); 298 311 sessions.delete(id); 299 312 }); 300 313 ··· 303 316 res.write(`event: error\n`); 304 317 res.write(`data: ${JSON.stringify({ message: err.message })}\n\n`); 305 318 } 319 + for (const ws of session.wsClients) { 320 + if (ws.readyState === ws.OPEN) ws.close(1011, err.message); 321 + } 306 322 session.clients.clear(); 323 + session.wsClients.clear(); 307 324 sessions.delete(id); 308 325 }); 309 326 ··· 363 380 }); 364 381 365 382 export default router; 383 + 384 + export function attachWebSocket(server: Server, base: string) { 385 + const pathRegex = new RegExp(`^${base}/([^/]+)/ws$`); 386 + const wss = new WebSocketServer({ noServer: true }); 387 + 388 + server.on("upgrade", (req: IncomingMessage, socket: Duplex, head: Buffer) => { 389 + const url = new URL(req.url ?? "", "http://localhost"); 390 + const match = url.pathname.match(pathRegex); 391 + if (!match) return; 392 + 393 + wss.handleUpgrade(req, socket, head, (ws) => { 394 + wss.emit("connection", ws, req, match[1]!); 395 + }); 396 + }); 397 + 398 + wss.on("connection", async (ws: WebSocket, req: IncomingMessage, id: string) => { 399 + const url = new URL(req.url ?? "", "http://localhost"); 400 + const tokenParam = url.searchParams.get("token"); 401 + const authHeader = req.headers.authorization; 402 + const bearer = tokenParam ?? authHeader?.split("Bearer ")[1]?.trim(); 403 + if (bearer && bearer !== "null") { 404 + try { 405 + jwt.verify(bearer, env.JWT_SECRET, { ignoreExpiration: true }); 406 + } catch (err) { 407 + consola.error("WS: Invalid JWT token:", err); 408 + ws.close(1008, "Invalid token"); 409 + return; 410 + } 411 + } 412 + 413 + let session: Session; 414 + try { 415 + session = await getSession(context.ctx, id); 416 + } catch (err) { 417 + consola.error("WS: Failed to get session:", err); 418 + ws.close(1011, "Session error"); 419 + return; 420 + } 421 + 422 + session.wsClients.add(ws); 423 + 424 + ws.on("message", (data) => { 425 + const text = data.toString("utf-8"); 426 + try { 427 + const msg = JSON.parse(text); 428 + if (msg?.type === "resize" && Number.isInteger(msg.cols) && Number.isInteger(msg.rows)) { 429 + session.cmd.resize(msg.cols, msg.rows); 430 + return; 431 + } 432 + } catch { 433 + // not JSON — treat as raw input 434 + } 435 + session.cmd.stdin.write(text); 436 + }); 437 + 438 + ws.on("close", () => { 439 + session.wsClients.delete(ws); 440 + }); 441 + }); 442 + }
+102 -143
apps/cli/src/cmd/ssh/terminal.ts
··· 1 + import WebSocket from "ws"; 1 2 import chalk from "chalk"; 2 3 import consola from "consola"; 3 4 import getAccessToken from "../../lib/getAccessToken"; 4 5 import { env } from "../../lib/env"; 5 6 import type { Sandbox } from "../../types/sandbox"; 6 - import { EventSource } from "eventsource"; 7 - import type { ErrorEvent } from "eventsource"; 8 7 import axios from "axios"; 9 8 10 - async function sendInput( 11 - apiUrl: string, 12 - sessionId: string, 13 - data: string | Buffer, 14 - token: string, 15 - ): Promise<void> { 16 - try { 17 - await axios.post( 18 - `${apiUrl}/ssh/input/${sessionId}`, 19 - { data: data instanceof Buffer ? data.toString("utf-8") : data }, 20 - { 21 - headers: { 22 - "Content-Type": "application/json", 23 - Authorization: `Bearer ${token}`, 24 - }, 25 - }, 26 - ); 27 - } catch { 28 - // session may have closed — swallow silently 29 - } 9 + // ── Protocol ────────────────────────────────────────────────────────────────── 10 + // 11 + // Step 1: POST /ssh/connect → { sessionId } 12 + // 13 + // Step 2: WS at /ssh/:sessionId/ws 14 + // Server → Client: 15 + // - Text frame base64-encoded PTY output 16 + // - Close SSH session ended 17 + // 18 + // Client → Server: 19 + // - Text frame raw keystroke bytes (UTF-8) 20 + // - Text frame JSON { type: "resize", cols: number, rows: number } 21 + 22 + function toWsUrl(httpUrl: string, path: string, token: string): string { 23 + const base = httpUrl.replace(/^http(s?)/, (_, s) => `ws${s}`); 24 + const url = new URL(`${base}${path}`); 25 + url.searchParams.set("token", token); 26 + return url.toString(); 30 27 } 31 28 32 - async function sendResize( 33 - apiUrl: string, 34 - sessionId: string, 35 - cols: number, 36 - rows: number, 37 - token: string, 38 - ): Promise<void> { 29 + async function terminal(sandbox: Sandbox): Promise<void> { 30 + const token = await getAccessToken(); 31 + const authToken = env.POCKETENV_TOKEN || token; 32 + const apiUrl = env.POCKETENV_API_URL; 33 + 34 + let cols = process.stdout.columns ?? 220; 35 + let rows = process.stdout.rows ?? 50; 36 + 37 + consola.info(`Connecting to ${chalk.cyanBright(sandbox.name)} via SSH…`); 38 + 39 + process.stdout.write(`\x1b[35mConnecting to SSH session...\x1b[0m\r\n`); 40 + 41 + // Step 1: create the SSH session 42 + let sessionId: string; 39 43 try { 40 - await axios.post( 41 - `${apiUrl}/ssh/resize/${sessionId}`, 44 + const res = await axios.post<{ sessionId: string }>( 45 + `${apiUrl}/ssh/connect`, 42 46 { cols, rows }, 43 47 { 44 48 headers: { 45 49 "Content-Type": "application/json", 46 - Authorization: `Bearer ${token}`, 50 + "X-Sandbox-Id": sandbox.id, 51 + Authorization: `Bearer ${authToken}`, 47 52 }, 48 53 }, 49 54 ); 50 - } catch { 51 - // ignore transient resize errors 55 + sessionId = res.data.sessionId; 56 + } catch (err: unknown) { 57 + process.stdout.write("\r\x1b[K"); 58 + const message = 59 + axios.isAxiosError(err) && err.response?.data 60 + ? (err.response.data as { message?: string; error?: string }).message ?? 61 + (err.response.data as { message?: string; error?: string }).error ?? 62 + String(err) 63 + : String(err); 64 + process.stderr.write( 65 + `\x1b[38;5;203mSSH connection failed: ${message}\x1b[0m\r\n`, 66 + ); 67 + process.exit(1); 52 68 } 53 - } 54 69 55 - function makeAuthFetch( 56 - token: string, 57 - ): (url: string | URL, init: RequestInit) => Promise<Response> { 58 - return (url: string | URL, init: RequestInit): Promise<Response> => { 59 - const headers = new Headers((init.headers as Record<string, string>) ?? {}); 60 - headers.set("Authorization", `Bearer ${token}`); 61 - return fetch(url, { ...init, headers }); 62 - }; 63 - } 64 - 65 - async function terminal(sandbox: Sandbox): Promise<void> { 66 - const token = await getAccessToken(); 67 - const authToken = env.POCKETENV_TOKEN || token; 68 - const apiUrl = env.POCKETENV_API_URL; 69 - 70 - let cols = process.stdout.columns ?? 220; 71 - let rows = process.stdout.rows ?? 50; 72 - 73 - consola.info( 74 - `Connecting to ${chalk.cyanBright(sandbox.name)} via SSH…`, 75 - ); 70 + // Step 2: open WebSocket to /ssh/:sessionId/ws 71 + const wsUrl = toWsUrl(apiUrl, `/ssh/${sessionId}/ws`, authToken); 72 + const ws = new WebSocket(wsUrl, { 73 + headers: { "User-Agent": "pocketenv-cli" }, 74 + }); 76 75 77 76 let exiting = false; 78 - let es: EventSource | null = null; 79 - let sessionId: string | null = null; 80 77 let stdinAttached = false; 81 78 79 + function sendResize(c: number, r: number): void { 80 + if (ws.readyState === WebSocket.OPEN) { 81 + ws.send(JSON.stringify({ type: "resize", cols: c, rows: r })); 82 + } 83 + } 84 + 82 85 function teardown(code = 0): void { 83 86 if (exiting) return; 84 87 exiting = true; ··· 87 90 try { 88 91 process.stdin.setRawMode(false); 89 92 } catch { 90 - // ignore 93 + // already restored 91 94 } 92 95 } 93 96 process.stdin.pause(); 94 97 95 - if (es) { 96 - es.close(); 97 - es = null; 98 + if ( 99 + ws.readyState === WebSocket.OPEN || 100 + ws.readyState === WebSocket.CONNECTING 101 + ) { 102 + ws.close(1000, "client disconnect"); 98 103 } 99 104 100 - if (sessionId) { 101 - const sid = sessionId; 102 - sessionId = null; 103 - axios 104 - .delete(`${apiUrl}/ssh/disconnect/${sid}`, { 105 - headers: { Authorization: `Bearer ${authToken}` }, 106 - }) 107 - .catch(() => {}); 108 - } 105 + axios 106 + .delete(`${apiUrl}/ssh/disconnect/${sessionId}`, { 107 + headers: { Authorization: `Bearer ${authToken}` }, 108 + }) 109 + .catch(() => {}); 109 110 110 111 process.exit(code); 111 112 } 112 113 113 - function attachStdin(sid: string): void { 114 + function attachStdin(): void { 114 115 if (stdinAttached) return; 115 116 stdinAttached = true; 116 117 ··· 125 126 teardown(0); 126 127 return; 127 128 } 128 - sendInput(apiUrl, sid, chunk, authToken); 129 + if (ws.readyState === WebSocket.OPEN) { 130 + ws.send(chunk.toString("utf-8")); 131 + } 129 132 }); 130 133 131 134 process.stdout.on("resize", () => { 132 135 cols = process.stdout.columns ?? cols; 133 136 rows = process.stdout.rows ?? rows; 134 - sendResize(apiUrl, sid, cols, rows, authToken); 137 + sendResize(cols, rows); 135 138 }); 136 139 } 137 140 138 - process.stdout.write(`\x1b[35mConnecting to SSH session...\x1b[0m\r\n`); 139 - 140 - // Step 1: POST /ssh/connect to obtain a sessionId 141 - let connectResponse: { sessionId: string }; 142 - try { 143 - const res = await axios.post<{ sessionId: string }>( 144 - `${apiUrl}/ssh/connect`, 145 - { cols, rows }, 146 - { 147 - headers: { 148 - "Content-Type": "application/json", 149 - "X-Sandbox-Id": sandbox.id, 150 - Authorization: `Bearer ${authToken}`, 151 - }, 152 - }, 153 - ); 154 - connectResponse = res.data; 155 - } catch (err: unknown) { 141 + ws.on("open", () => { 156 142 process.stdout.write("\r\x1b[K"); 157 - const message = 158 - axios.isAxiosError(err) && err.response?.data 159 - ? (err.response.data as { message?: string; error?: string }) 160 - .message ?? 161 - (err.response.data as { message?: string; error?: string }).error ?? 162 - String(err) 163 - : String(err); 164 - process.stderr.write( 165 - `\x1b[38;5;203mSSH connection failed: ${message}\x1b[0m\r\n`, 166 - ); 167 - process.exit(1); 168 - } 169 - 170 - sessionId = connectResponse.sessionId; 171 - 172 - // Erase the "Connecting…" line 173 - process.stdout.write("\r\x1b[K"); 174 - 175 - // Step 2: open the SSE stream 176 - es = new EventSource(`${apiUrl}/ssh/stream/${sessionId}`, { 177 - fetch: makeAuthFetch(authToken), 143 + sendResize(cols, rows); 144 + attachStdin(); 178 145 }); 179 146 180 - es.addEventListener("connected", () => { 181 - const sid = sessionId!; 182 - sendResize(apiUrl, sid, cols, rows, authToken).then(() => { 183 - attachStdin(sid); 184 - }); 185 - }); 186 - 187 - // Default `message` events carry base64-encoded PTY output 188 - es.onmessage = (event: MessageEvent) => { 147 + ws.on("message", (raw: WebSocket.RawData, isBinary: boolean) => { 148 + if (isBinary) { 149 + process.stdout.write(raw as Buffer); 150 + return; 151 + } 152 + // base64-encoded SSH output 189 153 try { 190 - const bytes = Buffer.from(event.data as string, "base64"); 154 + const bytes = Buffer.from(raw.toString(), "base64"); 191 155 process.stdout.write(bytes); 192 156 } catch { 193 - process.stdout.write(event.data as string); 157 + process.stdout.write(raw.toString()); 194 158 } 195 - }; 196 - 197 - es.addEventListener("close", () => { 198 - process.stderr.write(`\r\n${chalk.dim("SSH session closed.")}\r\n`); 199 - teardown(0); 200 159 }); 201 160 202 - es.onerror = (err: ErrorEvent) => { 203 - if (es && es.readyState === EventSource.CLOSED) { 204 - if (!err.message) { 205 - teardown(0); 206 - } else { 161 + ws.on("close", (code, reason) => { 162 + if (!exiting) { 163 + process.stderr.write(`\r\n${chalk.dim("SSH session closed.")}\r\n`); 164 + if (reason.length) { 207 165 process.stderr.write( 208 - `\r\n${chalk.red(`SSH connection lost (${err.message})`)}\r\n`, 166 + `${chalk.yellow("Connection closed")} (${code} – ${reason})\r\n`, 209 167 ); 210 - teardown(1); 211 168 } 169 + teardown(0); 212 170 } 213 - }; 171 + }); 172 + 173 + ws.on("error", (err: Error) => { 174 + consola.error("WebSocket error:", err.message); 175 + teardown(1); 176 + }); 214 177 215 178 process.on("SIGINT", () => teardown(0)); 216 179 process.on("SIGTERM", () => teardown(0)); 217 180 218 181 await new Promise<void>((resolve) => { 219 - const poll = setInterval(() => { 220 - if (exiting) { 221 - clearInterval(poll); 222 - resolve(); 223 - } 224 - }, 200); 182 + ws.on("close", resolve); 183 + ws.on("error", () => resolve()); 225 184 }); 226 185 } 227 186
+54 -153
apps/cli/src/cmd/ssh/tty.ts
··· 1 + import WebSocket from "ws"; 1 2 import chalk from "chalk"; 2 3 import consola from "consola"; 3 4 import getAccessToken from "../../lib/getAccessToken"; 4 5 import { env } from "../../lib/env"; 5 6 import type { Sandbox } from "../../types/sandbox"; 6 - import { EventSource } from "eventsource"; 7 - import type { ErrorEvent } from "eventsource"; 8 - import axios from "axios"; 9 7 10 - // ── Protocol (mirrors TtyTerminal web component) ────────────────────────────── 8 + // ── Protocol ────────────────────────────────────────────────────────────────── 11 9 // 12 - // Server → Client (SSE stream at GET /tty/:id/stream): 13 - // event: output data: { "data": "<string>" } raw PTY output chunk 14 - // event: exit data: { "code": <number> } remote shell exited 10 + // Server → Client (WS at /tty/:id/ws or /pty/:id/ws): 11 + // - Text frame raw PTY output, write directly to stdout 12 + // - Close shell exited or session error 15 13 // 16 14 // Client → Server: 17 - // POST /tty/:id/input Content-Type: text/plain raw keystroke bytes 18 - // POST /tty/:id/resize Content-Type: application/json { cols, rows } 19 - 20 - async function sendInput( 21 - ttyUrl: string, 22 - sandboxId: string, 23 - data: string | Buffer, 24 - token: string, 25 - ): Promise<void> { 26 - try { 27 - await axios.post( 28 - `${ttyUrl}/${sandboxId}/input`, 29 - data instanceof Buffer ? data.toString("utf-8") : data, 30 - { 31 - headers: { 32 - "Content-Type": "text/plain", 33 - Authorization: `Bearer ${token}`, 34 - }, 35 - }, 36 - ); 37 - } catch { 38 - // session may have closed — swallow the error silently 39 - } 40 - } 41 - 42 - async function sendResize( 43 - ttyUrl: string, 44 - sandboxId: string, 45 - cols: number, 46 - rows: number, 47 - token: string, 48 - ): Promise<void> { 49 - try { 50 - await axios.post( 51 - `${ttyUrl}/${sandboxId}/resize`, 52 - { cols, rows }, 53 - { 54 - headers: { 55 - "Content-Type": "application/json", 56 - Authorization: `Bearer ${token}`, 57 - }, 58 - }, 59 - ); 60 - } catch { 61 - // ignore transient resize errors 62 - } 63 - } 15 + // - Text frame raw keystroke bytes (UTF-8) 16 + // - Text frame JSON { type: "resize", cols: number, rows: number } 64 17 65 - /** 66 - * Build a custom fetch function that injects the Authorization header into 67 - * every SSE request. The eventsource v3 package uses a fetch-based 68 - * implementation and exposes this hook via `EventSourceInit.fetch`. 69 - */ 70 - function makeAuthFetch( 71 - token: string, 72 - ): (url: string | URL, init: RequestInit) => Promise<Response> { 73 - return (url: string | URL, init: RequestInit): Promise<Response> => { 74 - const headers = new Headers((init.headers as Record<string, string>) ?? {}); 75 - headers.set("Authorization", `Bearer ${token}`); 76 - return fetch(url, { ...init, headers }); 77 - }; 18 + function toWsUrl(httpUrl: string, path: string, token: string): string { 19 + const base = httpUrl.replace(/^http(s?)/, (_, s) => `ws${s}`); 20 + const url = new URL(`${base}${path}`); 21 + url.searchParams.set("token", token); 22 + return url.toString(); 78 23 } 79 24 80 25 async function ssh(sandbox: Sandbox, tty: boolean = false): Promise<void> { 81 26 const token = await getAccessToken(); 82 27 const authToken = env.POCKETENV_TOKEN || token; 83 28 84 - const ttyUrl = tty ? env.POCKETENV_TTY_URL : env.POCKETENV_PTY_URL; 29 + const baseUrl = tty ? env.POCKETENV_TTY_URL : env.POCKETENV_PTY_URL; 30 + const wsUrl = toWsUrl(baseUrl, `/${sandbox.id}/ws`, authToken); 85 31 86 32 let cols = process.stdout.columns ?? 220; 87 33 let rows = process.stdout.rows ?? 50; 88 34 89 35 consola.info( 90 - `Connecting to ${chalk.cyanBright(sandbox.name)} via TTY stream…`, 36 + `Connecting to ${chalk.cyanBright(sandbox.name)} via ${tty ? "TTY" : "PTY"} WebSocket…`, 91 37 ); 92 38 39 + const ws = new WebSocket(wsUrl, { 40 + headers: { "User-Agent": "pocketenv-cli" }, 41 + }); 42 + 93 43 let exiting = false; 94 - let es: EventSource | null = null; 95 44 let stdinAttached = false; 96 45 46 + function sendResize(c: number, r: number): void { 47 + if (ws.readyState === WebSocket.OPEN) { 48 + ws.send(JSON.stringify({ type: "resize", cols: c, rows: r })); 49 + } 50 + } 51 + 97 52 function teardown(code = 0): void { 98 53 if (exiting) return; 99 54 exiting = true; ··· 102 57 try { 103 58 process.stdin.setRawMode(false); 104 59 } catch { 105 - // ignore – may already be restored by the time teardown runs 60 + // already restored 106 61 } 107 62 } 108 63 process.stdin.pause(); 109 64 110 - if (es) { 111 - es.close(); 112 - es = null; 65 + if ( 66 + ws.readyState === WebSocket.OPEN || 67 + ws.readyState === WebSocket.CONNECTING 68 + ) { 69 + ws.close(1000, "client disconnect"); 113 70 } 114 71 115 72 process.exit(code); ··· 119 76 if (stdinAttached) return; 120 77 stdinAttached = true; 121 78 122 - // Switch stdin to raw mode — every keystroke is forwarded immediately, 123 - // with no local echo or line-buffering. 124 79 if (process.stdin.isTTY) { 125 80 process.stdin.setRawMode(true); 126 81 } 127 - 128 - // Keep stdin flowing as a raw binary stream. 129 82 process.stdin.resume(); 130 83 131 - // stdin → POST /tty/:id/input 132 - // In raw mode the OS never raises SIGINT — Ctrl+C arrives as a raw byte 133 - // in the data stream and is forwarded to the remote shell as-is. 134 - // We use Ctrl+K (\x0b) as a local-only escape hatch to avoid conflicting 135 - // with Ctrl+C semantics inside the remote shell. 136 84 process.stdin.on("data", (chunk: Buffer) => { 137 85 if (chunk.includes(0x0b)) { 138 - // Ctrl+K pressed — tear down immediately without waiting for the server. 86 + // Ctrl+K — local escape hatch 139 87 teardown(0); 140 88 return; 141 89 } 142 - sendInput(ttyUrl, sandbox.id, chunk, authToken); 90 + if (ws.readyState === WebSocket.OPEN) { 91 + ws.send(chunk.toString("utf-8")); 92 + } 143 93 }); 144 94 145 - // Terminal resize → notify the remote PTY. 146 95 process.stdout.on("resize", () => { 147 96 cols = process.stdout.columns ?? cols; 148 97 rows = process.stdout.rows ?? rows; 149 - sendResize(ttyUrl, sandbox.id, cols, rows, authToken); 98 + sendResize(cols, rows); 150 99 }); 151 100 } 152 101 153 - // Mirror TtyTerminal: print a magenta "Connecting…" hint before the stream 154 - // opens, then erase it once the `open` event fires. 155 102 process.stdout.write(`\x1b[35mConnecting to terminal...\x1b[0m\r\n`); 156 103 157 - // Open the SSE stream. 158 - // eventsource v3 is fetch-based, so we inject the Authorization header via 159 - // a custom fetch implementation instead of an `headers` init option. 160 - es = new EventSource(`${ttyUrl}/${sandbox.id}/stream`, { 161 - fetch: makeAuthFetch(authToken), 104 + ws.on("open", () => { 105 + process.stdout.write("\r\x1b[K"); 106 + sendResize(cols, rows); 107 + attachStdin(); 162 108 }); 163 109 164 - es.addEventListener("open", () => { 165 - // Erase the "Connecting…" line (carriage-return + erase-to-end-of-line), 166 - // exactly as TtyTerminal does with `instance.write("\r\x1b[K")`. 167 - process.stdout.write("\r\x1b[K"); 168 - 169 - // Sync terminal dimensions immediately after connecting, then attach stdin. 170 - sendResize(ttyUrl, sandbox.id, cols, rows, authToken).then(() => { 171 - attachStdin(); 172 - }); 110 + ws.on("message", (raw: WebSocket.RawData, isBinary: boolean) => { 111 + process.stdout.write(isBinary ? (raw as Buffer) : raw.toString("utf-8")); 173 112 }); 174 113 175 - // `event: output` data: { "data": "..." } 176 - es.addEventListener("output", (event: MessageEvent) => { 177 - try { 178 - const { data } = JSON.parse(event.data as string) as { data: string }; 179 - process.stdout.write(data); 180 - } catch { 181 - // Fall back to writing the raw SSE data if the JSON wrapper is absent. 182 - process.stdout.write(event.data as string); 114 + ws.on("close", (code, reason) => { 115 + if (!exiting) { 116 + const msg = reason.length ? ` (${code} – ${reason})` : ""; 117 + if (msg) { 118 + process.stderr.write(`\r\n${chalk.yellow("Connection closed")}${msg}\r\n`); 119 + } 120 + teardown(0); 183 121 } 184 122 }); 185 123 186 - // `event: exit` data: { "code": <number> } 187 - es.addEventListener("exit", (event: MessageEvent) => { 188 - let code = 0; 189 - try { 190 - const parsed = JSON.parse(event.data as string) as { code: number }; 191 - code = parsed.code ?? 0; 192 - process.stderr.write( 193 - `\r\n${chalk.dim(`Process exited with code ${code}`)}\r\n`, 194 - ); 195 - } catch { 196 - process.stderr.write(`\r\n${chalk.dim("Process exited.")}\r\n`); 197 - } 198 - teardown(code); 124 + ws.on("error", (err: Error) => { 125 + consola.error("WebSocket error:", err.message); 126 + teardown(1); 199 127 }); 200 128 201 - // `onerror` receives an `ErrorEvent` (eventsource v3 type). 202 - // readyState === 2 (CLOSED) means the stream is gone and the client will 203 - // NOT auto-retry. readyState === 0 (CONNECTING) is an auto-retry — let it. 204 - es.onerror = (err: ErrorEvent) => { 205 - // The eventsource package exposes readyState on the EventSource instance. 206 - if (es && es.readyState === EventSource.CLOSED) { 207 - // If the shell exited cleanly the server will close the SSE stream with 208 - // no error message. Treat a message-less close as a graceful exit (code 209 - // 0) rather than a connection error, so the user isn't shown a red 210 - // "connection lost" banner after a normal `exit`. 211 - if (!err.message) { 212 - teardown(0); 213 - } else { 214 - process.stderr.write( 215 - `\r\n${chalk.red(`Terminal connection lost (${err.message})`)}\r\n`, 216 - ); 217 - teardown(1); 218 - } 219 - } 220 - }; 221 - 222 129 process.on("SIGINT", () => teardown(0)); 223 130 process.on("SIGTERM", () => teardown(0)); 224 131 225 - // Block until teardown() fires (which calls process.exit, but the Promise 226 - // is here as a safety net for future refactors that remove process.exit). 227 132 await new Promise<void>((resolve) => { 228 - const poll = setInterval(() => { 229 - if (exiting) { 230 - clearInterval(poll); 231 - resolve(); 232 - } 233 - }, 200); 133 + ws.on("close", resolve); 134 + ws.on("error", () => resolve()); 234 135 }); 235 136 } 236 137