A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
96
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat/pgpull 541 lines 14 kB view raw
import { serve } from "@hono/node-server";
import { createNodeWebSocket } from "@hono/node-ws";
import { trace } from "@opentelemetry/api";
import { equals } from "@xata.io/client";
import { ctx } from "context";
import { Hono } from "hono";
import { cors } from "hono/cors";
import jwt from "jsonwebtoken";
import { createAgent } from "lib/agent";
import {
  getLovedTracks,
  likeTrack,
  unLikeTrack,
} from "lovedtracks/lovedtracks.service";
import { scrobbleTrack } from "nowplaying/nowplaying.service";
import { rateLimiter } from "ratelimiter";
import subscribe from "subscribers";
import { saveTrack } from "tracks/tracks.service";
import { trackSchema } from "types/track";
import handleWebsocket from "websocket/handler";
import apikeys from "./apikeys/app";
import bsky from "./bsky/app";
import dropbox from "./dropbox/app";
import googledrive from "./googledrive/app";
import { env } from "./lib/env";
import { requestCounter, requestDuration } from "./metrics";
import "./profiling";
import search from "./search/app";
import spotify from "./spotify/app";
import "./tracing";
import users from "./users/app";
import webscrobbler from "./webscrobbler/app";

// Hand the shared application context to the `subscribers` module.
// NOTE(review): what gets subscribed is defined in that module, not here.
subscribe(ctx);

// Root Hono application plus the helpers needed to serve WebSockets from the
// same Node HTTP server (`injectWebSocket` is applied once the server exists).
const app = new Hono();
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });

// Global rate limit, applied to every route.
app.use(
  "*",
  rateLimiter({
    limit: 1000,
    window: 30, // 👈 30 seconds
  }),
);

// Tag the active OpenTelemetry span (if any) with the request path.
app.use("*", async (c, next) => {
  const span = trace.getActiveSpan();
  span?.setAttribute("http.route", c.req.path);
  await next();
});

// Record how long each request took, in seconds, labelled by path and method.
app.use("*", async (c, next) => {
  const start = Date.now();
  await next();
  const duration = (Date.now() - start) / 1000;
  requestDuration.record(duration, {
    route: c.req.path,
    method: c.req.method,
  });
});

app.use(cors());

// Sub-applications mounted under their own prefixes.
app.route("/", bsky);

app.route("/spotify", spotify);

app.route("/dropbox", dropbox);

app.route("/googledrive", googledrive);
/**
 * Reads the bearer token from the `Authorization` header.
 *
 * @param c - Hono request context.
 * @returns The token, or `undefined` when the header is absent, has no second
 *   space-separated part, or carries the literal string "null".
 */
function readBearerToken(c) {
  const token = (c.req.header("authorization") || "").split(" ")[1]?.trim();
  return token && token !== "null" ? token : undefined;
}

/**
 * Verifies a JWT against `env.JWT_SECRET` and returns the `did` claim.
 *
 * `jwt.verify` throws on malformed or wrongly signed tokens; that is an
 * expected outcome for untrusted input, so it is reported as "no identity"
 * (`undefined`) instead of escaping the handler as an unhandled error.
 *
 * NOTE(review): `ignoreExpiration: true` is kept from the original code, so
 * expired tokens are still accepted — confirm this is intentional.
 *
 * @param bearer - Raw token, possibly `undefined`.
 * @returns The `did` claim, or `undefined` when the token is missing/invalid.
 */
function verifyDid(bearer) {
  if (!bearer) {
    return undefined;
  }
  try {
    const payload = jwt.verify(bearer, env.JWT_SECRET, {
      ignoreExpiration: true,
    });
    // A verified payload may also be a plain string, which carries no claims.
    return typeof payload === "object" && payload !== null
      ? payload.did
      : undefined;
  } catch {
    return undefined;
  }
}

/**
 * Resolves the calling user from the bearer token.
 *
 * @param c - Hono request context.
 * @returns `{ did, user }` when the token verifies and a user row with that
 *   DID exists, otherwise `null`.
 */
async function authenticate(c) {
  const did = verifyDid(readBearerToken(c));
  if (!did) {
    return null;
  }
  const user = await ctx.client.db.users.filter("did", equals(did)).getFirst();
  return user ? { did, user } : null;
}

/** Sends the 401 response shared by every authenticated route. */
function unauthorized(c) {
  c.status(401);
  return c.text("Unauthorized");
}

/** Sends the 400 response used when the request body is not a valid track. */
function invalidTrack(c, message) {
  c.status(400);
  return c.text(`Invalid track data: ${message}`);
}

/**
 * Parses the JSON request body and validates it with `trackSchema`.
 *
 * @param c - Hono request context.
 * @returns `{ track }` on success, or `{ error }` holding a message for the
 *   400 response (a body that is not JSON at all is reported the same way
 *   instead of throwing).
 */
async function readTrack(c) {
  let body;
  try {
    body = await c.req.json();
  } catch {
    return { error: "request body is not valid JSON" };
  }
  const parsed = trackSchema.safeParse(body);
  if (parsed.error) {
    return { error: parsed.error.message };
  }
  return { track: parsed.data };
}

/**
 * Reads the `size` and `offset` query parameters.
 *
 * @param c - Hono request context.
 * @param defaultSize - Page size used when `size` is missing or unusable.
 * @returns `{ size, offset }`; `size` is a positive integer and `offset` a
 *   non-negative integer. Anything else (missing, NaN, negative, fractional,
 *   infinite) falls back to `defaultSize` / `0`.
 */
function readPagination(c, defaultSize) {
  const size = Number(c.req.query("size"));
  const offset = Number(c.req.query("offset"));
  return {
    size: Number.isInteger(size) && size > 0 ? size : defaultSize,
    offset: Number.isInteger(offset) && offset >= 0 ? offset : 0,
  };
}

app.route("/apikeys", apikeys);

app.get("/ws", upgradeWebSocket(handleWebsocket));

// Liveness probe.
app.get("/", async (c) => {
  return c.json({ status: "ok" });
});

// Scrobble the track in the request body on behalf of the authenticated user.
app.post("/now-playing", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/now-playing" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const { track, error } = await readTrack(c);
  if (error) {
    return invalidTrack(c, error);
  }

  const agent = await createAgent(ctx.oauthClient, auth.did);
  if (!agent) {
    return unauthorized(c);
  }

  await scrobbleTrack(ctx, track, agent, auth.user.did);

  return c.json({ status: "ok" });
});

// Return the cached now-playing entry for a user, identified either by the
// `did` query parameter (matched against DID or handle) or by the bearer token.
app.get("/now-playing", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playing" });

  const did = c.req.query("did") || verifyDid(readBearerToken(c));
  if (!did) {
    return unauthorized(c);
  }

  const user = await ctx.client.db.users
    .filter({
      $any: [{ did }, { handle: did }],
    })
    .getFirst();

  if (!user) {
    return unauthorized(c);
  }

  const [nowPlaying, status] = await Promise.all([
    ctx.redis.get(`nowplaying:${user.did}`),
    ctx.redis.get(`nowplaying:${user.did}:status`),
  ]);
  return c.json(
    nowPlaying ? { ...JSON.parse(nowPlaying), is_playing: status === "1" } : {},
  );
});

app.get("/now-playings", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/now-playings" });
  const { size, offset } = readPagination(c, 10);
  const { data } = await ctx.analytics.post("library.getDistinctScrobbles", {
    pagination: {
      skip: offset,
      take: size,
    },
  });
  return c.json(data);
});

// Like the track in the request body for the authenticated user.
app.post("/likes", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/likes" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const { track, error } = await readTrack(c);
  if (error) {
    return invalidTrack(c, error);
  }

  // The agent is only built once the caller and payload are known to be valid.
  // NOTE(review): unlike /now-playing, the original never rejected a missing
  // agent here; it is passed through unchanged — confirm likeTrack copes.
  const agent = await createAgent(ctx.oauthClient, auth.did);
  await likeTrack(ctx, track, auth.user, agent);

  return c.json({ status: "ok" });
});

// Remove the authenticated user's like for the track with the given hash.
app.delete("/likes/:sha256", async (c) => {
  requestCounter.add(1, { method: "DELETE", route: "/likes/:sha256" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  // NOTE(review): a missing agent is passed through, as in the original.
  const agent = await createAgent(ctx.oauthClient, auth.did);

  const sha256 = c.req.param("sha256");
  await unLikeTrack(ctx, sha256, auth.user, agent);
  return c.json({ status: "ok" });
});

// List the authenticated user's loved tracks (paginated).
app.get("/likes", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/likes" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const { size, offset } = readPagination(c, 10);

  const lovedTracks = await getLovedTracks(ctx, auth.user, size, offset);
  return c.json(lovedTracks);
});

// Public feed of scrobbles, newest first, flattened for the client.
app.get("/public/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbles" });

  const { size, offset } = readPagination(c, 10);

  const scrobbles = await ctx.client.db.scrobbles
    .select(["track_id.*", "user_id.*", "timestamp", "xata_createdat", "uri"])
    .sort("timestamp", "desc")
    .getPaginated({
      pagination: {
        size,
        offset,
      },
    });

  return c.json(
    scrobbles.records.map((item) => ({
      cover: item.track_id.album_art,
      artist: item.track_id.artist,
      title: item.track_id.title,
      date: item.timestamp,
      user: item.user_id.handle,
      uri: item.uri,
      albumUri: item.track_id.album_uri,
      artistUri: item.track_id.artist_uri,
      tags: [],
      listeners: 1,
      sha256: item.track_id.sha256,
      id: item.xata_id,
    })),
  );
});

// Scrobble chart data. The first query parameter present, in the order
// did → artisturi → albumuri → songuri, selects the analytics query; with
// none of them the unfiltered per-day chart is returned.
app.get("/public/scrobbleschart", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/public/scrobbleschart" });

  const did = c.req.query("did");
  const artisturi = c.req.query("artisturi");
  const albumuri = c.req.query("albumuri");
  const songuri = c.req.query("songuri");

  if (did) {
    const chart = await ctx.analytics.post("library.getScrobblesPerDay", {
      user_did: did,
    });
    return c.json(chart.data);
  }

  if (artisturi) {
    const chart = await ctx.analytics.post("library.getArtistScrobbles", {
      artist_id: artisturi,
    });
    return c.json(chart.data);
  }

  if (albumuri) {
    const chart = await ctx.analytics.post("library.getAlbumScrobbles", {
      album_id: albumuri,
    });
    return c.json(chart.data);
  }

  if (songuri) {
    let uri = songuri;
    // A scrobble URI is first resolved to the URI of the track it refers to.
    if (songuri.includes("app.rocksky.scrobble")) {
      const scrobble = await ctx.client.db.scrobbles
        .select(["track_id.*", "uri"])
        .filter("uri", equals(songuri))
        .getFirst();

      // Unknown scrobble (or one without a linked track): answer 404 rather
      // than crashing on a null dereference.
      if (!scrobble?.track_id) {
        c.status(404);
        return c.text("Scrobble not found");
      }

      uri = scrobble.track_id.uri;
    }
    const chart = await ctx.analytics.post("library.getTrackScrobbles", {
      track_id: uri,
    });
    return c.json(chart.data);
  }

  const chart = await ctx.analytics.post("library.getScrobblesPerDay", {});
  return c.json(chart.data);
});

// List the authenticated user's scrobbles that have a URI, newest first.
app.get("/scrobbles", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/scrobbles" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const { size, offset } = readPagination(c, 10);

  const scrobbles = await ctx.client.db.scrobbles
    .select(["track_id.*", "uri"])
    .filter("user_id", equals(auth.user.xata_id))
    .filter({
      $not: [
        {
          uri: null,
        },
      ],
    })
    .sort("xata_createdat", "desc")
    .getPaginated({
      pagination: {
        size,
        offset,
      },
    });

  return c.json(scrobbles.records);
});

// Save the track in the request body. Saving is best-effort: failures are
// logged (except the duplicate-sha256 case) and the route still answers "ok".
app.post("/tracks", async (c) => {
  requestCounter.add(1, { method: "POST", route: "/tracks" });

  const auth = await authenticate(c);
  if (!auth) {
    return unauthorized(c);
  }

  const { track, error } = await readTrack(c);
  if (error) {
    return invalidTrack(c, error);
  }

  const agent = await createAgent(ctx.oauthClient, auth.did);
  if (!agent) {
    return unauthorized(c);
  }

  try {
    await saveTrack(ctx, track, agent);
  } catch (e) {
    // Thrown values are not guaranteed to be Error instances.
    const message = e?.message ?? String(e);
    if (!message.includes("invalid record: column [sha256]: is not unique")) {
      console.error("[spotify user]", message);
    }
  }

  return c.json({ status: "ok" });
});

app.get("/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks" });

  const { size, offset } = readPagination(c, 100);

  const tracks = await ctx.analytics.post("library.getTracks", {
    pagination: {
      skip: offset,
      take: size,
    },
  });

  return c.json(tracks.data);
});

app.get("/albums", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums" });

  const { size, offset } = readPagination(c, 100);

  const albums = await ctx.analytics.post("library.getAlbums", {
    pagination: {
      skip: offset,
      take: size,
    },
  });

  return c.json(albums.data);
});

app.get("/artists", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists" });

  const { size, offset } = readPagination(c, 100);

  const artists = await ctx.analytics.post("library.getArtists", {
    pagination: {
      skip: offset,
      take: size,
    },
  });

  return c.json(artists.data);
});

// Single-record lookups by sha256; the first match (or null) is returned.
app.get("/tracks/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/tracks/:sha256" });

  const sha256 = c.req.param("sha256");
  const track = await ctx.client.db.tracks
    .filter("sha256", equals(sha256))
    .getFirst();
  return c.json(track);
});

app.get("/albums/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256" });

  const sha256 = c.req.param("sha256");
  const album = await ctx.client.db.albums
    .filter("sha256", equals(sha256))
    .getFirst();

  return c.json(album);
});

app.get("/artists/:sha256", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256" });

  const sha256 = c.req.param("sha256");
  const artist = await ctx.client.db.artists
    .filter("sha256", equals(sha256))
    .getFirst();

  return c.json(artist);
});

app.get("/artists/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/artists/:sha256/tracks" });
  const sha256 = c.req.param("sha256");

  const tracks = await ctx.client.db.artist_tracks
    .select(["track_id.*"])
    .filter("artist_id.sha256", equals(sha256))
    .getAll();

  return c.json(tracks);
});

app.get("/albums/:sha256/tracks", async (c) => {
  requestCounter.add(1, { method: "GET", route: "/albums/:sha256/tracks" });
  const sha256 = c.req.param("sha256");
  const tracks = await ctx.client.db.album_tracks
    .select(["track_id.*"])
    .filter("album_id.sha256", equals(sha256))
    .getAll();

  return c.json(tracks);
});

app.route("/users", users);

app.route("/search", search);

app.route("/webscrobbler", webscrobbler);

// Start the HTTP server and attach the WebSocket upgrade handling to it.
const server = serve({
  fetch: app.fetch,
  port: 8000,
});

injectWebSocket(server);