A decentralized music tracking and discovery platform built on AT Protocol 馃幍 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
97
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat/pgpull 230 lines 6.4 kB view raw
1import type { BlobRef } from "@atproto/lexicon"; 2import { isValidHandle } from "@atproto/syntax"; 3import { equals } from "@xata.io/client"; 4import { ctx } from "context"; 5import { desc, eq } from "drizzle-orm"; 6import { Hono } from "hono"; 7import jwt from "jsonwebtoken"; 8import * as Profile from "lexicon/types/app/bsky/actor/profile"; 9import { createAgent } from "lib/agent"; 10import { env } from "lib/env"; 11import { requestCounter } from "metrics"; 12import users from "schema/users"; 13 14const app = new Hono(); 15 16app.get("/login", async (c) => { 17 requestCounter.add(1, { method: "GET", route: "/login" }); 18 const { handle, cli } = c.req.query(); 19 if (typeof handle !== "string" || !isValidHandle(handle)) { 20 c.status(400); 21 return c.text("Invalid handle"); 22 } 23 try { 24 const url = await ctx.oauthClient.authorize(handle, { 25 scope: "atproto transition:generic", 26 }); 27 if (cli) { 28 ctx.kv.set(`cli:${handle}`, "1"); 29 } 30 return c.redirect(url.toString()); 31 } catch (e) { 32 c.status(500); 33 return c.text(e.toString()); 34 } 35}); 36 37app.post("/login", async (c) => { 38 requestCounter.add(1, { method: "POST", route: "/login" }); 39 const { handle, cli } = await c.req.json(); 40 if (typeof handle !== "string" || !isValidHandle(handle)) { 41 c.status(400); 42 return c.text("Invalid handle"); 43 } 44 45 try { 46 const url = await ctx.oauthClient.authorize(handle, { 47 scope: "atproto transition:generic", 48 }); 49 50 if (cli) { 51 ctx.kv.set(`cli:${handle}`, "1"); 52 } 53 54 return c.text(url.toString()); 55 } catch (e) { 56 c.status(500); 57 return c.text(e.toString()); 58 } 59}); 60 61app.get("/oauth/callback", async (c) => { 62 requestCounter.add(1, { method: "GET", route: "/oauth/callback" }); 63 const params = new URLSearchParams(c.req.url.split("?")[1]); 64 let did, cli; 65 66 try { 67 const { session } = await ctx.oauthClient.callback(params); 68 did = session.did; 69 const handle = await ctx.resolver.resolveDidToHandle(did); 70 cli = ctx.kv.get(`cli:${handle}`); 71 ctx.kv.delete(`cli:${handle}`); 72 73 const token = jwt.sign( 74 { 75 did, 76 exp: cli 77 ? Math.floor(Date.now() / 1000) + 60 * 60 * 24 * 365 * 1000 78 : Math.floor(Date.now() / 1000) + 60 * 60 * 24 * 7, 79 }, 80 env.JWT_SECRET, 81 ); 82 ctx.kv.set(did, token); 83 } catch (err) { 84 console.error({ err }, "oauth callback failed"); 85 return c.redirect(`${env.FRONTEND_URL}?error=1`); 86 } 87 88 const spotifyUser = await ctx.client.db.spotify_accounts 89 .filter("user_id.did", equals(did)) 90 .filter("is_beta_user", equals(true)) 91 .getFirst(); 92 93 if (spotifyUser?.email) { 94 ctx.nc.publish("rocksky.spotify.user", Buffer.from(spotifyUser.email)); 95 } 96 97 if (!cli) { 98 return c.redirect(`${env.FRONTEND_URL}?did=${did}`); 99 } 100 101 return c.redirect(`${env.FRONTEND_URL}?did=${did}&cli=${cli}`); 102}); 103 104app.get("/profile", async (c) => { 105 requestCounter.add(1, { method: "GET", route: "/profile" }); 106 const bearer = (c.req.header("authorization") || "").split(" ")[1]?.trim(); 107 108 if (!bearer || bearer === "null") { 109 c.status(401); 110 return c.text("Unauthorized"); 111 } 112 113 const { did } = jwt.verify(bearer, env.JWT_SECRET, { 114 ignoreExpiration: true, 115 }); 116 117 const agent = await createAgent(ctx.oauthClient, did); 118 119 if (!agent) { 120 c.status(401); 121 return c.text("Unauthorized"); 122 } 123 124 const { data: profileRecord } = await agent.com.atproto.repo.getRecord({ 125 repo: agent.assertDid, 126 collection: "app.bsky.actor.profile", 127 rkey: "self", 128 }); 129 const handle = await ctx.resolver.resolveDidToHandle(did); 130 const profile: { handle?: string; displayName?: string; avatar?: BlobRef } = 131 Profile.isRecord(profileRecord.value) 132 ? { ...profileRecord.value, handle } 133 : {}; 134 135 if (profile.handle) { 136 try { 137 await ctx.client.db.users.create({ 138 did, 139 handle, 140 display_name: profile.displayName, 141 avatar: `https://cdn.bsky.app/img/avatar/plain/${did}/${profile.avatar.ref.toString()}@jpeg`, 142 }); 143 } catch (e) { 144 if (!e.message.includes("invalid record: column [did]: is not unique")) { 145 console.error(e.message); 146 } else { 147 await ctx.db 148 .update(users) 149 .set({ 150 handle, 151 displayName: profile.displayName, 152 avatar: `https://cdn.bsky.app/img/avatar/plain/${did}/${profile.avatar.ref.toString()}@jpeg`, 153 }) 154 .where(eq(users.did, did)) 155 .execute(); 156 } 157 } 158 159 const [user, lastUser, previousLastUser] = await Promise.all([ 160 ctx.client.db.users.select(["*"]).filter("did", equals(did)).getFirst(), 161 ctx.db 162 .select() 163 .from(users) 164 .orderBy(desc(users.createdAt)) 165 .limit(1) 166 .execute(), 167 ctx.kv.get("lastUser"), 168 ]); 169 170 ctx.nc.publish("rocksky.user", Buffer.from(JSON.stringify(user))); 171 172 await ctx.kv.set("lastUser", lastUser[0].id); 173 // if (lastUser[0].id !== previousLastUser) { 174 // ctx.nc.publish("rocksky.user", Buffer.from(JSON.stringify(user))); 175 // } 176 } 177 178 const [spotifyUser, spotifyToken, googledrive, dropbox] = await Promise.all([ 179 ctx.client.db.spotify_accounts 180 .select(["user_id.*", "email", "is_beta_user"]) 181 .filter("user_id.did", equals(did)) 182 .getFirst(), 183 ctx.client.db.spotify_tokens.filter("user_id.did", equals(did)).getFirst(), 184 ctx.client.db.google_drive_accounts 185 .select(["user_id.*", "email", "is_beta_user"]) 186 .filter("user_id.did", equals(did)) 187 .getFirst(), 188 ctx.client.db.dropbox_accounts 189 .select(["user_id.*", "email", "is_beta_user"]) 190 .filter("user_id.did", equals(did)) 191 .getFirst(), 192 ]); 193 194 return c.json({ 195 ...profile, 196 spotifyUser, 197 spotifyConnected: !!spotifyToken, 198 googledrive, 199 dropbox, 200 did, 201 }); 202}); 203 204app.get("/client-metadata.json", async (c) => { 205 requestCounter.add(1, { method: "GET", route: "/client-metadata.json" }); 206 return c.json(ctx.oauthClient.clientMetadata); 207}); 208 209app.get("/token", async (c) => { 210 requestCounter.add(1, { method: "GET", route: "/token" }); 211 const did = c.req.header("session-did"); 212 213 if (typeof did !== "string" || !did || did === "null") { 214 c.status(401); 215 return c.text("Unauthorized"); 216 } 217 218 const token = ctx.kv.get(did); 219 220 if (!token) { 221 c.status(401); 222 return c.text("Unauthorized"); 223 } 224 225 ctx.kv.delete(did); 226 227 return c.json({ token }); 228}); 229 230export default app;