a collection of lightweight TypeScript packages for AT Protocol, the protocol powering Bluesky
atproto bluesky typescript npm
101
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(xrpc-server)!: add drain() to WebSocketConnection for backpressure

the router now calls `await ws.drain()` after every frame it sends so
adapters can gate on the outgoing send buffer. watermarks are
adapter-level concerns (platform-tuned), not router-level, so each
adapter accepts `highWaterMark` / `lowWaterMark` (default 250 KB / 50 KB)
via its factory options:

- Node (`ws`): polls `ws.bufferedAmount` until it drops below the low
watermark.
- Bun: same, using `ws.getBufferedAmount()`.
- Deno: same, using the native `WebSocket.bufferedAmount`.
- Cloudflare Workers: no-op; the runtime does not surface the outgoing
buffer.

without backpressure, a slow client on a high-throughput subscription
(e.g. firehose) could balloon memory in the adapter's send queue.

Mary d9a05fea 94d5ce8d

+156 -6
+16
.changeset/xrpc-server-drain.md
··· 1 + --- 2 + '@atcute/xrpc-server': major 3 + '@atcute/xrpc-server-bun': major 4 + '@atcute/xrpc-server-cloudflare': major 5 + '@atcute/xrpc-server-deno': major 6 + '@atcute/xrpc-server-node': major 7 + --- 8 + 9 + `WebSocketConnection` gains a required `drain(): void | Promise<void>` method. the router awaits it 10 + after every frame it sends so adapters can gate on the outgoing buffer. all four adapters now accept 11 + `highWaterMark` / `lowWaterMark` options (default 250 KB / 50 KB) on their factory functions and 12 + poll `bufferedAmount` to throttle the send loop; the Cloudflare Workers adapter no-ops because the 13 + runtime does not surface the outgoing buffer. 14 + 15 + without backpressure, a slow client on a high-throughput subscription (e.g. firehose) could balloon 16 + memory in the adapter's send queue.
+38 -2
packages/servers/xrpc-server-bun/lib/index.ts
··· 15 15 }; 16 16 } 17 17 18 - export const createBunWebSocket = (): BunWebSocket => { 18 + export interface CreateBunWebSocketOptions { 19 + /** backpressure high water mark in bytes; defaults to 250 KB. */ 20 + highWaterMark?: number; 21 + /** backpressure low water mark in bytes; defaults to 50 KB. */ 22 + lowWaterMark?: number; 23 + } 24 + 25 + export const createBunWebSocket = ({ 26 + highWaterMark = 250_000, 27 + lowWaterMark = 50_000, 28 + }: CreateBunWebSocketOptions = {}): BunWebSocket => { 19 29 let server: Bun.Server<WsData> | undefined; 20 30 21 31 return { ··· 48 58 websocket: { 49 59 async open(ws) { 50 60 const { controller, handler } = ws.data; 61 + const signal = controller.signal; 51 62 52 63 const connection: WebSocketConnection = { 53 - signal: controller.signal, 64 + signal: signal, 54 65 send(data) { 55 66 ws.sendBinary(data); 56 67 }, 68 + async drain() { 69 + if (ws.getBufferedAmount() <= highWaterMark) { 70 + return; 71 + } 72 + 73 + while (!signal.aborted && ws.readyState === 1 && ws.getBufferedAmount() > lowWaterMark) { 74 + await sleep(10, signal); 75 + } 76 + }, 57 77 close(code, reason) { 58 78 ws.close(code, reason); 59 79 }, ··· 74 94 }, 75 95 }; 76 96 }; 97 + 98 + const sleep = (ms: number, signal: AbortSignal): Promise<void> => { 99 + return new Promise((resolve) => { 100 + const timer = setTimeout(() => { 101 + signal.removeEventListener('abort', onAbort); 102 + resolve(); 103 + }, ms); 104 + 105 + const onAbort = () => { 106 + clearTimeout(timer); 107 + resolve(); 108 + }; 109 + 110 + signal.addEventListener('abort', onAbort, { once: true }); 111 + }); 112 + };
+4
packages/servers/xrpc-server-cloudflare/lib/index.ts
··· 11 11 send: (data: Uint8Array) => { 12 12 server.send(data); 13 13 }, 14 + drain: () => { 15 + // Cloudflare Workers do not surface the outgoing WebSocket buffer; 16 + // there is no way to apply backpressure at this layer. 17 + }, 14 18 close: (code?: number, reason?: string) => { 15 19 server.close(code, reason); 16 20 },
+42 -2
packages/servers/xrpc-server-deno/lib/index.ts
··· 1 1 import type { WebSocketAdapter, WebSocketConnection } from '@atcute/xrpc-server'; 2 2 3 - export const createDenoWebSocket = (): WebSocketAdapter => { 3 + export interface CreateDenoWebSocketOptions { 4 + /** backpressure high water mark in bytes; defaults to 250 KB. */ 5 + highWaterMark?: number; 6 + /** backpressure low water mark in bytes; defaults to 50 KB. */ 7 + lowWaterMark?: number; 8 + } 9 + 10 + export const createDenoWebSocket = ({ 11 + highWaterMark = 250_000, 12 + lowWaterMark = 50_000, 13 + }: CreateDenoWebSocketOptions = {}): WebSocketAdapter => { 4 14 return { 5 15 async upgrade(request, handler) { 6 16 const { response, socket } = Deno.upgradeWebSocket(request); 7 17 8 18 const controller = new AbortController(); 19 + const signal = controller.signal; 9 20 const connection: WebSocketConnection = { 10 - signal: controller.signal, 21 + signal: signal, 11 22 send(data: Uint8Array) { 12 23 socket.send(data as Uint8Array<ArrayBuffer>); 13 24 }, 25 + async drain() { 26 + if (socket.bufferedAmount <= highWaterMark) { 27 + return; 28 + } 29 + 30 + while ( 31 + !signal.aborted && 32 + socket.readyState === WebSocket.OPEN && 33 + socket.bufferedAmount > lowWaterMark 34 + ) { 35 + await sleep(10, signal); 36 + } 37 + }, 14 38 close(code?: number, reason?: string) { 15 39 socket.close(code, reason); 16 40 }, ··· 28 52 }, 29 53 }; 30 54 }; 55 + 56 + const sleep = (ms: number, signal: AbortSignal): Promise<void> => { 57 + return new Promise((resolve) => { 58 + const timer = setTimeout(() => { 59 + signal.removeEventListener('abort', onAbort); 60 + resolve(); 61 + }, ms); 62 + 63 + const onAbort = () => { 64 + clearTimeout(timer); 65 + resolve(); 66 + }; 67 + 68 + signal.addEventListener('abort', onAbort, { once: true }); 69 + }); 70 + };
+38 -2
packages/servers/xrpc-server-node/lib/index.ts
··· 15 15 injectWebSocket(server: Server | Http2Server | Http2SecureServer, router: XRPCRouter): void; 16 16 } 17 17 18 + export interface CreateNodeWebSocketOptions { 19 + /** backpressure high water mark in bytes; defaults to 250 KB. */ 20 + highWaterMark?: number; 21 + /** backpressure low water mark in bytes; defaults to 50 KB. */ 22 + lowWaterMark?: number; 23 + } 24 + 18 25 interface WebSocketHandlerContext { 19 26 handler: ((ws: WebSocketConnection) => Promisable<void>) | null; 20 27 } 21 28 22 - export const createNodeWebSocket = (): NodeWebSocket => { 29 + export const createNodeWebSocket = ({ 30 + highWaterMark = 250_000, 31 + lowWaterMark = 50_000, 32 + }: CreateNodeWebSocketOptions = {}): NodeWebSocket => { 23 33 const context = new AsyncLocalStorage<WebSocketHandlerContext>(); 24 34 const wss = new WebSocketServer({ noServer: true }); 25 35 ··· 71 81 wss.emit('connection', ws, request); 72 82 73 83 const controller = new AbortController(); 84 + const signal = controller.signal; 74 85 const connection: WebSocketConnection = { 75 - signal: controller.signal, 86 + signal: signal, 76 87 send(data) { 77 88 return new Promise((resolve, reject) => { 78 89 ws.send(data, (err) => { ··· 84 95 }); 85 96 }); 86 97 }, 98 + async drain() { 99 + if (ws.bufferedAmount <= highWaterMark) { 100 + return; 101 + } 102 + 103 + while (!signal.aborted && ws.readyState === 1 && ws.bufferedAmount > lowWaterMark) { 104 + await sleep(10, signal); 105 + } 106 + }, 87 107 close(code, reason) { 88 108 ws.close(code, reason); 89 109 }, ··· 108 128 }, 109 129 }; 110 130 }; 131 + 132 + const sleep = (ms: number, signal: AbortSignal): Promise<void> => { 133 + return new Promise((resolve) => { 134 + const timer = setTimeout(() => { 135 + signal.removeEventListener('abort', onAbort); 136 + resolve(); 137 + }, ms); 138 + 139 + const onAbort = () => { 140 + clearTimeout(timer); 141 + resolve(); 142 + }; 143 + 144 + signal.addEventListener('abort', onAbort, { once: true }); 145 + }); 146 + };
+5
packages/servers/xrpc-server/README.md
··· 254 254 }); 255 255 ``` 256 256 257 + backpressure is handled per adapter: each adapter's `create*WebSocket()` factory accepts 258 + `highWaterMark` / `lowWaterMark` options (default 250 KB / 50 KB) that throttle the send loop when 259 + the outgoing buffer grows. the Cloudflare Workers adapter does not apply backpressure — the runtime 260 + does not expose the outgoing WebSocket buffer. 261 + 257 262 ### service authentication 258 263 259 264 the `@atcute/xrpc-server/auth` subpackage provides utilities for service-to-service authentication
+2
packages/servers/xrpc-server/lib/main/router.ts
··· 417 417 418 418 const frame = encodeMessageFrame(body, type); 419 419 await ws.send(frame); 420 + const drained = ws.drain(); 421 + if (drained) await drained; 420 422 } 421 423 422 424 ws.close(1000);
+8
packages/servers/xrpc-server/lib/main/types/websocket.ts
··· 3 3 export interface WebSocketConnection { 4 4 signal: AbortSignal; 5 5 send(data: Uint8Array): void | Promise<void>; 6 + /** 7 + * backpressure hook invoked by the router after every frame it sends. 8 + * adapters that can observe the outgoing send buffer (Node `ws`, Bun, Deno) 9 + * should resolve only once the buffer has drained below a healthy threshold. 10 + * adapters without that visibility (e.g. Cloudflare Workers) should return 11 + * synchronously. 12 + */ 13 + drain(): void | Promise<void>; 6 14 close(code?: number, reason?: string): void; 7 15 } 8 16
+3
packages/servers/xrpc-server/lib/main/utils/websocket-mock.ts
··· 79 79 send(data) { 80 80 onMessage.emit(data); 81 81 }, 82 + drain() { 83 + // tests have no outgoing buffer to observe 84 + }, 82 85 close(code = 1000, reason = '') { 83 86 if (!signal.aborted) { 84 87 onClose.emit({ code, reason, wasClean: true });