A decentralized music tracking and discovery platform built on AT Protocol ๐ŸŽต rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add WS test client and improve pagination/batching

+189 -47
+72
tap/scripts/test-client.ts
··· 1 + #!/usr/bin/env -S deno run --allow-net 2 + 3 + const WS_URL = Deno.env.get("WS_URL") || "ws://localhost:2481"; 4 + 5 + console.log(`๐Ÿ”Œ Connecting to ${WS_URL}...`); 6 + 7 + const ws = new WebSocket(WS_URL); 8 + 9 + let messageCount = 0; 10 + let startTime = Date.now(); 11 + 12 + ws.onopen = () => { 13 + console.log("โœ… WebSocket connection opened"); 14 + console.log(` readyState: ${ws.readyState}`); 15 + 16 + // Send ping immediately 17 + console.log("๐Ÿ“ค Sending ping..."); 18 + ws.send("ping"); 19 + 20 + // Send ping every 5 seconds 21 + setInterval(() => { 22 + if (ws.readyState === WebSocket.OPEN) { 23 + console.log("๐Ÿ“ค Sending ping..."); 24 + ws.send("ping"); 25 + } 26 + }, 5000); 27 + }; 28 + 29 + ws.onmessage = (event) => { 30 + messageCount++; 31 + const elapsed = ((Date.now() - startTime) / 1000).toFixed(2); 32 + 33 + try { 34 + const data = JSON.parse(event.data); 35 + if (data.type === "connected") { 36 + console.log(`๐Ÿ“จ [${elapsed}s] Connection confirmed: ${data.message}`); 37 + } else if (data.type === "error") { 38 + console.log(`โŒ [${elapsed}s] Error: ${data.message}`); 39 + } else { 40 + console.log(`๐Ÿ“จ [${elapsed}s] Message #${messageCount}:`, data); 41 + } 42 + } catch { 43 + // Not JSON, just log as text 44 + console.log(`๐Ÿ“จ [${elapsed}s] Message #${messageCount}: ${event.data}`); 45 + } 46 + 47 + if (messageCount % 100 === 0) { 48 + console.log(`๐Ÿ“Š Progress: ${messageCount} messages received in ${elapsed}s`); 49 + } 50 + }; 51 + 52 + ws.onerror = (error) => { 53 + console.error("โŒ WebSocket error:", error); 54 + }; 55 + 56 + ws.onclose = (event) => { 57 + const elapsed = ((Date.now() - startTime) / 1000).toFixed(2); 58 + console.log(`โŒ WebSocket closed after ${elapsed}s`); 59 + console.log(` Code: ${event.code}`); 60 + console.log(` Reason: ${event.reason || "No reason provided"}`); 61 + console.log(` Clean: ${event.wasClean}`); 62 + console.log(` Total messages received: ${messageCount}`); 63 + Deno.exit(event.wasClean ? 0 : 1); 64 + }; 65 + 66 + // Handle Ctrl+C gracefully 67 + Deno.addSignalListener("SIGINT", () => { 68 + console.log("\n๐Ÿ›‘ Closing connection..."); 69 + ws.close(); 70 + }); 71 + 72 + console.log("โณ Waiting for messages... (Press Ctrl+C to exit)");
+108 -43
tap/src/main.ts
··· 6 6 import { omit } from "@es-toolkit/es-toolkit/compat"; 7 7 import type { SelectEvent } from "./schema/event.ts"; 8 8 9 - const PAGE_SIZE = 50; 9 + const PAGE_SIZE = 10; 10 + const PAGE_DELAY_MS = 1; 10 11 11 12 interface ClientState { 12 13 socket: WebSocket; ··· 44 45 45 46 const { socket, response } = Deno.upgradeWebSocket(req); 46 47 47 - socket.addEventListener("open", async () => { 48 - logger.info`โœ… Connected to Tap!`; 48 + socket.addEventListener("open", () => { 49 + logger.info`โœ… Client connected! Socket state: ${socket.readyState}`; 49 50 50 51 connectedClients.set(socket, { 51 52 socket, ··· 53 54 queue: [], 54 55 }); 55 56 56 - let page = 0; 57 - let hasMore = true; 57 + try { 58 + socket.send( 59 + JSON.stringify({ 60 + type: "connected", 61 + message: "Ready to stream events", 62 + }), 63 + ); 64 + logger.info`๐Ÿ“ค Sent connection confirmation`; 65 + } catch (error) { 66 + logger.error`Failed to send connection confirmation: ${error}`; 67 + } 58 68 59 - while (hasMore) { 60 - const events = await ctx.db 61 - .select() 62 - .from(schema.events) 63 - .orderBy(asc(schema.events.createdAt)) 64 - .offset(page * PAGE_SIZE) 65 - .limit(PAGE_SIZE) 66 - .execute(); 69 + (async () => { 70 + try { 71 + // Small delay to ensure connection is fully established 72 + await new Promise((resolve) => setTimeout(resolve, 100)); 73 + 74 + let page = 0; 75 + let hasMore = true; 76 + let totalEvents = 0; 77 + 78 + logger.info`๐Ÿ“– Starting pagination...`; 79 + 80 + try { 81 + const testQuery = await ctx.db 82 + .select() 83 + .from(schema.events) 84 + .limit(1) 85 + .execute(); 86 + logger.info`โœ… Database test query successful, found ${testQuery.length} sample event(s)`; 87 + } catch (dbError) { 88 + logger.error`โŒ Database test query failed: ${dbError}`; 89 + throw dbError; 90 + } 91 + 92 + while (hasMore && socket.readyState === WebSocket.OPEN) { 93 + logger.info`๐Ÿ“„ Fetching page ${page}...`; 94 + 95 + const events = await ctx.db 96 + .select() 97 + .from(schema.events) 98 + .orderBy(asc(schema.events.createdAt)) 99 + .offset(page * PAGE_SIZE) 100 + .limit(PAGE_SIZE) 101 + .execute(); 102 + 103 + logger.info`๐Ÿ“„ Got ${events.length} events from page ${page}`; 104 + 105 + for (const evt of events) { 106 + if (socket.readyState === WebSocket.OPEN) { 107 + socket.send( 108 + JSON.stringify({ 109 + ...omit(evt, "createdAt", "record"), 110 + ...(evt.record && { 111 + record: JSON.parse(evt.record), 112 + }), 113 + }), 114 + ); 115 + totalEvents++; 116 + } 117 + } 118 + 119 + hasMore = events.length === PAGE_SIZE; 120 + page++; 121 + 122 + // Yield to event loop between pages 123 + if (hasMore) { 124 + await new Promise((resolve) => setTimeout(resolve, PAGE_DELAY_MS)); 125 + } 126 + } 67 127 68 - for (const evt of events) { 69 - socket.send( 70 - JSON.stringify({ 71 - ...omit(evt, "createdAt", "record"), 72 - ...(evt.record && { 73 - record: JSON.parse(evt.record), 74 - }), 75 - }), 76 - ); 77 - } 128 + logger.info`๐Ÿ“ค Sent all historical events: ${totalEvents} total (${page} pages)`; 78 129 79 - hasMore = events.length === PAGE_SIZE; 80 - page++; 81 - } 130 + const clientState = connectedClients.get(socket); 131 + if (clientState && socket.readyState === WebSocket.OPEN) { 132 + const queuedCount = clientState.queue.length; 82 133 83 - logger.info`๐Ÿ“ค Sent all historical events (${page} pages)`; 134 + if (queuedCount > 0) { 135 + logger.info`๐Ÿ“ฆ Sending ${queuedCount} queued events...`; 84 136 85 - const clientState = connectedClients.get(socket); 86 - if (clientState) { 87 - const queuedCount = clientState.queue.length; 137 + for (const evt of clientState.queue) { 138 + if (socket.readyState === WebSocket.OPEN) { 139 + socket.send( 140 + JSON.stringify({ 141 + ...omit(evt, "createdAt", "record"), 142 + ...(evt.record && { 143 + record: JSON.parse(evt.record), 144 + }), 145 + }), 146 + ); 147 + } 148 + } 88 149 89 - if (queuedCount > 0) { 90 - logger.info`๐Ÿ“ฆ Sending ${queuedCount} queued events...`; 150 + clientState.queue = []; 151 + } 91 152 92 - for (const evt of clientState.queue) { 153 + clientState.isPaginating = false; 154 + logger.info`๐Ÿ”„ Now streaming real-time events...`; 155 + } 156 + } catch (error) { 157 + logger.error`Pagination error: ${error}`; 158 + if (socket.readyState === WebSocket.OPEN) { 93 159 socket.send( 94 160 JSON.stringify({ 95 - ...omit(evt, "createdAt", "record"), 96 - ...(evt.record && { 97 - record: JSON.parse(evt.record), 98 - }), 161 + type: "error", 162 + message: "Failed to load historical events", 99 163 }), 100 164 ); 101 165 } 102 166 103 - clientState.queue = []; 167 + const clientState = connectedClients.get(socket); 168 + if (clientState) { 169 + clientState.isPaginating = false; 170 + } 104 171 } 105 - 106 - clientState.isPaginating = false; 107 - } 108 - 109 - logger.info`๐Ÿ”„ Now streaming real-time events...`; 172 + })(); 110 173 }); 111 174 112 175 socket.addEventListener("message", (event) => { 176 + logger.info`๐Ÿ“จ Received message: ${event.data}`; 113 177 if (event.data === "ping") { 114 178 socket.send("pong"); 179 + logger.info`๐Ÿ“ค Sent pong`; 115 180 } 116 181 }); 117 182
+9 -4
tap/src/tap.ts
··· 15 15 const tap = new Tap(TAP_WS_URL); 16 16 const indexer = new SimpleIndexer(); 17 17 18 - // Batch buffers 19 18 let eventBatch: InsertEvent[] = []; 20 19 let batchTimer: number | null = null; 20 + let isFlushingBatch = false; 21 21 22 22 async function flushBatch() { 23 - if (eventBatch.length === 0) return; 23 + if (eventBatch.length === 0 || isFlushingBatch) return; 24 24 25 + isFlushingBatch = true; 25 26 const toInsert = [...eventBatch]; 26 27 eventBatch = []; 27 28 28 29 try { 30 + logger.info`๐Ÿ”„ Flushing batch of ${toInsert.length} events...`; 31 + 29 32 const results = await ctx.db 30 33 .insert(schema.events) 31 34 .values(toInsert) ··· 40 43 logger.info`๐Ÿ“ Batch inserted ${results.length} events`; 41 44 } catch (error) { 42 45 logger.error`Failed to insert batch: ${error}`; 46 + } finally { 47 + isFlushingBatch = false; 43 48 } 44 49 } 45 50 ··· 51 56 } 52 57 53 58 if (eventBatch.length >= BATCH_SIZE) { 54 - flushBatch(); 59 + flushBatch().catch((err) => logger.error`Flush error: ${err}`); 55 60 } else { 56 61 batchTimer = setTimeout(() => { 57 - flushBatch(); 62 + flushBatch().catch((err) => logger.error`Flush error: ${err}`); 58 63 batchTimer = null; 59 64 }, BATCH_TIMEOUT_MS); 60 65 }