A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add WebSocket backpressure handling and logging

Introduce safeSend and waitForBackpressure to avoid write errors and
throttle when socket.bufferedAmount exceeds a 1MB limit. Replace direct
socket.send calls with safeSend, periodically check backpressure during
pagination, and queue/flush logic. Enhance connection close and error
logs to include codes, reasons, and client state for easier debugging.

+95 -43
+95 -43
tap/src/main.ts
··· 9 9 const PAGE_SIZE = 500; 10 10 const PAGE_DELAY_MS = 0; 11 11 const YIELD_EVERY_N_PAGES = 5; 12 + const MAX_BUFFER_SIZE = 1024 * 1024; // 1MB buffer limit 13 + const BACKPRESSURE_CHECK_INTERVAL = 100; // Check every 100 events 12 14 13 15 interface ClientState { 14 16 socket: WebSocket; ··· 18 20 19 21 const connectedClients = new Map<WebSocket, ClientState>(); 20 22 23 + function safeSend(socket: WebSocket, message: string): boolean { 24 + try { 25 + if (socket.readyState === WebSocket.OPEN) { 26 + socket.send(message); 27 + return true; 28 + } 29 + } catch (error) { 30 + logger.error`Failed to send message: ${error}`; 31 + } 32 + return false; 33 + } 34 + 35 + async function waitForBackpressure(socket: WebSocket): Promise<void> { 36 + const bufferedAmount = (socket as any).bufferedAmount; 37 + if (bufferedAmount && bufferedAmount > MAX_BUFFER_SIZE) { 38 + logger.info`⏸️ Backpressure detected (${bufferedAmount} bytes buffered), waiting...`; 39 + // Wait for buffer to drain 40 + await new Promise((resolve) => setTimeout(resolve, 100)); 41 + } 42 + } 43 + 21 44 export function broadcastEvent(evt: SelectEvent) { 22 45 const message = JSON.stringify({ 23 46 ...omit(evt, "createdAt", "record"), ··· 31 54 if (state.isPaginating) { 32 55 state.queue.push(evt); 33 56 } else { 34 - socket.send(message); 57 + safeSend(socket, message); 35 58 } 36 59 } 37 60 } ··· 55 78 queue: [], 56 79 }); 57 80 58 - try { 59 - socket.send( 60 - JSON.stringify({ 61 - type: "connected", 62 - message: "Ready to stream events", 63 - }), 64 - ); 65 - logger.info`📤 Sent connection confirmation`; 66 - } catch (error) { 67 - logger.error`Failed to send connection confirmation: ${error}`; 68 - } 81 + safeSend( 82 + socket, 83 + JSON.stringify({ 84 + type: "connected", 85 + message: "Ready to stream events", 86 + }), 87 + ); 88 + logger.info`📤 Sent connection confirmation`; 69 89 70 90 (async () => { 71 91 try { ··· 100 120 logger.info`📄 Fetching page ${page}... (${totalEvents} events sent so far)`; 101 121 } 102 122 103 - for (const evt of events) { 104 - if (socket.readyState === WebSocket.OPEN) { 105 - socket.send( 106 - JSON.stringify({ 107 - ...omit(evt, "createdAt", "record"), 108 - ...(evt.record && { 109 - record: JSON.parse(evt.record), 110 - }), 123 + for (let i = 0; i < events.length; i++) { 124 + const evt = events[i]; 125 + 126 + if (socket.readyState !== WebSocket.OPEN) { 127 + logger.info`⚠️ Socket closed during pagination at event ${totalEvents}`; 128 + return; 129 + } 130 + 131 + const success = safeSend( 132 + socket, 133 + JSON.stringify({ 134 + ...omit(evt, "createdAt", "record"), 135 + ...(evt.record && { 136 + record: JSON.parse(evt.record), 111 137 }), 112 - ); 138 + }), 139 + ); 140 + 141 + if (success) { 113 142 totalEvents++; 143 + } 144 + 145 + if (totalEvents % BACKPRESSURE_CHECK_INTERVAL === 0) { 146 + await waitForBackpressure(socket); 114 147 } 115 148 } 116 149 ··· 132 165 logger.info`📦 Sending ${queuedCount} queued events...`; 133 166 134 167 for (const evt of clientState.queue) { 135 - if (socket.readyState === WebSocket.OPEN) { 136 - socket.send( 137 - JSON.stringify({ 138 - ...omit(evt, "createdAt", "record"), 139 - ...(evt.record && { 140 - record: JSON.parse(evt.record), 141 - }), 168 + if (socket.readyState !== WebSocket.OPEN) break; 169 + 170 + safeSend( 171 + socket, 172 + JSON.stringify({ 173 + ...omit(evt, "createdAt", "record"), 174 + ...(evt.record && { 175 + record: JSON.parse(evt.record), 142 176 }), 143 - ); 144 - } 177 + }), 178 + ); 145 179 } 146 180 147 181 clientState.queue = []; ··· 152 186 } 153 187 } catch (error) { 154 188 logger.error`Pagination error: ${error}`; 155 - if (socket.readyState === WebSocket.OPEN) { 156 - socket.send( 157 - JSON.stringify({ 158 - type: "error", 159 - message: "Failed to load historical events", 160 - }), 161 - ); 162 - } 189 + logger.error`Stack: ${error instanceof Error ? error.stack : ""}`; 190 + 191 + safeSend( 192 + socket, 193 + JSON.stringify({ 194 + type: "error", 195 + message: "Failed to load historical events", 196 + }), 197 + ); 163 198 164 199 const clientState = connectedClients.get(socket); 165 200 if (clientState) { ··· 170 205 }); 171 206 172 207 socket.addEventListener("message", (event) => { 173 - if (event.data === "ping") { 174 - socket.send("pong"); 208 + try { 209 + if (event.data === "ping") { 210 + safeSend(socket, "pong"); 211 + } 212 + } catch (error) { 213 + logger.error`Error handling message: ${error}`; 175 214 } 176 215 }); 177 216 178 - socket.addEventListener("close", () => { 217 + socket.addEventListener("close", (event) => { 179 218 connectedClients.delete(socket); 180 - logger.info`❌ Client disconnected. Active clients: ${connectedClients.size}`; 219 + logger.info`❌ Client disconnected. Code: ${event.code}, Reason: ${event.reason || "none"}, Clean: ${event.wasClean}`; 220 + logger.info` Active clients: ${connectedClients.size}`; 221 + 222 + if (event.code === 1006) { 223 + logger.error`⚠️ Abnormal closure (1006) detected - connection dropped unexpectedly`; 224 + logger.error` This usually means: backpressure, server crash, or network issue`; 225 + } 181 226 }); 182 227 183 228 socket.addEventListener("error", (error) => { 184 - logger.error`WebSocket error: ${error}`; 229 + logger.error`❌ WebSocket error occurred`; 230 + logger.error` Error: ${error}`; 231 + logger.error` ReadyState: ${socket.readyState}`; 232 + const clientState = connectedClients.get(socket); 233 + if (clientState) { 234 + logger.error` Was paginating: ${clientState.isPaginating}`; 235 + logger.error` Queued events: ${clientState.queue.length}`; 236 + } 185 237 connectedClients.delete(socket); 186 238 }); 187 239