A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
97
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix collections script

+426 -30
+426 -30
apps/api/src/scripts/collections.ts
··· 3 3 import { consola } from "consola"; 4 4 import { ctx } from "context"; 5 5 import extractPdsFromDid from "lib/extractPdsFromDid"; 6 - import { chalk } from "zx/core"; 6 + import chalk from "chalk"; 7 + import * as Song from "lexicon/types/app/rocksky/song"; 8 + import * as Artist from "lexicon/types/app/rocksky/artist"; 9 + import * as Album from "lexicon/types/app/rocksky/album"; 10 + import * as Scrobble from "lexicon/types/app/rocksky/scrobble"; 11 + import schema from "schema"; 12 + import { and, eq, or } from "drizzle-orm"; 13 + import { createHash } from "node:crypto"; 14 + import { publishScrobble } from "nowplaying/nowplaying.service"; 7 15 8 16 const args = process.argv.slice(2); 9 17 ··· 19 27 did = await ctx.baseIdResolver.handle.resolve(did); 20 28 } 21 29 30 + const [user] = await ctx.db 31 + .select() 32 + .from(schema.users) 33 + .where(eq(schema.users.did, did)) 34 + .execute(); 35 + if (!user) { 36 + consola.error(`User with DID ${chalk.cyan(did)} not found in database.`); 37 + process.exit(1); 38 + } 39 + 22 40 async function getAtpAgent(did: string): Promise<AtpAgent> { 23 41 const serviceEndpoint = await extractPdsFromDid(did); 24 42 ··· 28 46 } 29 47 30 48 async function getScrobbleRecords(agent: AtpAgent, did: string) { 31 - const res = await agent.com.atproto.repo.listRecords({ 32 - repo: did, 33 - collection: "app.rocksky.scrobble", 34 - limit: 100, 35 - }); 49 + const records = []; 50 + let cursor: string | undefined = undefined; 51 + 52 + do { 53 + const res = await agent.com.atproto.repo.listRecords({ 54 + repo: did, 55 + collection: "app.rocksky.scrobble", 56 + limit: 100, 57 + cursor, 58 + }); 59 + records.push(...res.data.records); 60 + cursor = res.data.cursor; 61 + consola.info( 62 + `Fetched ${chalk.greenBright(records.length)} scrobble records so far...`, 63 + ); 64 + } while (cursor); 36 65 37 - return res.data.records; 66 + return records; 38 67 } 39 68 40 69 async function getSongRecords(agent: AtpAgent, did: string) { 41 - const res = await agent.com.atproto.repo.listRecords({ 42 - repo: did, 43 - collection: "app.rocksky.song", 44 - limit: 100, 45 - }); 70 + const records = []; 71 + let cursor: string | undefined = undefined; 46 72 47 - return res.data.records; 73 + do { 74 + const res = await agent.com.atproto.repo.listRecords({ 75 + repo: did, 76 + collection: "app.rocksky.song", 77 + limit: 100, 78 + cursor, 79 + }); 80 + records.push(...res.data.records); 81 + cursor = res.data.cursor; 82 + consola.info( 83 + `Fetched ${chalk.greenBright(records.length)} song records so far...`, 84 + ); 85 + } while (cursor); 86 + 87 + return records; 48 88 } 49 89 50 90 async function getArtistRecords(agent: AtpAgent, did: string) { 51 - const res = await agent.com.atproto.repo.listRecords({ 52 - repo: did, 53 - collection: "app.rocksky.artist", 54 - limit: 100, 55 - }); 91 + const records = []; 92 + let cursor: string | undefined = undefined; 93 + 94 + do { 95 + const res = await agent.com.atproto.repo.listRecords({ 96 + repo: did, 97 + collection: "app.rocksky.artist", 98 + limit: 100, 99 + cursor, 100 + }); 101 + records.push(...res.data.records); 102 + cursor = res.data.cursor; 103 + consola.info( 104 + `Fetched ${chalk.greenBright(records.length)} artist records so far...`, 105 + ); 106 + } while (cursor); 56 107 57 - return res.data.records; 108 + return records; 58 109 } 59 110 60 111 async function getAlbumRecords(agent: AtpAgent, did: string) { 61 - const res = await agent.com.atproto.repo.listRecords({ 62 - repo: did, 63 - collection: "app.rocksky.album", 64 - limit: 100, 65 - }); 112 + const records = []; 113 + let cursor: string | undefined = undefined; 66 114 67 - return res.data.records; 115 + do { 116 + const res = await agent.com.atproto.repo.listRecords({ 117 + repo: did, 118 + collection: "app.rocksky.album", 119 + limit: 100, 120 + cursor, 121 + }); 122 + records.push(...res.data.records); 123 + cursor = res.data.cursor; 124 + consola.info( 125 + `Fetched ${chalk.greenBright(records.length)} album records so far...`, 126 + ); 127 + } while (cursor); 128 + 129 + return records; 68 130 } 69 131 70 - async function insertScrobbles(scrobbles: Record[]) {} 132 + async function insertScrobbles(scrobbles: Record[]) { 133 + await Promise.all( 134 + scrobbles.map(async (scrobble) => { 135 + const value: Scrobble.Record = scrobble.value as Scrobble.Record; 136 + consola.info( 137 + `Inserting scrobble: ${chalk.greenBright(value.title)} ${chalk.cyan(scrobble.uri)}`, 138 + ); 139 + const trackSha256 = createHash("sha256") 140 + .update( 141 + `${value.title} - ${value.artist} - ${value.album}`.toLowerCase(), 142 + ) 143 + .digest("hex"); 144 + const albumSha256 = createHash("sha256") 145 + .update(`${value.album} - ${value.albumArtist}`.toLowerCase()) 146 + .digest("hex"); 147 + const artistSha256 = createHash("sha256") 148 + .update(value.albumArtist.toLowerCase()) 149 + .digest("hex"); 150 + 151 + const [[track], [album], [artist]] = await Promise.all([ 152 + ctx.db 153 + .select() 154 + .from(schema.tracks) 155 + .where( 156 + value.mbid 157 + ? or( 158 + eq(schema.tracks.mbId, value.mbid), 159 + eq(schema.tracks.sha256, trackSha256), 160 + ) 161 + : eq(schema.tracks.sha256, trackSha256), 162 + ) 163 + .limit(1) 164 + .execute(), 165 + ctx.db 166 + .select() 167 + .from(schema.albums) 168 + .where(eq(schema.albums.sha256, albumSha256)) 169 + .limit(1) 170 + .execute(), 171 + ctx.db 172 + .select() 173 + .from(schema.artists) 174 + .where(eq(schema.artists.sha256, artistSha256)) 175 + .limit(1) 176 + .execute(), 177 + ]); 178 + let [newScroble] = await ctx.db 179 + .insert(schema.scrobbles) 180 + .values({ 181 + albumId: album.id, 182 + trackId: track.id, 183 + artistId: artist.id, 184 + uri: scrobble.uri, 185 + userId: user.id, 186 + timestamp: new Date(value.createdAt), 187 + createdAt: new Date(value.createdAt), 188 + }) 189 + .onConflictDoNothing() 190 + .returning() 191 + .execute(); 192 + 193 + try { 194 + if (!newScroble) { 195 + [newScroble] = await ctx.db 196 + .select() 197 + .from(schema.scrobbles) 198 + .where(eq(schema.scrobbles.uri, scrobble.uri)) 199 + .limit(1) 200 + .execute(); 201 + } 202 + await publishScrobble(ctx, newScroble.id); 203 + } catch (err) { 204 + consola.error( 205 + `Failed to sync scrobble ${chalk.cyan(newScroble.id)}:`, 206 + err, 207 + ); 208 + } 209 + }), 210 + ); 211 + } 212 + 213 + async function insertSongs(songs: Record[]) { 214 + await Promise.all( 215 + songs.map(async (song) => { 216 + const value: Song.Record = song.value as Song.Record; 217 + try { 218 + consola.info( 219 + `Inserting song: ${chalk.greenBright(value.title)} ${chalk.cyan(song.uri)}`, 220 + ); 221 + 222 + const [[artist], [album]] = await Promise.all([ 223 + ctx.db 224 + .select() 225 + .from(schema.artists) 226 + .where(eq(schema.artists.name, value.albumArtist)) 227 + .limit(1) 228 + .execute(), 229 + ctx.db 230 + .select() 231 + .from(schema.albums) 232 + .where( 233 + and( 234 + eq(schema.albums.title, value.album), 235 + eq(schema.albums.artist, value.albumArtist), 236 + ), 237 + ) 238 + .limit(1) 239 + .execute(), 240 + ]); 241 + 242 + if (!artist) { 243 + consola.warn( 244 + `Artist not found for song ${chalk.cyan(value.title)}: ${chalk.yellow(value.albumArtist)} — skipping`, 245 + ); 246 + return; 247 + } 248 + if (!album) { 249 + consola.warn( 250 + `Album not found for song ${chalk.cyan(value.title)}: ${chalk.yellow(value.album)} — skipping`, 251 + ); 252 + return; 253 + } 254 + 255 + const trackSha256 = createHash("sha256") 256 + .update( 257 + `${value.title} - ${value.artist} - ${value.album}`.toLowerCase(), 258 + ) 259 + .digest("hex"); 260 + 261 + let [newTrack] = await ctx.db 262 + .insert(schema.tracks) 263 + .values({ 264 + title: value.title, 265 + artist: value.artist, 266 + albumArtist: value.albumArtist, 267 + album: value.album, 268 + uri: song.uri, 269 + albumArt: value.albumArtUrl, 270 + artistUri: artist.uri, 271 + albumUri: album.uri, 272 + sha256: trackSha256, 273 + duration: value.duration, 274 + mbId: value.mbid, 275 + trackNumber: value.trackNumber, 276 + discNumber: value.discNumber, 277 + composer: value.composer, 278 + label: value.label, 279 + lyrics: value.lyrics, 280 + genre: value.genre, 281 + copyrightMessage: value.copyrightMessage, 282 + spotifyLink: value.spotifyLink, 283 + appleMusicLink: value.appleMusicLink, 284 + tidalLink: value.tidalLink, 285 + createdAt: new Date(value.createdAt), 286 + }) 287 + .onConflictDoNothing() 288 + .returning() 289 + .execute(); 290 + 291 + if (!newTrack) { 292 + const [existingTrack] = await ctx.db 293 + .select() 294 + .from(schema.tracks) 295 + .where( 296 + value.mbid 297 + ? or( 298 + eq(schema.tracks.mbId, value.mbid), 299 + eq(schema.tracks.sha256, trackSha256), 300 + ) 301 + : eq(schema.tracks.sha256, trackSha256), 302 + ) 303 + .limit(1) 304 + .execute(); 305 + newTrack = existingTrack; 306 + if (!existingTrack) { 307 + consola.warn( 308 + `Track not found after conflict for song ${chalk.cyan(value.title)} ${value.mbid} — skipping`, 309 + ); 310 + return; 311 + } 312 + } 313 + 314 + await Promise.all([ 315 + ctx.db 316 + .insert(schema.userTracks) 317 + .values({ 318 + userId: user.id, 319 + trackId: newTrack.id, 320 + uri: song.uri, 321 + }) 322 + .onConflictDoNothing() 323 + .returning() 324 + .execute(), 325 + ctx.db 326 + .insert(schema.albumTracks) 327 + .values({ 328 + albumId: album.id, 329 + trackId: newTrack.id, 330 + }) 331 + .onConflictDoNothing() 332 + .execute(), 333 + ]); 334 + } catch (error) { 335 + const metadata = `${value.title} - ${value.artist} - ${value.album}`; 336 + consola.error( 337 + `Failed to insert song record with URI ${chalk.cyan(metadata)} ${song.uri} ${createHash( 338 + "sha256", 339 + ) 340 + .update( 341 + `${value.title} - ${value.artist} - ${value.album}`.toLowerCase(), 342 + ) 343 + .digest("hex")}`, 344 + error, 345 + ); 346 + consola.info(JSON.stringify(value, null, 2)); 347 + } 348 + }), 349 + ); 350 + } 351 + 352 + async function insertArtists(artists: Record[]) { 353 + await Promise.all( 354 + artists.map(async (artist) => { 355 + const value: Artist.Record = artist.value as Artist.Record; 356 + consola.info( 357 + `Inserting artist: ${chalk.greenBright(value.name)} ${chalk.cyan(artist.uri)}`, 358 + ); 359 + const sha256 = createHash("sha256") 360 + .update(value.name.toLowerCase()) 361 + .digest("hex"); 362 + 363 + let [newArtist] = await ctx.db 364 + .insert(schema.artists) 365 + .values({ 366 + uri: artist.uri, 367 + name: value.name, 368 + sha256, 369 + picture: value.pictureUrl, 370 + genres: value.tags, 371 + createdAt: new Date(value.createdAt), 372 + }) 373 + .onConflictDoNothing() 374 + .returning() 375 + .execute(); 376 + 377 + if (!newArtist) { 378 + const [existingArtist] = await ctx.db 379 + .select() 380 + .from(schema.artists) 381 + .where(eq(schema.artists.sha256, sha256)) 382 + .limit(1) 383 + .execute(); 384 + newArtist = existingArtist; 385 + } 386 + 387 + await ctx.db 388 + .insert(schema.userArtists) 389 + .values({ 390 + userId: user.id, 391 + artistId: newArtist.id, 392 + uri: artist.uri, 393 + }) 394 + .onConflictDoNothing() 395 + .execute(); 396 + }), 397 + ); 398 + } 399 + 400 + async function insertAlbums(albums: Record[]) { 401 + await Promise.all( 402 + albums.map(async (album) => { 403 + const value: Album.Record = album.value as Album.Record; 404 + consola.info( 405 + `Inserting album: ${chalk.greenBright(value.title)} ${chalk.cyan(album.uri)}`, 406 + ); 407 + 408 + const sha256 = createHash("sha256") 409 + .update(`${value.title} - ${value.artist}`.toLowerCase()) 410 + .digest("hex"); 411 + 412 + let [newAlbum] = await ctx.db 413 + .insert(schema.albums) 414 + .values({ 415 + title: value.title, 416 + artist: value.artist, 417 + uri: album.uri, 418 + albumArt: value.albumArtUrl, 419 + sha256, 420 + year: value.year, 421 + releaseDate: value.releaseDate, 422 + }) 423 + .onConflictDoNothing() 424 + .returning() 425 + .execute(); 426 + 427 + if (!newAlbum) { 428 + const [existingAlbum] = await ctx.db 429 + .select() 430 + .from(schema.albums) 431 + .where(eq(schema.albums.sha256, sha256)) 432 + .limit(1) 433 + .execute(); 434 + newAlbum = existingAlbum; 435 + } 436 + 437 + const [artist] = await ctx.db 438 + .select() 439 + .from(schema.artists) 440 + .where(eq(schema.artists.name, value.artist)) 441 + .limit(1) 442 + .execute(); 443 + 444 + await Promise.all([ 445 + ctx.db 446 + .insert(schema.userAlbums) 447 + .values({ 448 + userId: user.id, 449 + albumId: newAlbum.id, 450 + uri: album.uri, 451 + }) 452 + .onConflictDoNothing() 453 + .execute(), 454 + ctx.db 455 + .insert(schema.artistAlbums) 456 + .values({ 457 + albumId: newAlbum.id, 458 + artistId: artist.id, 459 + }) 460 + .onConflictDoNothing() 461 + .execute(), 462 + ]); 463 + }), 464 + ); 465 + } 71 466 72 467 async function main() { 73 468 const agent = await getAtpAgent(did); ··· 75 470 const songs = await getSongRecords(agent, did); 76 471 const artists = await getArtistRecords(agent, did); 77 472 const albums = await getAlbumRecords(agent, did); 78 - console.log(scrobbles); 79 - console.log(songs); 80 - console.log(artists); 81 - console.log(albums); 473 + 474 + await insertArtists(artists); 475 + await insertAlbums(albums); 476 + await insertSongs(songs); 477 + await insertScrobbles(scrobbles); 82 478 83 479 consola.success(`${chalk.cyan(args[0])} Collections fetched successfully!`); 84 480