A decentralized music tracking and discovery platform built on AT Protocol 馃幍 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat/pgpull 153 lines 4.5 kB view raw
1import { equals } from "@xata.io/client"; 2import chalk from "chalk"; 3import { ctx } from "context"; 4import { createHash } from "node:crypto"; 5import { publishScrobble } from "nowplaying/nowplaying.service"; 6 7const args = process.argv.slice(2); 8 9async function updateUris(did: string) { 10 const { records } = await ctx.client.db.scrobbles 11 .select(["track_id.*", "user_id.*"]) 12 .filter({ 13 $any: [{ "user_id.did": did }, { "user_id.handle": did }], 14 }) 15 .getPaginated({ 16 pagination: { 17 size: process.env.SYNC_SIZE ? parseInt(process.env.SYNC_SIZE, 10) : 20, 18 }, 19 sort: [{ xata_createdat: "desc" }], 20 }); 21 for (const { track_id: track } of records) { 22 const existingTrack = await ctx.client.db.tracks 23 .filter( 24 "sha256", 25 equals( 26 createHash("sha256") 27 .update( 28 `${track.title} - ${track.artist} - ${track.album}`.toLowerCase() 29 ) 30 .digest("hex") 31 ) 32 ) 33 .getFirst(); 34 35 if (existingTrack && !existingTrack.album_uri) { 36 console.log(`Updating album uri for ${chalk.cyan(track.xata_id)} ...`); 37 const album = await ctx.client.db.albums 38 .filter( 39 "sha256", 40 equals( 41 createHash("sha256") 42 .update(`${track.album} - ${track.album_artist}`.toLowerCase()) 43 .digest("hex") 44 ) 45 ) 46 .getFirst(); 47 if (album) { 48 await ctx.client.db.tracks.update(existingTrack.xata_id, { 49 album_uri: album.uri, 50 }); 51 } 52 } 53 54 if (existingTrack && !existingTrack.artist_uri) { 55 console.log(`Updating artist uri for ${chalk.cyan(track.xata_id)} ...`); 56 const artist = await ctx.client.db.artists 57 .filter( 58 "sha256", 59 equals( 60 createHash("sha256") 61 .update(track.album_artist.toLowerCase()) 62 .digest("hex") 63 ) 64 ) 65 .getFirst(); 66 if (artist) { 67 await ctx.client.db.tracks.update(existingTrack.xata_id, { 68 artist_uri: artist.uri, 69 }); 70 } 71 } 72 73 const album = await ctx.client.db.albums 74 .filter( 75 "sha256", 76 equals( 77 createHash("sha256") 78 .update(`${track.album} - ${track.album_artist}`.toLowerCase()) 79 .digest("hex") 80 ) 81 ) 82 .getFirst(); 83 84 if (existingTrack && !album.artist_uri) { 85 console.log(`Updating artist uri for ${chalk.cyan(album.xata_id)} ...`); 86 const artist = await ctx.client.db.artists 87 .filter( 88 "sha256", 89 equals( 90 createHash("sha256") 91 .update(track.album_artist.toLowerCase()) 92 .digest("hex") 93 ) 94 ) 95 .getFirst(); 96 if (artist) { 97 await ctx.client.db.albums.update(album.xata_id, { 98 artist_uri: artist.uri, 99 }); 100 } 101 } 102 } 103} 104 105if (args.includes("--background")) { 106 console.log("Wait for new scrobbles to sync ..."); 107 const sub = ctx.nc.subscribe("rocksky.user.scrobble.sync"); 108 for await (const m of sub) { 109 const did = new TextDecoder().decode(m.data); 110 // wait for 15 seconds to ensure the scrobble is fully created 111 await new Promise((resolve) => setTimeout(resolve, 15000)); 112 console.log(`Syncing scrobbles ${chalk.magenta(did)} ...`); 113 await updateUris(did); 114 const { records } = await ctx.client.db.scrobbles 115 .filter({ 116 $any: [{ "user_id.did": did }, { "user_id.handle": did }], 117 }) 118 .getPaginated({ 119 pagination: { 120 size: 5, 121 }, 122 sort: [{ xata_createdat: "desc" }], 123 }); 124 for (const scrobble of records) { 125 console.log(`Syncing scrobble ${chalk.cyan(scrobble.xata_id)} ...`); 126 await publishScrobble(ctx, scrobble.xata_id); 127 } 128 } 129 process.exit(0); 130} 131 132for (const arg of args) { 133 console.log(`Syncing scrobbles ${chalk.magenta(arg)} ...`); 134 await updateUris(arg); 135 136 const { records } = await ctx.client.db.scrobbles 137 .filter({ 138 $any: [{ "user_id.did": arg }, { "user_id.handle": arg }], 139 }) 140 .getPaginated({ 141 pagination: { 142 size: process.env.SYNC_SIZE ? parseInt(process.env.SYNC_SIZE) : 20, 143 }, 144 sort: [{ xata_createdat: "desc" }], 145 }); 146 for (const scrobble of records) { 147 console.log(`Syncing scrobble ${chalk.cyan(scrobble.xata_id)} ...`); 148 await publishScrobble(ctx, scrobble.xata_id); 149 } 150 console.log(`Synced ${chalk.greenBright(records.length)} scrobbles`); 151} 152 153process.exit(0);