A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
96
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat/pgpull 276 lines 9.7 kB view raw
import chalk from "chalk";
import { ctx } from "context";
import { and, eq } from "drizzle-orm";
import type { Context } from "hono";
import jwt from "jsonwebtoken";
import { env } from "lib/env";
import { createHash } from "node:crypto";
import lovedTracks from "schema/loved-tracks";
import tracks from "schema/tracks";
import users from "schema/users";
import { v4 as uuidv4 } from "uuid";
import z from "zod";

// Schema for a remote-control command addressed to one device (or to all
// of the sender's devices when `target` does not match a registered id).
const ControlMessageSchema = z.object({
  type: z.string(),
  target: z.string().optional(),
  action: z.string(),
  args: z.any().optional(),
  token: z.string(),
});

type ControlMessage = z.infer<typeof ControlMessageSchema>;

// Schema for the first message a client sends to register itself as a device.
const RegisterDeviceSchema = z.object({
  type: z.literal("register"),
  clientName: z.string(),
  token: z.string(),
});

type RegisterDeviceMessage = z.infer<typeof RegisterDeviceSchema>;

// Schema for a payload (track info or playback status) that is relayed to
// every device registered for the sender's DID.
const MessageSchema = z.object({
  type: z.literal("message"),
  data: z.any(),
  device_id: z.string(),
  token: z.string(),
});

type Message = z.infer<typeof MessageSchema>;

/**
 * Minimal structural view of a socket as used by this module: it can send a
 * text frame and carries the registration info attached by the register branch.
 */
type DeviceSocket = {
  send(data: string): void;
  deviceId?: string;
  did?: string;
};

/** Minimal structural view of an incoming message event. */
type IncomingEvent = { data: unknown };

// TTLs (seconds) for the Redis entries written by this module.
const LIKES_CACHE_TTL_SECONDS = 2;
const NOW_PLAYING_TTL_SECONDS = 3;
const TRACK_CACHE_TTL_SECONDS = 10;

// In-memory registries; all three are keyed by string ids and are mutated by
// the register branch of onMessage and by onClose.
const devices: Record<string, DeviceSocket> = {}; // deviceId -> socket
const deviceNames: Record<string, string> = {}; // deviceId -> client name
const userDevices: Record<string, string[]> = {}; // did -> deviceIds

/**
 * Verifies `token` against `env.JWT_SECRET` and returns its `did` claim.
 *
 * NOTE(review): expiration is ignored (`ignoreExpiration: true`), so an
 * expired token is still accepted — confirm this is intentional.
 *
 * @throws if the signature is invalid or the payload carries no string `did`.
 *   Without this check a token lacking `did` would be filed under the key
 *   "undefined" and share a device list with every other such token.
 */
function verifyDid(token: string): string {
  const payload = jwt.verify(token, env.JWT_SECRET, {
    ignoreExpiration: true,
  });
  if (typeof payload === "string" || typeof payload.did !== "string") {
    throw new Error("JWT payload does not contain a string `did` claim");
  }
  return payload.did;
}

/**
 * Enriches a track payload in place with `liked`, `album_art`, `song_uri`,
 * `album_uri` and `artist_uri`, and refreshes the `likes:*`, `track:*` and
 * `nowplaying:*` Redis entries.
 *
 * The track is identified by the SHA-256 of the lower-cased
 * "title - artist - album" string. When the track is neither cached nor
 * present in the database, only `liked` is set and `nowplaying:*` is not
 * written.
 */
async function enrichTrack(
  data: Record<string, unknown>,
  did: string,
): Promise<void> {
  const sha256 = createHash("sha256")
    .update(`${data.title} - ${data.artist} - ${data.album}`.toLowerCase())
    .digest("hex");

  const [cachedTrack, cachedLikes] = await Promise.all([
    ctx.redis.get(`track:${sha256}`),
    ctx.redis.get(`likes:${did}:${sha256}`),
  ]);

  if (cachedLikes) {
    data.liked = JSON.parse(cachedLikes).liked;
  } else {
    const [likes] = await ctx.db
      .select()
      .from(lovedTracks)
      .leftJoin(tracks, eq(lovedTracks.trackId, tracks.id))
      .leftJoin(users, eq(lovedTracks.userId, users.id))
      .where(and(eq(users.did, did), eq(tracks.sha256, sha256)))
      .execute();
    data.liked = Boolean(likes);
    await ctx.redis.setEx(
      `likes:${did}:${sha256}`,
      LIKES_CACHE_TTL_SECONDS,
      JSON.stringify({ liked: data.liked }),
    );
  }

  // Use the cached track metadata when available; otherwise read it from the
  // database and cache it for TRACK_CACHE_TTL_SECONDS.
  if (cachedTrack) {
    const cached = JSON.parse(cachedTrack);
    data.album_art = cached.albumArt;
    data.song_uri = cached.uri;
    data.album_uri = cached.albumUri;
    data.artist_uri = cached.artistUri;
    await ctx.redis.setEx(
      `nowplaying:${did}`,
      NOW_PLAYING_TTL_SECONDS,
      JSON.stringify({ ...data, sha256, liked: data.liked }),
    );
    return;
  }

  const [track] = await ctx.db
    .select()
    .from(tracks)
    .where(eq(tracks.sha256, sha256))
    .execute();
  if (!track) {
    return;
  }

  data.album_art = track.albumArt;
  data.song_uri = track.uri;
  data.album_uri = track.albumUri;
  data.artist_uri = track.artistUri;
  await Promise.all([
    ctx.redis.setEx(
      `track:${sha256}`,
      TRACK_CACHE_TTL_SECONDS,
      JSON.stringify({
        albumArt: track.albumArt,
        uri: track.uri,
        albumUri: track.albumUri,
        artistUri: track.artistUri,
        liked: data.liked,
      }),
    ),
    ctx.redis.setEx(
      `nowplaying:${did}`,
      NOW_PLAYING_TTL_SECONDS,
      JSON.stringify({ ...data, sha256, liked: data.liked }),
    ),
  ]);
}

/**
 * Builds the WebSocket event handlers for the device relay.
 *
 * `onMessage` answers "ping" with "pong" and otherwise dispatches a JSON
 * payload by schema: "message" payloads are enriched and relayed to all of
 * the sender's devices, control messages are forwarded to the target device
 * (or to all devices when the target is not registered), and "register"
 * messages add the socket to the in-memory registries. `onClose` removes the
 * socket from those registries.
 *
 * @param c request context (not used by the handlers).
 * @returns an object with `onMessage` and `onClose` handlers.
 */
function handleWebsocket(c: Context) {
  return {
    async onMessage(event: IncomingEvent, ws: DeviceSocket) {
      try {
        if (event.data === "ping") {
          ws.send("pong");
          return;
        }
        // JSON.parse coerces its argument to a string; make that explicit.
        const message: unknown = JSON.parse(String(event.data));
        const controlMessage = ControlMessageSchema.safeParse(message);
        const registerMessage = RegisterDeviceSchema.safeParse(message);
        const deviceMessage = MessageSchema.safeParse(message);

        if (deviceMessage.success) {
          const { data, device_id, token } = deviceMessage.data;
          const did = verifyDid(token);

          // A sender whose DID has no registered devices has nobody to relay to.
          const targets = (userDevices[did] ?? [])
            .map((id) => devices[id])
            .filter((socket): socket is DeviceSocket => Boolean(socket));

          if (targets.length > 0) {
            // Enrich/cache once (not once per device) and await it inside the
            // try block so Redis/DB failures are caught instead of becoming
            // unhandled promise rejections.
            if (data.type === "track") {
              await enrichTrack(data, did);
            } else {
              // Anything that is not a track is treated as a status update.
              await ctx.redis.setEx(
                `nowplaying:${did}:status`,
                NOW_PLAYING_TTL_SECONDS,
                `${data.status}`,
              );
            }

            const frame = JSON.stringify({ type: "message", data, device_id });
            for (const socket of targets) {
              // Isolate failures so one dead socket does not stop the relay.
              try {
                socket.send(frame);
              } catch (e) {
                console.error("Error sending message to device:", e);
              }
            }
          }
          // NOTE(review): there is no return here, so a payload that also
          // satisfies ControlMessageSchema is processed again below — confirm
          // this is intended.
        }

        if (controlMessage.success) {
          const { type, target, action, args, token } = controlMessage.data;
          const did = verifyDid(token);
          console.log(
            `Control message: ${chalk.greenBright(type)}, ${chalk.greenBright(target)}, ${chalk.greenBright(action)}, ${chalk.greenBright(JSON.stringify(args))}, ${chalk.greenBright("***")}`,
          );
          const frame = JSON.stringify({ type, action, args });

          // Only devices registered for the sender's DID can be targeted.
          const deviceId = userDevices[did]?.find((id) => id === target);
          if (deviceId) {
            const targetDevice = devices[deviceId];
            if (targetDevice) {
              targetDevice.send(frame);
              console.log(
                `Control message sent to device: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(target)}`,
              );
              return;
            }
            console.error(`Device not found: ${target}`);
            return;
          }

          // No matching target: forward to every device of this user.
          userDevices[did]?.forEach((id) => {
            const targetDevice = devices[id];
            if (targetDevice) {
              targetDevice.send(frame);
              console.log(
                `Control message sent to all devices: ${chalk.greenBright(id)}, ${chalk.greenBright(target)}`,
              );
            }
          });

          // Only report a missing target when one was actually requested.
          if (target !== undefined) {
            console.error(`Device ID not found for target: ${target}`);
          }
          return;
        }

        if (registerMessage.success) {
          const { type, clientName, token } = registerMessage.data;
          console.log(
            `Register message: ${chalk.greenBright(type)}, ${chalk.greenBright(clientName)}, ${chalk.greenBright("****")}`,
          );
          const did = verifyDid(token);
          const deviceId = uuidv4();
          // NOTE(review): a socket that registers twice overwrites
          // ws.deviceId, so onClose only removes the most recent id.
          ws.deviceId = deviceId;
          ws.did = did;
          devices[deviceId] = ws;
          deviceNames[deviceId] = clientName;
          userDevices[did] = [...(userDevices[did] ?? []), deviceId];
          console.log(
            `Device registered: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(clientName)}`,
          );

          // Tell the user's other devices about the new one.
          const announcement = JSON.stringify({
            type: "device_registered",
            deviceId,
            clientName,
          });
          userDevices[did]
            .filter((id) => id !== deviceId)
            .forEach((id) => {
              const targetDevice = devices[id];
              if (targetDevice) {
                targetDevice.send(announcement);
              }
            });

          ws.send(JSON.stringify({ status: "registered", deviceId }));
          return;
        }
      } catch (e) {
        // Covers JSON parse errors, JWT verification failures and any
        // Redis/DB/send error raised while handling the message.
        console.error("Error parsing message:", e);
      }
    },
    onClose: (_: unknown, ws: DeviceSocket) => {
      console.log("Connection closed");
      // Drop every registry entry that refers to the closed socket.
      const deviceId = ws.deviceId;
      const did = ws.did;
      if (deviceId && devices[deviceId]) {
        delete devices[deviceId];
        console.log(`Device removed: ${chalk.redBright(deviceId)}`);
      }
      if (did && userDevices[did]) {
        userDevices[did] = userDevices[did].filter((id) => id !== deviceId);
        if (userDevices[did].length === 0) {
          delete userDevices[did];
        }
      }
      if (deviceId && deviceNames[deviceId]) {
        const clientName = deviceNames[deviceId];
        delete deviceNames[deviceId];
        console.log(
          `Device name removed: ${chalk.redBright(deviceId)}, ${chalk.redBright(clientName)}`,
        );
      }
    },
  };
}

export default handleWebsocket;