Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(ws): websocket edges

+35 -13
+1 -1
sync/deno.json
··· 1 1 { 2 2 "name": "@atp/sync", 3 - "version": "0.1.0-alpha.7", 3 + "version": "0.1.0-alpha.8", 4 4 "exports": "./mod.ts", 5 5 "license": "MIT", 6 6 "imports": {
+1 -1
xrpc-server/deno.json
··· 1 1 { 2 2 "name": "@atp/xrpc-server", 3 - "version": "0.1.0-alpha.8", 3 + "version": "0.1.0-alpha.9", 4 4 "exports": "./mod.ts", 5 5 "license": "MIT", 6 6 "imports": {
+26 -1
xrpc-server/stream/stream.ts
··· 22 22 * - Yields Uint8Array 23 23 * - Cleans up listeners on close/error/return() 24 24 */ 25 - export function iterateBinary(ws: WebSocket): AsyncIterable<Uint8Array> { 25 + export function iterateBinary( 26 + ws: WebSocket, 27 + signal?: AbortSignal, 28 + ): AsyncIterable<Uint8Array> { 26 29 const queue: (Uint8Array | Error | null)[] = []; 27 30 let resolve: ((v: IteratorResult<Uint8Array>) => void) | null = null; 28 31 ··· 65 68 const err = (ev as ErrorEvent).error ?? new Error("WebSocket error"); 66 69 queue.push(err); 67 70 pump(); 71 + cleanup(); 68 72 }; 69 73 70 74 const onClose = () => { 71 75 queue.push(null); 72 76 pump(); 77 + cleanup(); 73 78 }; 74 79 75 80 ws.addEventListener("message", onMessage); 76 81 ws.addEventListener("error", onError); 77 82 ws.addEventListener("close", onClose); 83 + if (signal?.aborted) { 84 + queue.push(makeAbortError(signal.reason)); 85 + pump(); 86 + cleanup(); 87 + } else if (signal) { 88 + signal.addEventListener("abort", onAbort); 89 + } 78 90 79 91 const iterator: AsyncIterator<Uint8Array> = { 80 92 next() { ··· 104 116 ws.removeEventListener("message", onMessage); 105 117 ws.removeEventListener("error", onError); 106 118 ws.removeEventListener("close", onClose); 119 + signal?.removeEventListener("abort", onAbort); 107 120 } 108 121 109 122 return { ··· 111 124 return iterator; 112 125 }, 113 126 }; 127 + 128 + function onAbort() { 129 + queue.push(makeAbortError(signal?.reason)); 130 + pump(); 131 + cleanup(); 132 + } 133 + } 134 + 135 + function makeAbortError(reason: unknown): Error { 136 + const err = new DOMException("Aborted", "AbortError"); 137 + err.cause = reason; 138 + return err; 114 139 } 115 140 116 141 /** Iterate by low-level Frame (binary in → Frame out) */
+7 -10
xrpc-server/stream/websocket-keepalive.ts
··· 33 33 34 34 // Reconnect hook 35 35 onReconnectError?: (error: unknown, n: number, initialSetup: boolean) => void; 36 + onReconnect?: () => void; 36 37 37 38 createSocket?: (url: string, protocols?: string | string[]) => WebSocket; 38 39 protocols?: string | string[]; ··· 72 73 forwardSignal(this.opts.signal, ac); 73 74 } 74 75 75 - // Track liveness (application-level heartbeat) 76 - this.startHeartbeat(ws, ac); 77 - 78 76 // When the socket opens, reset backoff. 79 77 ws.addEventListener( 80 78 "open", 81 79 () => { 80 + if (!this.initialSetup) { 81 + this.opts.onReconnect?.(); 82 + } 82 83 this.initialSetup = false; 83 84 this.reconnects = 0; 85 + this.startHeartbeat(ws, ac); 84 86 }, 85 87 { once: true }, 86 88 ); ··· 102 104 103 105 try { 104 106 // Iterate incoming binary chunks 105 - for await (const chunk of iterateBinary(ws)) { 107 + for await (const chunk of iterateBinary(ws, ac.signal)) { 106 108 yield chunk; 107 109 } 108 110 } catch (error) { ··· 162 164 // No pong/traffic since last tick → consider dead and close. 163 165 ws.close(1000); 164 166 // Abort the iterator with a recognizable shape like before. 165 - const domErr = new DOMException("Aborted", "AbortError"); 166 - domErr.cause = new DisconnectError( 167 - CloseCode.Abnormal, 168 - "HeartbeatTimeout", 169 - ); 170 - ac.abort(domErr); 167 + ac.abort(new AbnormalCloseError("HeartbeatTimeout")); 171 168 return; 172 169 } 173 170 isAlive = false;