A decentralized music tracking and discovery platform built on AT Protocol ๐ŸŽต rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

Improve WebSocket backpressure handling and logs

Add pacing and diagnostic features to server and client
- Client: SLOW_MODE, heartbeat handling, reduced log spam, ping timing,
and message rate/progress metrics
- Server: reduce PAGE_SIZE, add MESSAGE_DELAY_MS, heartbeat interval,
safer send with error logging, more frequent backpressure checks and
clearer 1006 diagnostics

+114 -14
+54 -6
tap/scripts/test-client.ts
··· 1 1 #!/usr/bin/env -S deno run --allow-net 2 2 3 3 const WS_URL = Deno.env.get("WS_URL") || "ws://localhost:2481"; 4 + const SLOW_MODE = Deno.env.get("SLOW_MODE") === "true"; // Set SLOW_MODE=true to slow down 4 5 5 6 console.log(`๐Ÿ”Œ Connecting to ${WS_URL}...`); 7 + if (SLOW_MODE) { 8 + console.log(`๐ŸŒ SLOW MODE enabled - will process messages slowly`); 9 + } 6 10 7 11 const ws = new WebSocket(WS_URL); 8 12 9 13 let messageCount = 0; 10 14 let startTime = Date.now(); 15 + let lastMessageTime = Date.now(); 11 16 12 17 ws.onopen = () => { 13 18 console.log("โœ… WebSocket connection opened"); 14 19 console.log(` readyState: ${ws.readyState}`); 20 + console.log(` Time: ${new Date().toISOString()}`); 15 21 16 22 // Send ping immediately 17 23 console.log("๐Ÿ“ค Sending ping..."); ··· 20 26 // Send ping every 5 seconds 21 27 setInterval(() => { 22 28 if (ws.readyState === WebSocket.OPEN) { 23 - console.log("๐Ÿ“ค Sending ping..."); 29 + const now = Date.now(); 30 + const timeSinceLastMessage = now - lastMessageTime; 31 + console.log( 32 + `๐Ÿ“ค Sending ping... (${timeSinceLastMessage}ms since last message)`, 33 + ); 24 34 ws.send("ping"); 25 35 } 26 36 }, 5000); 27 37 }; 28 38 29 - ws.onmessage = (event) => { 39 + ws.onmessage = async (event) => { 30 40 messageCount++; 41 + lastMessageTime = Date.now(); 31 42 const elapsed = ((Date.now() - startTime) / 1000).toFixed(2); 32 43 33 44 try { 34 45 const data = JSON.parse(event.data); 35 46 if (data.type === "connected") { 36 47 console.log(`๐Ÿ“จ [${elapsed}s] Connection confirmed: ${data.message}`); 48 + } else if (data.type === "heartbeat") { 49 + console.log(`๐Ÿ’“ [${elapsed}s] Heartbeat received`); 37 50 } else if (data.type === "error") { 38 51 console.log(`โŒ [${elapsed}s] Error: ${data.message}`); 39 52 } else { 40 - console.log(`๐Ÿ“จ [${elapsed}s] Message #${messageCount}:`, data); 53 + if (messageCount % 100 === 0 || messageCount <= 10) { 54 + console.log(`๐Ÿ“จ [${elapsed}s] Message #${messageCount}:`, data); 55 + } 41 56 } 42 57 } catch { 43 58 // Not JSON, just log as text ··· 45 60 } 46 61 47 62 if (messageCount % 100 === 0) { 48 - console.log(`๐Ÿ“Š Progress: ${messageCount} messages received in ${elapsed}s`); 63 + const rate = (messageCount / parseFloat(elapsed)).toFixed(2); 64 + console.log( 65 + `๐Ÿ“Š Progress: ${messageCount} messages received in ${elapsed}s (${rate} msg/s)`, 66 + ); 67 + } 68 + 69 + // In slow mode, add delay to simulate slow client 70 + if (SLOW_MODE && messageCount % 10 === 0) { 71 + await new Promise((resolve) => setTimeout(resolve, 10)); 49 72 } 50 73 }; 51 74 52 75 ws.onerror = (error) => { 53 - console.error("โŒ WebSocket error:", error); 76 + console.error("โŒ WebSocket error occurred"); 77 + console.error(" Error:", error); 78 + console.error(" Time:", new Date().toISOString()); 79 + console.error(" Messages received so far:", messageCount); 54 80 }; 55 81 56 82 ws.onclose = (event) => { 57 83 const elapsed = ((Date.now() - startTime) / 1000).toFixed(2); 58 - console.log(`โŒ WebSocket closed after ${elapsed}s`); 84 + const rate = 85 + messageCount > 0 ? (messageCount / parseFloat(elapsed)).toFixed(2) : "0"; 86 + 87 + console.log(`\nโŒ WebSocket closed after ${elapsed}s`); 59 88 console.log(` Code: ${event.code}`); 60 89 console.log(` Reason: ${event.reason || "No reason provided"}`); 61 90 console.log(` Clean: ${event.wasClean}`); 62 91 console.log(` Total messages received: ${messageCount}`); 92 + console.log(` Average rate: ${rate} messages/second`); 93 + console.log(` Time: ${new Date().toISOString()}`); 94 + 95 + if (event.code === 1006) { 96 + console.error(`\nโš ๏ธ ERROR 1006: Abnormal Closure`); 97 + console.error( 98 + ` This means the connection dropped without proper close frame.`, 99 + ); 100 + console.error(` Last message was ${Date.now() - lastMessageTime}ms ago`); 101 + console.error(` Possible causes:`); 102 + console.error(` - Server sent messages too fast (backpressure)`); 103 + console.error(` - Server crashed or panicked`); 104 + console.error(` - Network timeout or interruption`); 105 + console.error(` - Client couldn't keep up with message rate`); 106 + console.error( 107 + `\n Try running with: SLOW_MODE=true deno run --allow-net scripts/test-client.ts`, 108 + ); 109 + } 110 + 63 111 Deno.exit(event.wasClean ? 0 : 1); 64 112 }; 65 113
+60 -8
tap/src/main.ts
··· 6 6 import { omit } from "@es-toolkit/es-toolkit/compat"; 7 7 import type { SelectEvent } from "./schema/event.ts"; 8 8 9 - const PAGE_SIZE = 500; 10 - const PAGE_DELAY_MS = 0; 11 - const YIELD_EVERY_N_PAGES = 5; 12 - const MAX_BUFFER_SIZE = 1024 * 1024; // 1MB buffer limit 13 - const BACKPRESSURE_CHECK_INTERVAL = 100; // Check every 100 events 9 + const PAGE_SIZE = 50; 10 + const PAGE_DELAY_MS = 10; 11 + const YIELD_EVERY_N_PAGES = 1; // Yield after every page 12 + const MAX_BUFFER_SIZE = 64 * 1024; // 64KB buffer limit (more conservative) 13 + const BACKPRESSURE_CHECK_INTERVAL = 10; // Check every 10 events 14 + const MESSAGE_DELAY_MS = 1; // Add tiny delay between messages 15 + const VERBOSE_LOGGING = false; // Set to true for detailed message tracking 14 16 15 17 interface ClientState { 16 18 socket: WebSocket; ··· 20 22 21 23 const connectedClients = new Map<WebSocket, ClientState>(); 22 24 23 - function safeSend(socket: WebSocket, message: string): boolean { 25 + // Helper function to safely send message with error handling 26 + function safeSend( 27 + socket: WebSocket, 28 + message: string, 29 + eventCount?: number, 30 + ): boolean { 24 31 try { 25 32 if (socket.readyState === WebSocket.OPEN) { 26 33 socket.send(message); 34 + if ( 35 + VERBOSE_LOGGING && 36 + eventCount !== undefined && 37 + eventCount % 50 === 0 38 + ) { 39 + logger.info`๐Ÿ“ค Sent ${eventCount} events, readyState: ${socket.readyState}`; 40 + } 27 41 return true; 42 + } else { 43 + logger.error`โŒ Cannot send - socket readyState: ${socket.readyState}`; 28 44 } 29 45 } catch (error) { 30 46 logger.error`Failed to send message: ${error}`; 47 + logger.error`Socket readyState: ${socket.readyState}`; 31 48 } 32 49 return false; 33 50 } ··· 87 104 ); 88 105 logger.info`๐Ÿ“ค Sent connection confirmation`; 89 106 107 + const heartbeatInterval = setInterval(() => { 108 + if (socket.readyState === WebSocket.OPEN) { 109 + safeSend( 110 + socket, 111 + JSON.stringify({ type: "heartbeat", timestamp: Date.now() }), 112 + ); 113 + } else { 114 + clearInterval(heartbeatInterval); 115 + } 116 + }, 10000); 117 + 90 118 (async () => { 91 119 try { 92 120 let page = 0; ··· 136 164 record: JSON.parse(evt.record), 137 165 }), 138 166 }), 167 + totalEvents, 139 168 ); 140 169 141 170 if (success) { 142 171 totalEvents++; 172 + } else { 173 + logger.error`โŒ Failed to send event at index ${totalEvents}, stopping pagination`; 174 + return; 175 + } 176 + 177 + if (MESSAGE_DELAY_MS > 0 && i % 5 === 0) { 178 + await new Promise((resolve) => 179 + setTimeout(resolve, MESSAGE_DELAY_MS), 180 + ); 143 181 } 144 182 145 183 if (totalEvents % BACKPRESSURE_CHECK_INTERVAL === 0) { ··· 201 239 clientState.isPaginating = false; 202 240 } 203 241 } 204 - })(); 242 + })().catch((err) => { 243 + logger.error`Unhandled error in pagination loop: ${err}`; 244 + logger.error`Stack: ${err instanceof Error ? err.stack : ""}`; 245 + }); 205 246 }); 206 247 207 248 socket.addEventListener("message", (event) => { ··· 215 256 }); 216 257 217 258 socket.addEventListener("close", (event) => { 259 + const clientState = connectedClients.get(socket); 218 260 connectedClients.delete(socket); 261 + 219 262 logger.info`โŒ Client disconnected. Code: ${event.code}, Reason: ${event.reason || "none"}, Clean: ${event.wasClean}`; 220 263 logger.info` Active clients: ${connectedClients.size}`; 221 264 265 + if (clientState) { 266 + logger.info` Was paginating: ${clientState.isPaginating}`; 267 + logger.info` Queued events: ${clientState.queue.length}`; 268 + } 269 + 222 270 if (event.code === 1006) { 223 271 logger.error`โš ๏ธ Abnormal closure (1006) detected - connection dropped unexpectedly`; 224 - logger.error` This usually means: backpressure, server crash, or network issue`; 272 + logger.error` Possible causes:`; 273 + logger.error` - Client overwhelmed with messages (try reducing PAGE_SIZE)`; 274 + logger.error` - Network timeout or interruption`; 275 + logger.error` - Server sent messages too fast`; 276 + logger.error` - Uncaught exception in message handling`; 225 277 } 226 278 }); 227 279