A decentralized music tracking and discovery platform built on the AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: enhance scrobble handling with additional URI fields and synchronization improvements

+155 -3
+19 -2
crates/feed/src/repo/duckdb/scrobble.rs
··· 66 66 release_date, 67 67 album_art, 68 68 year, 69 + uri, 69 70 sha256 70 71 ) VALUES ( 72 + ?, 71 73 ?, 72 74 ?, 73 75 ?, ··· 83 85 record.release_date, 84 86 record.album_art_url, 85 87 record.year, 88 + record.album_uri, 86 89 album_hash, 87 90 ], 88 91 ) { ··· 97 100 id, 98 101 name, 99 102 sha256, 103 + picture, 104 + uri, 100 105 tags 101 106 ) VALUES ( 107 + ?, 102 108 ?, 103 109 ?, 104 110 ?, 111 + ?, 105 112 [{}] 106 113 )", 107 114 record ··· 114 121 .join(", ")) 115 122 .unwrap_or_default() 116 123 ), 117 - params![xid::new().to_string(), record.album_artist, artist_hash], 124 + params![ 125 + xid::new().to_string(), 126 + record.album_artist, 127 + artist_hash, 128 + record.artist_picture, 129 + record.artist_uri 130 + ], 118 131 ) { 119 132 Ok(x) => tracing::info!("Artist inserted or already exists {}", x), 120 133 Err(e) => tracing::error!(error = %e, "Error inserting artist"), ··· 143 156 composer, 144 157 duration, 145 158 mb_id, 159 + uri, 146 160 sha256 147 161 ) VALUES ( 162 + ?, 148 163 ?, 149 164 ?, 150 165 ?, ··· 184 199 record.composer, 185 200 record.duration, 186 201 record.mbid, 202 + record.song_uri, 187 203 track_hash, 188 204 ], 189 205 ) { ··· 303 319 (SELECT id FROM albums WHERE sha256 = ?), 304 320 (SELECT id FROM artists WHERE sha256 = ?), 305 321 ?, 306 - CURRENT_TIMESTAMP, 322 + ?, 307 323 )", 308 324 params![ 309 325 xid::new().to_string(), ··· 312 328 album_hash, 313 329 artist_hash, 314 330 uri, 331 + record.created_at, 315 332 ], 316 333 ) { 317 334 Ok(x) => tracing::info!("Scrobble inserted {}", x),
+128 -1
crates/feed/src/sync.rs
··· 1 1 use std::env; 2 2 3 3 use anyhow::Error; 4 - use sqlx::postgres::PgPoolOptions; 4 + use owo_colors::OwoColorize; 5 + use sqlx::postgres::{PgPoolOptions, PgRow}; 6 + use sqlx::Row; 5 7 6 8 use crate::repo::{duckdb::DuckdbRepo, Repo, RepoImpl}; 9 + use crate::types::ScrobbleRecord; 7 10 8 11 pub async fn sync_scrobbles() -> Result<(), Error> { 9 12 tracing::info!("Starting scrobble synchronization..."); 13 + 14 + let (tx, mut rx) = tokio::sync::mpsc::channel::<PgRow>(500); 15 + 16 + let handle = tokio::spawn(async move { 17 + let pool = PgPoolOptions::new() 18 + .max_connections(5) 19 + .connect(&env::var("XATA_POSTGRES_URL")?) 20 + .await?; 21 + 22 + const BATCH_SIZE: i64 = 1000; 23 + 24 + let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles") 25 + .fetch_one(&pool) 26 + .await?; 27 + let total_scrobbles = total_scrobbles.0; 28 + tracing::info!(total = %total_scrobbles.magenta(), "Total scrobbles to sync"); 29 + 30 + for offset in (0..total_scrobbles).step_by(BATCH_SIZE as usize) { 31 + tracing::info!( 32 + offset = %(offset).magenta(), 33 + end = %(offset + BATCH_SIZE).magenta(), 34 + "Syncing scrobbles batch:", 35 + ); 36 + let result = sqlx::query( 37 + r#" 38 + SELECT 39 + s.xata_id, 40 + s.user_id, 41 + s.track_id, 42 + s.album_id, 43 + s.artist_id, 44 + s.uri, 45 + s.timestamp, 46 + a.*, 47 + ar.*, 48 + t.*, 49 + u.*, 50 + a.uri AS album_uri, 51 + ar.uri AS artist_uri, 52 + t.uri AS track_uri, 53 + a.title AS album_title, 54 + a.youtube_link AS album_youtube_link, 55 + a.spotify_link AS album_spotify_link, 56 + a.tidal_link AS album_tidal_link, 57 + a.apple_music_link AS album_apple_music_link 58 + FROM scrobbles s 59 + LEFT JOIN albums a ON s.album_id = a.xata_id 60 + LEFT JOIN artists ar ON s.artist_id = ar.xata_id 61 + LEFT JOIN tracks t ON s.track_id = t.xata_id 62 + LEFT JOIN users u ON s.user_id = u.xata_id 63 + LIMIT $1 OFFSET $2 64 + "#, 65 + ) 66 + .bind(BATCH_SIZE) 67 + .bind(offset as i64) 68 + .fetch_all(&pool) 69 + 
.await?; 70 + 71 + for row in result { 72 + tx.send(row).await?; 73 + } 74 + } 75 + 76 + Ok::<(), Error>(()) 77 + }); 78 + 79 + let mut i = 1; 80 + 10 81 let pool = PgPoolOptions::new() 11 82 .max_connections(5) 12 83 .connect(&env::var("XATA_POSTGRES_URL")?) 13 84 .await?; 14 85 86 + let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles") 87 + .fetch_one(&pool) 88 + .await?; 89 + let total_scrobbles = total_scrobbles.0; 90 + 15 91 let repo = RepoImpl::Duckdb(DuckdbRepo::new().await?); 16 92 repo.create_tables().await?; 17 93 94 + while let Some(row) = rx.recv().await { 95 + // println!("{:#?}", row); 96 + tracing::info!(count = %i.magenta(), total = %total_scrobbles.magenta(), "Inserting scrobble..."); 97 + 98 + let scrobble_uri = row.get::<Option<String>, _>("uri"); 99 + if scrobble_uri.is_none() { 100 + tracing::warn!(count = %i.magenta(), "Skipping scrobble with no URI"); 101 + continue; 102 + } 103 + let scrobble_uri = scrobble_uri.unwrap(); 104 + 105 + let did = row.get::<String, _>("did"); 106 + let record: ScrobbleRecord = ScrobbleRecord { 107 + track_number: row.get::<Option<i32>, _>("track_number"), 108 + disc_number: row.get::<Option<i32>, _>("disc_number"), 109 + title: row.get::<String, _>("title"), 110 + artist: row.get::<String, _>("artist"), 111 + album_artist: row.get::<String, _>("album_artist"), 112 + album_art_url: row.get::<Option<String>, _>("album_art"), 113 + album: row.get::<String, _>("album"), 114 + duration: row.get::<i32, _>("duration"), 115 + release_date: row.get::<Option<String>, _>("release_date"), 116 + year: row.get::<Option<i32>, _>("year"), 117 + genre: row.get::<Option<String>, _>("genre"), 118 + tags: row.get::<Option<Vec<String>>, _>("genres"), 119 + composer: row.get::<Option<String>, _>("composer"), 120 + lyrics: row.get::<Option<String>, _>("lyrics"), 121 + copyright_message: row.get::<Option<String>, _>("copyright_message"), 122 + wiki: None, 123 + youtube_link: row.get::<Option<String>, 
_>("youtube_link"), 124 + spotify_link: row.get::<Option<String>, _>("spotify_link"), 125 + tidal_link: row.get::<Option<String>, _>("tidal_link"), 126 + apple_music_link: row.get::<Option<String>, _>("apple_music_link"), 127 + created_at: row 128 + .get::<chrono::DateTime<chrono::Utc>, _>("timestamp") 129 + .to_rfc3339(), 130 + label: row.get::<Option<String>, _>("label"), 131 + mbid: row.get::<Option<String>, _>("mb_id"), 132 + artist_picture: row.get::<Option<String>, _>("picture"), 133 + artist_uri: row.get::<Option<String>, _>("artist_uri"), 134 + album_uri: row.get::<Option<String>, _>("album_uri"), 135 + song_uri: row.get::<Option<String>, _>("track_uri"), 136 + }; 137 + 138 + let repo = DuckdbRepo::new().await?; 139 + repo.insert_scrobble(&did, &scrobble_uri, record).await?; 140 + 141 + i += 1; 142 + } 143 + 144 + handle.await??; 18 145 Ok(()) 19 146 }
+8
crates/feed/src/types.rs
··· 121 121 pub label: Option<String>, 122 122 #[serde(skip_serializing_if = "Option::is_none")] 123 123 pub mbid: Option<String>, 124 + #[serde(skip_serializing_if = "Option::is_none")] 125 + pub artist_picture: Option<String>, 126 + #[serde(skip_serializing_if = "Option::is_none")] 127 + pub artist_uri: Option<String>, 128 + #[serde(skip_serializing_if = "Option::is_none")] 129 + pub album_uri: Option<String>, 130 + #[serde(skip_serializing_if = "Option::is_none")] 131 + pub song_uri: Option<String>, 124 132 } 125 133 126 134 #[derive(Debug, Deserialize, Clone)]